1 | # funstream
|
2 |
|
3 | Funstream gives you iteratorish methods on your streams.
|
4 |
|
5 | ```
|
6 | const fun = require('funstream')
|
7 |
|
8 | fun([1, 2, 3, 4, 5]).map(n => n + 1).filter(n => n % 2).map(n => `${n}\n`).pipe(process.stdout)
|
9 | // prints lines with 3 and 5
|
10 | fun([1, 2, 3, 4, 5]).map(n => n + 1).filter(n => n % 2).reduce((a, b) => a + b).then(console.log)
|
11 | // prints 8
|
12 |
|
13 | // If you're not so keen on mutating things, why not try piping into a piping hot fun stream?
|
14 | process.stdin.pipe(fun()).map(str => transformStr(str)).pipe(process.stdout)
|
15 |
|
16 | // You'll also transparently use async functions, so like if `transformStr` is async then you can use:
|
17 | // `.map(async str => transformStr(str))`
|
18 | // Just make sure you don't forget the async keyword. Alternatively, you
|
19 | // can use the async method in the chain:
|
20 | // `.async().map(str => transformStr(str))`
|
21 | ```
|
22 |
|
23 | Funstream makes object streams better.
|
24 |
|
25 | ## Funstream constructors
|
26 |
|
27 | ### fun(stream[, opts]) → FunStream
|
28 |
|
29 | This is probably what you want.
|
30 |
|
31 | Makes an existing stream a funstream! Has the advantage over `fun()` of
|
32 | handling error propagation for you.
|
33 |
|
34 | `opts` is an optional options object. The only option currently is `async`
|
35 | which let's you explicitly tell Funstream if your callbacks are sync or
|
36 | async. If you don't include this we'll detect which you're using by looking
|
37 | at the number of arguments your callback takes. Because promises and sync functions
|
38 | take the same number of arguments, if you're using promise returning callcks you'll need to
|
39 | explicitly pass in `async: true`.
|
40 |
|
41 | ### fun(array[,opts]) → FunStream
|
42 |
|
43 | Returns a funstream that will receive entries from the array one at a time
|
44 | while respecting back pressure.
|
45 |
|
46 | ### fun([opts]) → FunStream
|
47 |
|
48 | Make a passthrough Funstream. You can pipe into this to get access to our
|
49 | handy methods.
|
50 |
|
51 | ## Funstream and Pipelines
|
52 |
|
53 | Contrary to ordinary, BORING streams, we make sure errors are passed along
|
54 | when we chain into something. This applies when you `.map` or `.filter` but
|
55 | it ALSO applies when you `.pipe`.
|
56 |
|
57 | ## Funstream methods
|
58 |
|
59 | This is the good stuff. All callbacks can be sync or async. You can
|
60 | indicate this by setting the `async` property on the opts object either when
|
61 | calling the method below or when constructing the objects to start with.
|
62 | Values of the `async` property progogate down the chain, for example:
|
63 |
|
64 | `.map(…, {async: true}).map(…)`
|
65 |
|
66 | The second map callback will also be assume do to be async.
|
67 |
|
68 | Multiple sync functions of the same time will be automatically aggregated
|
69 | without constructing additional streams, so:
|
70 |
|
71 | `.filter(n => n < 23).filter(n => n > 5)`
|
72 |
|
73 | The second `filter` call actually returns the same stream object. This does
|
74 | mean that if you try to fork the streams inbetween it won't work. Sorry.
|
75 |
|
76 | ### .pipe(target[, opts]) → FunStream(target)
|
77 |
|
78 | Like an ordinary pipe, but funerer. In addition mutating the target into a
|
79 | funstream we also forward errors to it.
|
80 |
|
81 | ### .filter(filterWith[, opts]) → FunStream
|
82 |
|
83 | Filter the stream!
|
84 |
|
85 | * `filterWith(data) → Boolean` (can throw)
|
86 | * `filterWith(data, cb)` (and `cb(err, shouldInclude)`)
|
87 | * `filterWith(data) → Promise(Boolean)
|
88 |
|
89 | If `filterWith` returns true, we include the value in the output stream,
|
90 | otherwise not.
|
91 |
|
92 | ### .map(mapWith[, opts]) → FunStream
|
93 |
|
94 | Transform the stream!
|
95 |
|
96 | * `mapWith(data) → newData` (can throw)
|
97 | * `mapWith(data, cb)` (and `cb(err, newData)`)
|
98 | * `mapWith(data) → Promise(newData)
|
99 |
|
100 | `data` is replaced with `newData` from `mapWith` in the output stream.
|
101 |
|
102 | ### .reduce(reduceWith[, initial[, opts]]) → Promise
|
103 |
|
104 | Promise the result of computing everything.
|
105 |
|
106 | * `reduceWith(a, b) → newData` (can throw)
|
107 | * `reduceWith(a, b, cb)` (and `cb(err, newData)`)
|
108 | * `reduceWith(a, b) → Promise(newData)
|
109 |
|
110 | Concat a stream:
|
111 | ```
|
112 | fun(stream)
|
113 | .reduce((a, b) => a + b)
|
114 | .then(wholeThing => { … })
|
115 | ```
|
116 |
|
117 | ### .forEach(consumeWith[, opts]) → Promise
|
118 |
|
119 | Run some code for every chunk, promise that the stream is done.
|
120 |
|
121 | Example, print each line:
|
122 | ```
|
123 | fun(stream)
|
124 | .forEach(chunk => console.log(chunk)
|
125 | .then(() => console.log('Done!'))
|
126 | ``` |
\ | No newline at end of file |