UNPKG

4.47 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter
2
3module.exports = Channel
4
5function Channel(options) {
6 options = options || {}
7
8 EventEmitter.call(this, options)
9
10 this.concurrency = options.concurrency || Infinity
11 // closed by default
12 this.closed = !(options.closed === false || options.open)
13 this.discard = options.discard
14
15 this.fns = []
16 this.results = []
17
18 this.fnIndex = 0
19 this.resultIndex = 0
20 this.pending = 0
21 this.errors = 0
22
23 this.reading = true
24}
25
26Object.defineProperty(Channel.prototype, 'pushable', {
27 get: function () {
28 return this.reading && this.pending < this.concurrency
29 }
30})
31
32// read queue
33Object.defineProperty(Channel.prototype, 'queue', {
34 get: function () {
35 var queue = this.results.length + this.fns.length
36 if (this.discard) queue += this.pending
37 return queue
38 }
39})
40
41// you can read from a channel if there's a read queue or if this channel is not closed
42Object.defineProperty(Channel.prototype, 'readable', {
43 get: function () {
44 return this.queue || !this.closed
45 }
46})
47
48Channel.prototype.open = function () {
49 this.closed = false
50 this.emit('open')
51 return this
52}
53
54Channel.prototype.close = function () {
55 this.closed = true
56 this.emit('close')
57 return this
58}
59
60// when the channel is open,
61// wait for the first push event (returns true)
62// or close event (returns false)
63Channel.prototype.pushed = function (done) {
64 if (this.closed) return done(null, false)
65 if (this.queue) return done(null, true)
66
67 var self = this
68 this.on('close', close)
69 this.on('push', push)
70
71 function close() {
72 cleanup()
73 done(null, false)
74 }
75
76 function push() {
77 cleanup()
78 done(null, true)
79 }
80
81 function cleanup() {
82 self.removeListener('close', close)
83 self.removeListener('push', push)
84 }
85}
86
87/**
88 * Push a function to the channel.
89 * If `null`, just means closing the channel.
90 */
91
92Channel.prototype.push = function (fn) {
93 if (fn == null) return this.close()
94
95 if (typeof fn !== 'function')
96 throw new TypeError('you may only push functions')
97
98 this.fns.push(fn)
99 if (!this.discard) this.results.length++
100 this._call()
101 this.emit('push')
102 return this
103}
104
105Channel.prototype._call = function () {
106 if (!this.pushable) return
107 if (!this.fns.length) return
108
109 var fn = this.fns.shift()
110 var index = this.fnIndex++
111 this.pending++
112
113 var self = this
114 fn(function (err, res) {
115 self.pending--
116 if (err) {
117 self.reading = false
118 self.errors++
119 if (self.discard) {
120 self.results.push(err)
121 } else {
122 self.results[index - self.resultIndex] = err
123 }
124 } else if (!self.discard) {
125 self.results[index - self.resultIndex] = arguments.length > 2
126 ? [].slice.call(arguments, 1)
127 : res
128 }
129
130 self.emit(String(index))
131 self.emit('callback')
132 self._call()
133 })
134}
135
136Channel.prototype._read = function (done) {
137 var results = this.results
138 // continue executing callbacks if no errors occured
139 if (!this.reading && !this.errors) this.reading = true
140
141 if (!this.discard) {
142 if (results.length && 0 in results) {
143 var res = results.shift()
144 this.resultIndex++
145 if (res instanceof Error) {
146 this.errors--
147 done(res)
148 } else {
149 done(null, res)
150 }
151 return
152 }
153 } else if (results.length) {
154 // these can only be errors
155 this.errors--
156 done(results.shift())
157 return
158 }
159
160 // wait for the next result in the queue
161 this._next(done)
162}
163
164Channel.prototype._next = function (done) {
165 var event = this.discard
166 ? 'callback'
167 : String(this.resultIndex)
168
169 var self = this
170 this.once(event, onevent)
171 if (!this.closed && !this.queue) this.once('close', onclose)
172
173 function onevent() {
174 cleanup()
175 if (!this.discard) {
176 var res = this.results.shift()
177 this.resultIndex++
178 if (res instanceof Error) {
179 this.errors--
180 done(res)
181 } else {
182 done(null, res)
183 }
184 } else if (this.results.length) {
185 // these can only be errors
186 this.errors--
187 done(this.results.shift())
188 } else {
189 done()
190 }
191 }
192
193 function onclose() {
194 cleanup()
195 done()
196 }
197
198 function cleanup() {
199 self.removeListener(event, onevent)
200 self.removeListener('close', onclose)
201 }
202}
203
204Channel.prototype._flush = function* () {
205 if (this.discard) {
206 while (this.readable) yield this
207 return
208 }
209
210 var results = []
211 while (this.readable) results.push(yield this)
212 return results
213}
\No newline at end of file