# pull-stream

Minimal Pipeable Pull-stream

In [classic-streams](https://github.com/nodejs/node-v0.x-archive/blob/v0.8/doc/api/stream.markdown),
streams _push_ data to the next stream in the pipeline.
In [new-streams](https://github.com/nodejs/node-v0.x-archive/blob/v0.10/doc/api/stream.markdown),
data is pulled out of the source stream, into the destination.
`pull-stream` is a minimal take on streams:
pull streams work great for "object" streams as well as streams of raw text or binary data.

[![build status](https://secure.travis-ci.org/pull-stream/pull-stream.png)](https://travis-ci.org/pull-stream/pull-stream)

## Quick Example

Stat some files:

```js
var pull = require('pull-stream')
var fs = require('fs')

pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat),
  pull.collect(function (err, array) {
    console.log(array)
  })
)
```
Note that `pull(a, b, c)` is basically the same as `a.pipe(b).pipe(c)`.

To grok how pull-streams work, read through [pull-streams by example](https://github.com/dominictarr/pull-stream-examples).

## How do I do X with pull-streams?

There is a module for that!

Check the [pull-stream FAQ](https://github.com/pull-stream/pull-stream-faq)
and post an issue if you have a question that is not covered there.

## Compatibility with node streams

pull-streams are not _directly_ compatible with node streams,
but pull-streams can be converted into node streams with
[pull-stream-to-stream](https://github.com/pull-stream/pull-stream-to-stream),
and node streams can be converted into pull-streams using [stream-to-pull-stream](https://github.com/pull-stream/stream-to-pull-stream).
Correct back pressure is preserved in either direction.
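
For example, to consume a node readable as a pull stream (a sketch, assuming stream-to-pull-stream's `source` helper):

```js
var pull = require('pull-stream')
var toPull = require('stream-to-pull-stream')
var fs = require('fs')

pull(
  toPull.source(fs.createReadStream('package.json')), // node stream -> pull Source
  pull.collect(function (err, chunks) {
    if (err) throw err
    console.log(Buffer.concat(chunks).toString())
  })
)
```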

### Readable & Reader vs. Readable & Writable

Instead of a readable stream and a writable stream, there is a `readable`
stream (aka "Source") and a `reader` stream (aka "Sink"). A Through stream
is a Sink that returns a Source.

See also:
* [Sources](./docs/sources/index.md)
* [Throughs](./docs/throughs/index.md)
* [Sinks](./docs/sinks/index.md)

### Source (aka, Readable)

The readable stream is just a `function read(end, cb)`
that may be called many times,
and will (asynchronously) `cb(null, data)` once for each call.

To signify an end state, the stream eventually returns `cb(err)` or `cb(true)`.
When indicating a terminal state, `data` *must* be ignored.

The `read` function *must not* be called until the previous call has called back,
unless it is a call to abort the stream (`read(truthy, cb)`).

```js
var n = 5

// random is a source of 5 random numbers.
function random (end, cb) {
  if (end) return cb(end)
  // only read n times, then stop.
  if (--n < 0) return cb(true)
  cb(null, Math.random())
}
```

### Sink (aka Reader, "writable")

A sink is just a `reader` function that calls a Source (a read function)
until it decides to stop, or the readable ends (`cb(err || true)`).

All [Throughs](./docs/throughs/index.md)
and [Sinks](./docs/sinks/index.md)
are reader streams.

```js
// logger reads a source and logs it.
function logger (read) {
  read(null, function next (end, data) {
    if (end === true) return
    if (end) throw end

    console.log(data)
    read(null, next)
  })
}
```

Since these are just functions, you can pass them to each other!

```js
logger(random) // "pipe" the streams.
```

But it's easier to read if you use pull-stream's `pull` method:

```js
var pull = require('pull-stream')

pull(random, logger)
```

### Creating reusable streams

When working with pull streams it is common to create functions that return a stream.
This is because streams contain mutable state and so can only be used once.
In the above example, once `random` has been connected to a sink and has produced 5 random numbers, it will not produce any more random numbers if connected to another sink.

Therefore, use a function like this to create a random number generating stream that can be reused:

```js
// create a stream of n random numbers
function createRandomStream (n) {
  return function randomReadable (end, cb) {
    if (end) return cb(end)
    if (--n < 0) return cb(true)
    cb(null, Math.random())
  }
}

pull(createRandomStream(5), logger)
```

### Through

A through stream is a reader on one end and a readable on the other.
It's a Sink that returns a Source.
That is, it's just a function that takes a `read` function
and returns another `read` function.

```js
// double is a through stream that doubles values.
function double (read) {
  return function readable (end, cb) {
    read(end, function (end, data) {
      cb(end, data != null ? data * 2 : null)
    })
  }
}

pull(createRandomStream(5), double, logger)
```

### Pipeability

Every pipeline must go from a `source` to a `sink`.
Data will not start moving until the whole thing is connected.

```js
pull(source, through, sink)
```

Sometimes it's simplest to describe a stream in terms of other streams.
`pull` can detect what sort of stream it starts with (by counting arguments),
and if you pull together through streams, it gives you a new through stream.

```js
var tripleThrough =
  pull(through1(), through2(), through3())
// The three through streams become one.

pull(source(), tripleThrough, sink())
```

pull detects if it's missing a Source by checking function arity:
if the function takes only one argument, it's either a sink or a through;
otherwise, it's a Source.
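
As a simplified sketch of that check (the `looksLikeSource` helper here is purely illustrative, not part of the API):

```js
// a Source is `read(end, cb)` (two arguments); Sinks and Throughs
// take a single `read` argument.
function looksLikeSource (fn) {
  return typeof fn === 'function' && fn.length > 1
}

looksLikeSource(createRandomStream(5)) // => true  (function (end, cb))
looksLikeSource(double)                // => false (function (read))
```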

## Duplex Streams

Duplex streams, which are used to communicate between two things
(i.e. over a network), are a little different. In a duplex stream,
messages go both ways, so instead of a single function that represents the stream,
you need a pair of streams: `{source: sourceStream, sink: sinkStream}`.

Pipe duplex streams like this:

```js
var a = duplex()
var b = duplex()

pull(a.source, b.sink)
pull(b.source, a.sink)

// which is the same as

b.sink(a.source); a.sink(b.source)

// but the easiest way is to allow pull to handle this

pull(a, b, a)

// "pull from a to b and then back to a"
```

## Design Goals & Rationale

There is a deeper,
[platonic abstraction](http://en.wikipedia.org/wiki/Platonic_idealism),
where a stream is just an array in time, instead of in space.
And all the various streaming "abstractions" are just crude implementations
of this abstract idea:

[classic-streams](https://github.com/joyent/node/blob/v0.8.16/doc/api/stream.markdown),
[new-streams](https://github.com/joyent/node/blob/v0.10/doc/api/stream.markdown),
[reducers](https://github.com/Gozala/reducers)

The objective here is to find a simple realization of the best features of the above.

### Type Agnostic

A stream abstraction should be able to handle both streams of text and streams
of objects.

### A pipeline is also a stream.

Something like this should work: `a.pipe(x.pipe(y).pipe(z)).pipe(b)`.
This makes it possible to write a custom stream simply by
combining a few available streams.
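
In pull-streams this falls out of the design; for example, reusing the `double` through from above (a sketch):

```js
// two throughs combined into one reusable custom through
var quadruple = pull(double, double)

pull(createRandomStream(3), quadruple, logger)
```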

### Propagate End/Error conditions.

If a stream ends in an unexpected way (error),
then other streams in the pipeline should be notified.
(This is a problem in node streams: when an error occurs,
the stream is disconnected, and the user must handle that specially.)

Also, the stream should be able to be ended from either end.
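
For example, an error raised by the source arrives at the sink's callback without any extra wiring (a sketch using pull's built-in `error` source):

```js
pull(
  pull.error(new Error('boom')),           // a source that errors immediately
  pull.map(function (x) { return x * 2 }), // an unsuspecting through
  pull.collect(function (err) {
    console.log(err.message) // => 'boom'
  })
)
```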

### Transparent Backpressure & Laziness

Very simple transform streams must be able to transfer back pressure
instantly.

This is a problem in node streams: pause is only transferred on write, so
on a long chain (`a.pipe(b).pipe(c)`), if `c` pauses, `b` will have to write to it
to pause, and then `a` will have to write to `b` to pause.
If `b` only transforms `a`'s output, then `a` will have to write to `b` twice to
find out that `c` is paused.

[reducers](https://github.com/Gozala/reducers) has an interesting approach,
where synchronous transformations propagate back pressure instantly!

This means you can have two "smart" streams doing io at the ends, and lots of dumb
streams in the middle, and back pressure will work perfectly, as if the dumb streams
are not there.

This makes laziness work right.
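
A sketch of what that means in practice: the source below only does work when the sink actually asks, so `take(2)` causes exactly two reads, no matter how many dumb throughs sit in between:

```js
var reads = 0
function countingSource (end, cb) {
  if (end) return cb(end)
  reads++
  cb(null, reads)
}

pull(
  countingSource,
  pull.map(function (x) { return x * 10 }), // a "dumb" middle stream
  pull.take(2),                             // stop after two values
  pull.collect(function (err, values) {
    console.log(values, reads) // => [ 10, 20 ] 2
  })
)
```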

### handling end, error, and abort.

In pull streams, any part of the stream (source, sink, or through)
may terminate the stream. (This is the case with node streams too,
but it's not handled well.)

#### source: end, error

A source may end (`cb(true)` after read) or error (`cb(error)` after read).
After ending, the source *must* never `cb(null, data)`.
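
A sketch of a source that errors partway through, and how a sink observes it:

```js
var sent = 0
function flaky (end, cb) {
  if (end) return cb(end)
  if (sent >= 2) return cb(new Error('oops')) // the error end state
  cb(null, ++sent)
}

pull(flaky, pull.drain(console.log, function (err) {
  console.log('ended with:', err && err.message) // after logging 1 and 2
}))
```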

#### sink: abort

Sinks do not normally end the stream, but if they decide they do
not need any more data they may "abort" the source by calling `read(true, cb)`.
An abort (`read(true, cb)`) may be called before a preceding read call
has called back.
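
A sketch of a sink that takes just the first value and then aborts its source (the `first` helper is hypothetical, written for this example):

```js
function first (done) {
  return function sink (read) {
    read(null, function (end, data) {
      if (end) return done(end === true ? null : end)
      read(true, function () { // abort: we have all we need
        done(null, data)
      })
    })
  }
}

pull(createRandomStream(5), first(function (err, value) {
  console.log(value) // the first random number; the other four are never produced
}))
```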

### handling end/abort/error in through streams

Rules for implementing `read` in a through stream
(a sketch that exercises them follows the list):

1) Sink wants to stop: the sink aborts the through.

   Just forward the exact `read()` call to your source;
   any future read calls should `cb(true)`.

2) We want to stop: an abort from the middle of the stream.

   Abort your source, and then `cb(true)` to tell the sink we have ended.
   If the source errored during the abort, end the sink with `cb(err)` instead.
   (This will be an ordinary end/error for the sink.)

3) Source wants to stop: `read(null, cb) -> cb(err || true)`.

   Forward that exact callback towards the sink;
   we must respond to any future read calls with `cb(err || true)`.

4) Data is flowing: normal operation, `read(null, cb) -> cb(null, data)`.

   Forward data downstream (towards the Sink),
   and do none of the above!

In none of cases 1-3 is data flowing: either data is flowing (4) OR you have the end/abort/error cases (1-3), never both.
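
Here is the promised sketch: a through that forwards sink aborts (1), stops the stream itself after `max` values (2), and forwards end/error and data unchanged (3 and 4). The `limit` helper is written for this example, not part of the API:

```js
function limit (max) {
  return function (read) {
    return function readable (end, cb) {
      if (end) return read(end, cb)       // (1) sink aborts: forward the exact call
      if (max-- <= 0)                     // (2) we want to stop from the middle
        return read(true, function (err) {
          cb(err || true)                 // end (or error) the sink after aborting
        })
      read(null, function (end, data) {
        cb(end, data)                     // (3) forward end/error, (4) forward data
      })
    }
  }
}

pull(createRandomStream(100), limit(3), logger) // logs only 3 numbers
```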

## 1:1 read-callback ratio

A pull stream source (and thus transform) returns *exactly one value* per read.

This differs from node streams, which can use `this.push(value)` and an internal
buffer to create transforms that write many values from a single read value.

Pull streams don't come with their own buffering mechanism, but [there are ways
to get around this](https://github.com/dominictarr/pull-stream-examples/blob/master/buffering.js).
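
As a sketch of the idea behind that workaround: a through can keep an internal queue so that one upstream read answers several downstream reads. The `flatten` helper here is hypothetical, written for this example:

```js
function flatten (read) {
  var queue = []
  return function readable (end, cb) {
    if (end) return read(end, cb)
    if (queue.length) return cb(null, queue.shift()) // serve from the buffer
    read(null, function (end, data) {
      if (end) return cb(end)
      queue = queue.concat(data) // each upstream value is expected to be an array
      readable(null, cb)
    })
  }
}

pull(
  pull.values([[1, 2], [3, 4]]),
  flatten,
  pull.collect(function (err, values) {
    console.log(values) // => [ 1, 2, 3, 4 ]
  })
)
```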

## Minimal bundle

If you need only the `pull` function from this package you can reduce the size
of the imported code (for instance to reduce a Browserify bundle) by requiring
it directly:

```js
var pull = require('pull-stream/pull')

pull(createRandomStream(5), logger)
```

## Further Examples

- [dominictarr/pull-stream-examples](https://github.com/dominictarr/pull-stream-examples)
- [./docs/examples](./docs/examples.md)

Explore this repo further for more information about
[sources](./docs/sources/index.md),
[throughs](./docs/throughs/index.md),
[sinks](./docs/sinks/index.md), and the
[glossary](./docs/glossary.md).

## License

MIT