UNPKG

7.8 kB · JavaScript · View Raw
1//filter will reemit the data if cb(err,pass) pass is truthy
2
3// reduce is more tricky
4// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
6
7var Stream = require('stream').Stream
8 , es = exports
9 , through = require('through')
10 , from = require('from')
11 , duplex = require('duplexer')
12 , map = require('map-stream')
13 , pause = require('pause-stream')
14 , split = require('split')
15 , pipeline = require('stream-combiner')
16 , immediately = global.setImmediate || process.nextTick;
17
// re-export the constituent modules under the event-stream namespace
es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split
es.pipeline = es.connect = es.pipe = pipeline // three aliases for stream-combiner
// merge / concat
//
// combine multiple streams into a single stream.
// will emit end only once

es.concat = //actually this should be called concat
es.merge = function (/*streams...*/) {
  // accept either a single array of streams, or streams as varargs
  var sources = [].slice.call(arguments)
  if (sources.length === 1 && (sources[0] instanceof Array))
    sources = sources[0]

  var merged = new Stream()
  merged.readable = merged.writable = true
  merged.setMaxListeners(0) // allow adding more than 11 streams

  var remaining = sources.length

  if (remaining === 0) {
    // nothing to merge: end asynchronously so listeners can attach first
    process.nextTick(function () {
      merged.emit('end')
    })
  } else {
    sources.forEach(function (source) {
      source.pipe(merged, {end: false})
      var sawEnd = false
      source.on('end', function () {
        if (sawEnd) return // guard against a source emitting 'end' twice
        sawEnd = true
        remaining--
        // only emit 'end' once every source has ended
        if (remaining === 0) merged.emit('end')
      })
    })
  }

  // writes to the merged stream are re-emitted as data
  merged.write = function (data) {
    this.emit('data', data)
  }
  // destroying the merged stream destroys every source that supports it
  merged.destroy = function () {
    sources.forEach(function (source) {
      if (source.destroy) source.destroy()
    })
  }
  return merged
}
70
71
// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions

es.collect =
es.writeArray = function (done) {
  if (typeof done !== 'function')
    throw new Error('function writeArray (done): done must be function')

  var sink = new Stream()
  var collected = []
  var finished = false

  sink.writable = true
  sink.readable = false

  // buffer every chunk until the stream ends
  sink.write = function (chunk) {
    collected.push(chunk)
  }
  // on end, hand the whole array to the callback
  sink.end = function () {
    finished = true
    done(null, collected)
  }
  // destroying before 'end' reports an error with whatever was collected
  sink.destroy = function () {
    sink.writable = sink.readable = false
    if (finished) return
    done(new Error('destroyed before end'), collected)
  }
  return sink
}
99
//return a Stream that reads the properties of an object
//respecting pause() and resume()

es.readArray = function (array) {
  if (!Array.isArray(array))
    throw new Error('event-stream.read expects an array')

  var stream = new Stream()
  var index = 0
  var paused = false
  var ended = false

  stream.readable = true
  stream.writable = false

  // emit items until the array is exhausted, or until paused/destroyed
  stream.resume = function () {
    if (ended) return
    paused = false
    var len = array.length
    while (index < len && !paused && !ended)
      stream.emit('data', array[index++])
    if (index === len && !ended) {
      ended = true
      stream.readable = false
      stream.emit('end')
    }
  }
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    ended = true
    stream.emit('close')
  }
  // start flowing on the next tick so consumers can attach listeners first
  process.nextTick(stream.resume)
  return stream
}
135
//
// readable (asyncFunction)
// return a stream that calls an async function while the stream is not paused.
//
// the function must take: (count, callback) {...
//

