# EventStream

[Streams](http://nodejs.org/api/stream.html "Stream") are node's best and most misunderstood idea, and *EventStream* is a toolkit to make creating and working with streams easy.

Normally, streams are only used for IO, but in event-stream we send all kinds of objects down the pipe. If your application's input and output are streams, shouldn't the throughput be a stream too?

The *EventStream* functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.

All the `event-stream` functions return instances of `Stream`.

`event-stream` creates [0.8 streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown), which are compatible with [0.10 streams](http://nodejs.org/api/stream.html "Stream").

>NOTE: I shall use the term *"through stream"* to refer to a stream that is writable *and* readable.

>NOTE for Gulp users: `merge` will not work with gulp 4; use [merge-stream](https://npmjs.com/merge-stream) instead.

### [simple example](https://github.com/dominictarr/event-stream/blob/master/examples/pretty.js):

``` js
//pretty.js

if(!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect

  process.stdin                        //connect streams together with `pipe`
    .pipe(es.split())                  //split stream to break on newlines
    .pipe(es.map(function (data, cb) { //turn this async function into a stream
      cb(null
      , inspect(JSON.parse(data)))     //render it nicely
    }))
    .pipe(process.stdout)              // pipe it to stdout !
}
```
run it ...

``` bash
curl -sS registry.npmjs.org/event-stream | node pretty.js
```

[node Stream documentation](http://nodejs.org/api/stream.html)

## through (write?, end?)

Re-emits data synchronously. Easy way to create synchronous through streams.
Pass in optional `write` and `end` methods. They will be called in the
context of the stream. Use `this.pause()` and `this.resume()` to manage flow.
Check `this.paused` to see the current flow state. (`write` always returns `!this.paused`.)

This function is the basis for most of the synchronous streams in `event-stream`.

``` js

es.through(function write(data) {
    this.emit('data', data)
    //this.pause()
  },
  function end () { //optional
    this.emit('end')
  })

```

## map (asyncFunction)

Create a through stream from an asynchronous function.

``` js
var es = require('event-stream')

es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})

```

Each map MUST call the callback. It may callback with data, with an error, or with no arguments:

  * `callback()` drops this data.
    This makes the map work like `filter` (see the sketch after these notes).
    Note: `callback(null, null)` is not the same, and will emit `null`.

  * `callback(null, newData)` turns `data` into `newData`.

  * `callback(error)` emits an error for this item.

>Note: if a callback is not called, `map` will think that the item is still being processed;
>every call must be answered or the stream will not know when to end.
>
>Also, if the callback is called more than once, every call but the first will be ignored.
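
Since `callback()` drops the current item, `map` can double as a filter. A minimal sketch (the predicate and variable names are only illustrative):

``` js
var es = require('event-stream')

//keep only lines that mention 'error'; drop everything else
var onlyErrors = es.map(function (line, callback) {
  if (/error/i.test(line)) callback(null, line) //pass the line through
  else callback()                               //drop it, filter-style
})
```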

## mapSync (syncFunction)

Same as `map`, but synchronous: instead of calling a callback, the function returns the transformed data directly. Based on `es.through`.
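
A minimal sketch:

``` js
var es = require('event-stream')

//double every number that passes through the stream
var doubler = es.mapSync(function (data) {
  return data * 2
})
```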

## flatmapSync (syncFunction)

Map elements nested inside each chunk: the sync function is applied to every element of an incoming array, and each result is emitted individually.

``` js
var es = require('event-stream')

es.flatmapSync(function (data) {
  //transform data
  // ...
  return data
})

```
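
A short usage sketch, assuming each incoming chunk is an array whose mapped elements are emitted one by one:

``` js
var es = require('event-stream')

es.readArray([[1, 2], [3, 4]])        //each chunk is an array
  .pipe(es.flatmapSync(function (n) {
    return n * 10                     //map each nested element
  }))
  .pipe(es.writeArray(function (err, array) {
    console.log(array)                // => [10, 20, 30, 40]
  }))
```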

## filterSync (syncFunction)

Filter elements.

``` js
var es = require('event-stream')

es.filterSync(function (data) {
  return data > 0
})

```

## split (matcher)

Break up a stream and reassemble it so that each line is a chunk. `matcher` may be a `String` or a `RegExp`.

Example, read every line in a file ...

``` js
fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    //do something with the line
    cb(null, line)
  }))
```

`split` takes the same arguments as `string.split` except it defaults to '\n' instead of ',', and the optional `limit` parameter is ignored.
[String#split](https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/String/split)

**NOTE** - Maintaining Line Breaks
If you want to process each line of the stream, transform the data, reassemble, and **KEEP** the line breaks, the example will look like this:

```javascript
fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split(/(\r?\n)/))
  .pipe(es.map(function (line, cb) {
    //do something with the line
    cb(null, line)
  }))
```

This technique is mentioned in the [underlying documentation](https://www.npmjs.com/package/split#keep-matched-splitter) for the split npm package.

## join (separator)

Create a through stream that emits `separator` between each chunk, just like `Array#join`.

(For legacy reasons, if you pass a callback instead of a string, `join` is a synonym for `es.wait`.)
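
A minimal sketch (the stream contents are illustrative):

``` js
var es = require('event-stream')

es.readArray(['a', 'b', 'c'])
  .pipe(es.join(','))                  //emits 'a', ',', 'b', ',', 'c'
  .pipe(es.wait(function (err, body) {
    console.log(body)                  // => 'a,b,c'
  }))
```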

## merge (stream1,...,streamN) or merge (streamArray)
> concat → merge

Merges streams into one and returns it.
Incoming data will be emitted as soon as it arrives - no ordering is applied (for example: `data1 data1 data2 data1 data2` - where `data1` and `data2` are data from two streams).
It counts how many streams were passed to it and emits 'end' only when all of them have ended.

```js
es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));
```

It can also take an Array of streams as input like this:
```js
es.merge([
  fs.createReadStream('input1.txt'),
  fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
```

## replace (from, to)

Replace all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`.
Works just like `string.split(from).join(to)`, but streaming.

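A minimal sketch, normalizing line endings (the file names are illustrative):

``` js
var es = require('event-stream')
var fs = require('fs')

fs.createReadStream('input.txt')
  .pipe(es.replace('\r\n', '\n')) //replace Windows line endings
  .pipe(fs.createWriteStream('output.txt'))
```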

## parse

Convenience function for parsing JSON chunks. For newline-separated JSON,
use with `es.split`. By default it logs parsing errors via `console.error`;
for different behaviour, transforms created by `es.parse({error: true})` will
emit error events for exceptions thrown from `JSON.parse`, unmodified.

``` js
fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())
```
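
With `{error: true}`, parse failures surface as stream 'error' events that you can handle yourself; a sketch, reusing `filename` from above:

``` js
fs.createReadStream(filename)
  .pipe(es.split())
  .pipe(es.parse({error: true}))
  .on('error', function (err) {
    //the exception thrown by JSON.parse arrives here, unmodified
    console.error('bad JSON line:', err.message)
  })
```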

## stringify

Convert javascript objects into lines of text. The text will have whitespace escaped and a `\n` appended, so it will be compatible with `es.parse`.

``` js
objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))
```

## readable (asyncFunction)

Create a readable stream (that respects pause) from an async function.
While the stream is not paused,
the function will be polled with `(count, callback)`,
and `this` will be the readable stream.

``` js

es.readable(function (count, callback) {
  if(streamHasEnded)
    return this.emit('end')

  //...

  this.emit('data', data) //use this way to emit multiple chunks per call.

  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})
```
You can also pass the data and the error to the callback.
You may only call the callback once;
calling the same callback more than once will have no effect.

## readArray (array)

Create a readable stream from an Array.

It just emits each item as a 'data' event, respecting `pause` and `resume`.

``` js
  var es = require('event-stream')
    , reader = es.readArray([1,2,3])

  reader.pipe(...)
```

If you want the stream to behave like a 0.10 stream, you will need to wrap it using the [`Readable.wrap()`](http://nodejs.org/api/stream.html#stream_readable_wrap_stream) function. Example:

``` js
  var stream = require('stream')
  var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));
```

## writeArray (callback)

Create a writable stream from a callback;
all 'data' events are stored in an array, which is passed to the callback when the stream ends.

``` js
  var es = require('event-stream')
    , reader = es.readArray([1, 2, 3])
    , writer = es.writeArray(function (err, array){
        //array deepEqual [1, 2, 3]
      })

  reader.pipe(writer)
```

## pause ()

A stream that buffers all chunks when paused.

``` js
  var ps = es.pause()
  ps.pause() //buffer the stream, also do not allow 'end'
  ps.resume() //allow chunks through
```

## duplex (writeStream, readStream)

Takes a writable stream and a readable stream and makes them appear as a readable-writable stream.

It is assumed that the two streams are connected to each other in some way.

(This is used by `pipeline` and `child`.)

``` js
  var cp = require('child_process')
    , grep = cp.exec('grep Stream')

  es.duplex(grep.stdin, grep.stdout)
```

## child (child_process)

Create a through stream from a child process ...

``` js
  var cp = require('child_process')

  es.child(cp.exec('grep Stream')) // a through stream

```

## wait (callback)

Waits for the stream to emit 'end',
joining the chunks of the stream into a single string or buffer.
Takes an optional callback, which will be passed the
complete string/buffer when it receives the 'end' event.

Also, it emits a single 'data' event.

``` js

readStream.pipe(es.wait(function (err, body) {
  // have complete text here.
}))

```

# Other Stream Modules

These modules are not included as a part of *EventStream* but may be
useful when working with streams.

## [reduce (syncFunction, initial)](https://github.com/parshap/node-stream-reduce)

Like `Array.prototype.reduce` but for streams. Given a sync reduce
function and an initial value it will return a through stream that emits
a single data event with the reduced value once the input stream ends.

``` js
var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
  return acc + data.length;
}, 0)).on("data", function(length) {
  console.log("stdin size:", length);
});
```