# pull-stream

Minimal Pipeable Pull-stream

In [classic-streams](https://github.com/nodejs/node-v0.x-archive/blob/v0.8/doc/api/stream.markdown),
streams _push_ data to the next stream in the pipeline.
In [new-streams](https://github.com/nodejs/node-v0.x-archive/blob/v0.10/doc/api/stream.markdown),
data is pulled out of the source stream, into the destination.
`pull-stream` is a minimal take on streams:
pull streams work great for "object" streams as well as streams of raw text or binary data.

[![build status](https://secure.travis-ci.org/pull-stream/pull-stream.png)](https://travis-ci.org/pull-stream/pull-stream)

## Quick Example

Stat some files:

```js
var pull = require('pull-stream')
var fs = require('fs')

pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat),
  pull.collect(function (err, array) {
    console.log(array)
  })
)
```

Note that `pull(a, b, c)` is basically the same as `a.pipe(b).pipe(c)`.

To grok how pull-streams work, read through [pull-streams by example](https://github.com/dominictarr/pull-stream-examples).

## How do I do X with pull-streams?

There is a module for that!

Check the [pull-stream FAQ](https://github.com/pull-stream/pull-stream-faq)
and post an issue if you have a question that is not covered.

## Compatibility with node streams

pull-streams are not _directly_ compatible with node streams,
but pull-streams can be converted into node streams with
[pull-stream-to-stream](https://github.com/pull-stream/pull-stream-to-stream),
and node streams can be converted into pull-streams with [stream-to-pull-stream](https://github.com/pull-stream/stream-to-pull-stream).
In both cases, correct back pressure is preserved.

### Readable & Reader vs. Readable & Writable

Instead of a readable stream and a writable stream, there is a `readable` stream
(aka "Source") and a `reader` stream (aka "Sink"). A Through stream
is a Sink that returns a Source.

See also:
* [Sources](./docs/sources/index.md)
* [Throughs](./docs/throughs/index.md)
* [Sinks](./docs/sinks/index.md)

### Source (readable stream that produces values)

A Source is a function `read(end, cb)`
that may be called many times,
and will (asynchronously) call `cb(null, data)` once for each call.

To signify an end state, the stream eventually calls `cb(err)` or `cb(true)`.
When signifying an end state, `data` *must* be ignored.

The `read` function *must not* be called until the previous call has called back,
unless it is a call to abort the stream (`read(Error || true, cb)`).

```js
var n = 5;

// random is a source of 5 random numbers.
function random (end, cb) {
  if(end) return cb(end)
  // only read n times, then stop.
  if(0 > --n) return cb(true)
  cb(null, Math.random())
}
```

### Sink (reader or writable stream that consumes values)

A Sink is a function `reader(read)` that calls a Source (`read(null, cb)`)
until it decides to stop (by calling `read(true, cb)`), or the readable ends (`read` calls
`cb(Error || true)`).

All [Throughs](./docs/throughs/index.md)
and [Sinks](./docs/sinks/index.md)
are reader streams.

```js
// logger reads a source and logs it.
function logger (read) {
  read(null, function next(end, data) {
    if(end === true) return
    if(end) throw end

    console.log(data)
    read(null, next)
  })
}
```

Since Sources and Sinks are functions, you can pass them to each other!

```js
logger(random) // "pipe" the streams.
```

But it's easier to read if you use pull-stream's `pull` function:

```js
var pull = require('pull-stream')

pull(random, logger)
```

### Creating reusable streams

When working with pull streams it is common to create functions that return a stream.
This is because streams contain mutable state and so can only be used once.
In the above example, once `random` has been connected to a sink and has produced 5 random numbers, it will not produce any more random numbers if connected to another sink.

Therefore, use a function like this to create a random number generating stream that can be reused:

```js
// create a stream of n random numbers
function createRandomStream (n) {
  return function randomReadable (end, cb) {
    if(end) return cb(end)
    if(0 > --n) return cb(true)
    cb(null, Math.random())
  }
}

pull(createRandomStream(5), logger)
```

### Through

A through stream is both a reader (consumes values) and a readable (produces values).
It's a function that takes a `read` function (a Sink),
and returns another `read` function (a Source).

```js
// double is a through stream that doubles values.
function double (read) {
  return function readable (end, cb) {
    read(end, function (end, data) {
      cb(end, data != null ? data * 2 : null)
    })
  }
}

pull(createRandomStream(5), double, logger)
```

### Pipeability

Every pipeline must go from a `source` to a `sink`.
Data will not start moving until the whole thing is connected.

```js
pull(source, through, sink)
```

Sometimes it's simplest to describe a stream in terms of other streams.
`pull` can detect what sort of stream it starts with (by counting arguments),
and if you pull together through streams, it gives you a new through stream.

```js
var tripleThrough =
  pull(through1(), through2(), through3())
// The three through streams become one.

pull(source(), tripleThrough, sink())
```

`pull` detects if it's missing a Source by checking function arity:
if the function takes only one argument it's either a sink or a through;
otherwise it's a Source.

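For illustration, that arity check can be sketched like this (the `isSource` helper is hypothetical, not part of the pull-stream API):

```js
// A Source is read(end, cb): arity 2.
// Sinks and Throughs take a single read argument: arity 1.
function isSource (stream) {
  return typeof stream === 'function' && stream.length === 2
}

function source (end, cb) { if (end) return cb(end); cb(null, 1) }
function sink (read) { /* consume values here */ }

console.log(isSource(source)) // true
console.log(isSource(sink))   // false
```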
## Duplex Streams

Duplex streams, which are used to communicate between two things
(i.e. over a network), are a little different. In a duplex stream,
messages go both ways, so instead of a single function that represents the stream,
you need a pair of streams: `{source: sourceStream, sink: sinkStream}`.

Pipe duplex streams like this:

``` js
var a = duplex()
var b = duplex()

pull(a.source, b.sink)
pull(b.source, a.sink)

// which is the same as

b.sink(a.source); a.sink(b.source)

// but the easiest way is to allow pull to handle this

pull(a, b, a)

// "pull from a to b and then back to a"
```

## Design Goals & Rationale

There is a deeper,
[platonic abstraction](http://en.wikipedia.org/wiki/Platonic_idealism),
where a stream is just an array in time, instead of in space.
And all the various streaming "abstractions" are just crude implementations
of this abstract idea:
[classic-streams](https://github.com/joyent/node/blob/v0.8.16/doc/api/stream.markdown),
[new-streams](https://github.com/joyent/node/blob/v0.10/doc/api/stream.markdown),
[reducers](https://github.com/Gozala/reducers).

The objective here is to find a simple realization of the best features of the above.

### Type Agnostic

A stream abstraction should be able to handle both streams of text and streams
of objects.

### A pipeline is also a stream

Something like this should work: `a.pipe(x.pipe(y).pipe(z)).pipe(b)`.
This makes it possible to write a custom stream simply by
combining a few available streams.

### Propagate End/Error conditions

If a stream ends in an unexpected way (an error),
then the other streams in the pipeline should be notified.
(This is a problem in node streams: when an error occurs,
the stream is disconnected, and the user must handle that specially.)

Also, the stream should be able to be ended from either end.

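A sketch of how this plays out in pull-streams (the names here are illustrative, not library API): a source that errors simply calls back with the error, and the consumer sees it on its very next read, with no special wiring.

```js
// failing is a source that emits one value, then errors.
var sent = false
function failing (end, cb) {
  if (end) return cb(end)
  if (sent) return cb(new Error('disk on fire'))
  sent = true
  cb(null, 'data')
}

// drain reads until end or error, then reports what happened.
function drain (read, done) {
  var seen = []
  read(null, function next (end, data) {
    if (end) return done(end === true ? null : end, seen)
    seen.push(data)
    read(null, next)
  })
}

var result
drain(failing, function (err, seen) {
  result = { err: err, seen: seen }
})
console.log(result.err.message) // 'disk on fire'
console.log(result.seen)        // ['data']
```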
### Transparent Backpressure & Laziness

Very simple transform streams must be able to transfer back pressure
instantly.

This is a problem in node streams: pause is only transferred on write, so
on a long chain (`a.pipe(b).pipe(c)`), if `c` pauses, `b` will have to write to it
to pause, and then `a` will have to write to `b` to pause.
If `b` only transforms `a`'s output, then `a` will have to write to `b` twice to
find out that `c` is paused.

[reducers](https://github.com/Gozala/reducers) has an interesting approach,
where synchronous transformations propagate back pressure instantly!

This means you can have two "smart" streams doing io at the ends, and lots of dumb
streams in the middle, and back pressure will work perfectly, as if the dumb streams
are not there.

This makes laziness work right.

### handling end, error, and abort

In pull streams, any part of the stream (source, sink, or through)
may terminate the stream. (This is the case with node streams too,
but it's not handled well.)

#### source: end, error

A source may end (`cb(true)` after read) or error (`cb(error)` after read).
After ending, the source *must* never call `cb(null, data)`.

#### sink: abort

Sinks do not normally end the stream, but if they decide they do
not need any more data they may "abort" the source by calling `read(true, cb)`.
An abort (`read(true, cb)`) may be called before a preceding read call
has called back.

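As a minimal sketch (the `ones` and `takeTwo` names are illustrative, not library API), here is a sink aborting an otherwise infinite source after two values:

```js
// ones is an infinite source of 1s.
function ones (end, cb) {
  if (end) return cb(end)
  cb(null, 1)
}

// takeTwo is a sink that aborts the source after two values.
var collected = []
function takeTwo (read) {
  read(null, function next (end, data) {
    if (end) return
    collected.push(data)
    if (collected.length === 2)
      return read(true, function (end) { /* end === true: source released */ })
    read(null, next)
  })
}

takeTwo(ones)
console.log(collected) // [1, 1]
```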
### handling end/abort/error in through streams

Rules for implementing `read` in a through stream:

1) Sink wants to stop (the sink aborts the through).

Just forward the exact `read()` call to your source;
any future read calls should `cb(true)`.

2) We want to stop (abort from the middle of the stream).

Abort your source, and then `cb(true)` to tell the sink we have ended.
If the source errored during the abort, instead end the sink with `cb(err)`.
(This will be an ordinary end/error for the sink.)

3) Source wants to stop (`read(null, cb) -> cb(err||true)`).

Forward that exact callback towards the sink;
we must respond to any future read calls with `cb(err||true)`.

In none of the above cases is data flowing!

4) If data is flowing (normal operation: `read(null, cb) -> cb(null, data)`),

forward the data downstream (towards the Sink)
and do none of the above!

There is either data flowing (4) OR one of the end/abort/error cases (1-3), never both.

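These rules can be sketched in a single map-style through. This is a simplified illustration, not the library's `pull.map` (which also guards against misuse): forwarding `end` upstream covers rules 1-2, forwarding the `end` in the callback covers rule 3, and the final line is rule 4.

```js
// map applies fn to each value, forwarding end/abort/error untouched.
function map (fn) {
  return function (read) {               // Sink side: receive the upstream read
    return function readable (end, cb) { // Source side: expose our own read
      read(end, function (end, data) {   // rules 1-2: forward any abort upstream
        if (end) return cb(end)          // rule 3: forward end/error downstream
        cb(null, fn(data))               // rule 4: data flowing, transform it
      })
    }
  }
}
```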
## 1:1 read-callback ratio

A pull stream source (and thus transform) returns *exactly one value* per read.

This differs from node streams, which can use `this.push(value)` and an internal
buffer to create transforms that write many values from a single read value.

Pull streams don't come with their own buffering mechanism, but [there are ways
to get around this](https://github.com/dominictarr/pull-stream-examples/blob/master/buffering.js).

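One such workaround, sketched here under illustrative names (this is not library API), is a through that keeps a small internal queue so it can still answer each read with exactly one value:

```js
// flatten emits the elements of incoming arrays one at a time,
// buffering the rest in a queue to keep the 1:1 read/callback contract.
function flatten (read) {
  var queue = []
  return function readable (end, cb) {
    if (end) return read(end, cb)
    if (queue.length) return cb(null, queue.shift())
    read(null, function (end, data) {
      if (end) return cb(end)
      queue = queue.concat(data) // data is an array of values
      readable(null, cb)
    })
  }
}
```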
## Minimal bundle

If you need only the `pull` function from this package you can reduce the size
of the imported code (for instance to reduce a Browserify bundle) by requiring
it directly:

```js
var pull = require('pull-stream/pull')

pull(createRandomStream(5), logger)
```

## Further Examples

- [dominictarr/pull-stream-examples](https://github.com/dominictarr/pull-stream-examples)
- [./docs/examples](./docs/examples.md)

Explore this repo further for more information about
[sources](./docs/sources/index.md),
[throughs](./docs/throughs/index.md),
[sinks](./docs/sinks/index.md), and the
[glossary](./docs/glossary.md).

## License

MIT