es.readable =
function (func, continueOnError) {
  var stream = new Stream()
  , i = 0
  , paused = false
  , ended = false
  , reading = false // true while waiting on func's callback; prevents overlapping calls

  stream.readable = true
  stream.writable = false

  if('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')

  stream.on('end', function () { ended = true })

  // callback handed to func: emit the produced value (if any),
  // then schedule the next read on the following tick.
  function get (err, data) {

    if(err) {
      stream.emit('error', err)
      // by default an error also ends the stream; pass continueOnError to keep reading
      if(!continueOnError) stream.emit('end')
    } else if (arguments.length > 1)
      // only emit 'data' when a value was actually passed —
      // calling the callback with no arguments produces nothing
      stream.emit('data', data)

    immediately(function () {
      // stop the loop once ended or paused; `reading` guards against
      // starting a second call while one is still outstanding
      if(ended || paused || reading) return
      try {
        reading = true
        // func is called with an incrementing count and a callback that loops back here
        func.call(stream, i++, function () {
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        // a synchronous throw from func becomes an 'error' event
        stream.emit('error', err)
      }
    })
  }
  stream.resume = function () {
    paused = false
    get() // re-kick the loop; safe because get() with no args emits nothing
  }
  process.nextTick(get)
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    stream.emit('end')
    stream.emit('close')
    ended = true
  }
  return stream
}
195
196
//
// map sync
//

es.mapSync = function (sync) {
  return es.through(function write(data) {
    var mapped
    try {
      mapped = sync(data)
    } catch (err) {
      // a synchronous throw from the mapper becomes an 'error' event
      return this.emit('error', err)
    }
    // undefined results are filtered out of the stream
    if (mapped !== undefined)
      this.emit('data', mapped)
  })
}
213
//
// filterSync
//

es.filterSync = function (test) {
  // forward only the chunks for which test(data) is truthy
  return es.through(function (data) {
    if (test(data))
      this.queue(data)
  })
}
226
//
// flatmapSync
//

es.flatmapSync = function (mapper) {
  // expects each incoming chunk to be an array; queues mapper(element) for every element
  return es.through(function (chunk) {
    var self = this
    chunk.forEach(function (item) {
      self.queue(mapper(item))
    })
  })
}
239
//
// log just print out what is coming through the stream, for debugging
//

es.log = function (name) {
  // pass-through stream that logs every chunk to stderr;
  // the optional `name` is printed as a prefix to tell streams apart
  return es.through(function (data) {
    if(name) console.error(name, data)
    else console.error(data)
    this.emit('data', data)
  })
}
252
253
//
// child -- pipe through a child process
//

es.child = function (child) {
  // duplex stream: writes go to the child's stdin, reads come from its stdout
  return es.duplex(child.stdin, child.stdout)
}
263
//
// parse
//
// must be used after es.split() to ensure that each chunk represents a line
// source.pipe(es.split()).pipe(es.parse())

es.parse = function (options) {
  // by default parse failures are logged and swallowed;
  // pass {error: true} to emit them as 'error' events instead
  var emitError = Boolean(options && options.error)
  return es.through(function (line) {
    var parsed
    try {
      if(line) //ignore empty lines
        parsed = JSON.parse(line.toString())
    } catch (err) {
      if (emitError)
        return this.emit('error', err)
      return console.error(err, 'attempting to parse:', line)
    }
    //ignore lines that were only whitespace.
    if(parsed !== undefined)
      this.emit('data', parsed)
  })
}
//
// stringify
//

es.stringify = function () {
  var Buffer = require('buffer').Buffer
  // buffers are stringified as their text contents; everything else via JSON.stringify
  return es.mapSync(function (obj) {
    var value = Buffer.isBuffer(obj) ? obj.toString() : obj
    return JSON.stringify(value) + '\n'
  })
}
297
//
// replace a string within a stream.
//
// warn: just concatenates the string and then does str.split().join().
// probably not optimal.
// for smallish responses, who cares?
// I need this for shadow-npm so it's only relatively small json files.

es.replace = function (from, to) {
  // split on `from`, then re-join with `to` — a streaming find-and-replace
  return es.pipeline(es.split(from), es.join(to))
}
309
//
// join chunks with a joiner. just like Array#join
// also accepts a callback that is passed the chunks appended together
// this is still supported for legacy reasons.
//

es.join = function (str) {

  // legacy api: a function argument means "collect everything", like es.wait
  if(typeof str === 'function')
    return es.wait(str)

  var seenFirst = false
  return es.through(function (data) {
    // emit the joiner before every chunk except the first
    if(seenFirst)
      this.emit('data', str)
    seenFirst = true
    this.emit('data', data)
    return true
  })
}
331
332
//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//

es.wait = function (callback) {
  var chunks = []
  function collect (data) {
    chunks.push(data)
  }
  function finish () {
    // buffers are concatenated; anything else is joined as a string
    var body = Buffer.isBuffer(chunks[0])
      ? Buffer.concat(chunks)
      : chunks.join('')
    this.emit('data', body)
    this.emit('end')
    if(callback) callback(null, body)
  }
  return es.through(collect, finish)
}
348
// deprecated stub: always throws so old callers fail loudly
es.pipeable = function () {
  throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
}