1 | 'use strict'
|
2 | module.exports = fun
|
3 |
|
4 |
|
// Expose the promise implementation used by reduce()/forEach(): bluebird when
// it can be required, the built-in Promise otherwise.
module.exports.Promise = (function pickPromiseImplementation () {
  try {
    return require('bluebird')
  } catch (ex) {
    return Promise
  }
})()
|
10 |
|
11 | const MiniSyncSink = require('./mini-sync-sink')
|
12 | const Transform = require('stream').Transform
|
13 | const Writable = require('stream').Writable
|
14 | const PassThrough = require('stream').PassThrough
|
15 |
|
/**
 * Module entry point: turn the argument into a fun stream.
 *
 * - `null`/`undefined`: a new, empty FunStream.
 * - an array: handed to `funary`.
 * - an object with a `pause` property: handed to `funify`.
 * - an object with a `write` property (and no `pause`): returned as-is.
 * - any other object, when `opts` is not given: used as the options of a new
 *   FunStream.
 *
 * @param {*} [stream] value to convert (see above)
 * @param {Object} [opts] options forwarded to `funary`/`funify`
 * @returns {*} the resulting stream
 * @throws {Error} when the arguments match none of the accepted shapes
 */
function fun (stream, opts) {
  if (stream == null) return new FunStream()
  if (Array.isArray(stream)) return funary(stream, opts)

  const isObject = typeof stream === 'object'
  if (isObject && 'pause' in stream) return funify(stream, opts)
  if (isObject && 'write' in stream) return stream
  if (isObject && opts == null) return new FunStream(stream)

  // Report the type of every argument actually passed, not just the named ones.
  const received = Array.prototype.map.call(arguments, arg => typeof arg).join(', ')
  throw new Error(`funstream invalid arguments, expected: fun([stream | array], [opts]), got: fun(${received})`)
}
|
36 |
|
/**
 * Decorate an existing stream in place with the FunStream helper methods.
 *
 * A stream that is already a FunStream, or already carries `isFun`, is
 * returned untouched. Otherwise the stream is flagged with `isFun`, gets
 * `opts`, the `filter`/`map`/`reduce`/`forEach`/`sync`/`async` methods of
 * `FunStream.prototype`, and a wrapped `pipe` that forwards 'error' events to
 * the destination and funifies whatever the original `pipe` returns.
 *
 * @param {Object} stream stream (or stream prototype) to decorate; mutated
 * @param {Object} [opts] options stored on the stream as `stream.opts`
 * @returns {Object} the same `stream` object
 */
function funify (stream, opts) {
  if (stream instanceof FunStream || stream.isFun) return stream
  stream.isFun = true
  stream.opts = opts || {}

  // `stream.async` is the chainable method. The async *flag* lives in
  // `stream.opts.async`; the previous boolean assignment to `stream.async`
  // was overwritten right away and has been dropped.
  stream.filter = FunStream.prototype.filter
  stream.map = FunStream.prototype.map
  stream.reduce = FunStream.prototype.reduce
  stream.forEach = FunStream.prototype.forEach
  stream.sync = FunStream.prototype.sync
  stream.async = FunStream.prototype.async

  const originalPipe = stream.pipe
  stream.pipe = function (into, opts) {
    // Re-emit errors on the destination, passing along the stream that
    // originally failed (or this one when none was reported).
    this.on('error', (err, stream) => into.emit('error', err, stream || this))
    return funify(originalPipe.call(this, into, opts), this.opts)
  }
  return stream
}
|
57 |
|
/**
 * Build a FunStream that emits the elements of `array` in order.
 *
 * Writing starts on the next `setImmediate` tick. Whenever `write` reports
 * backpressure the pump waits for 'drain' before continuing, and the stream
 * is ended once every element has been written.
 *
 * @param {Array} array values to emit
 * @param {Object} [opts] options passed to the FunStream constructor
 * @returns {FunStream} the stream the array is written to
 */
function funary (array, opts) {
  const stream = new FunStream(opts)
  let cursor = 0

  const pump = () => {
    for (; cursor < array.length;) {
      const accepted = stream.write(array[cursor++])
      if (!accepted) {
        // Resume from the current cursor once the buffer has emptied.
        stream.once('drain', pump)
        return
      }
    }
    stream.end()
  }

  setImmediate(pump)
  return stream
}
|
74 |
|
/**
 * Combine a stream's stored options with per-call overrides.
 *
 * Kept as a free function (not a method) because the FunStream methods are
 * also copied onto foreign streams by `funify`, where only `this.opts` is
 * guaranteed to exist.
 *
 * @param {Object} base the stream's own options
 * @param {Object} [extra] per-call overrides
 * @returns {Object} `base` itself when there are no overrides, otherwise a
 *   new object; neither argument is ever mutated
 */
function mergeStreamOpts (base, extra) {
  return extra ? Object.assign({}, base, extra) : base
}

/**
 * Object-mode PassThrough with chainable filter/map/reduce/forEach helpers.
 */
class FunStream extends PassThrough {
  /**
   * @param {Object} [opts] stream options; `objectMode` defaults to true.
   *   The same object is kept as `this.opts`.
   */
  constructor (opts) {
    super(Object.assign({objectMode: true}, opts))
    this.opts = opts || {}
  }

  /** Set `opts.async` to true. @returns {this} */
  async () {
    this.opts.async = true
    return this
  }

  /** Set `opts.async` to false. @returns {this} */
  sync () {
    this.opts.async = false
    return this
  }

  /**
   * Pipe into `into`, forwarding this stream's 'error' events to it.
   *
   * @param {Object} into destination stream
   * @param {Object} [opts] options for the underlying `pipe`
   * @returns {Object} the destination, passed through `funify`
   */
  pipe (into, opts) {
    this.on('error', (err, stream) => into.emit('error', err, stream || this))
    return funify(super.pipe(into, opts), this.opts)
  }

  /**
   * Pipe into a filter stream built from `filterWith`.
   *
   * @param {Function} filterWith predicate
   * @param {Object} [opts] per-call options, merged over `this.opts` without
   *   modifying it
   * @returns {Object} the piped filter stream
   */
  filter (filterWith, opts) {
    const filter = FilterStream(filterWith, mergeStreamOpts(this.opts, opts))
    return this.pipe(filter)
  }

  /**
   * Pipe into a map stream built from `mapWith`.
   *
   * @param {Function} mapWith mapping function
   * @param {Object} [opts] per-call options, merged over `this.opts` without
   *   modifying it
   * @returns {Object} the piped map stream
   */
  map (mapWith, opts) {
    const map = MapStream(mapWith, mergeStreamOpts(this.opts, opts))
    return this.pipe(map)
  }

  /**
   * Pipe into a reduce stream and expose its outcome as a promise.
   *
   * @param {Function} reduceWith reducer
   * @param {*} [initial] initial accumulator
   * @param {Object} [opts] per-call options, merged over `this.opts` without
   *   modifying it
   * @returns {Promise} settles with the reduce stream's 'result' event, or
   *   rejects with its first 'error'
   */
  reduce (reduceWith, initial, opts) {
    return new module.exports.Promise((resolve, reject) => {
      const reduce = this.pipe(ReduceStream(reduceWith, initial, mergeStreamOpts(this.opts, opts)))
      reduce.once('error', reject)
      reduce.once('result', resolve)
    })
  }

  /**
   * Pipe into a consuming sink and expose completion as a promise.
   *
   * @param {Function} consumeWith called for each item
   * @param {Object} [opts] per-call options, merged over `this.opts` without
   *   modifying it
   * @returns {Promise} resolves on the sink's 'finish', rejects on its first
   *   'error'
   */
  forEach (consumeWith, opts) {
    return new module.exports.Promise((resolve, reject) => {
      const consume = this.pipe(ConsumeStream(consumeWith, mergeStreamOpts(this.opts, opts)))
      consume.once('error', reject)
      consume.once('finish', resolve)
    })
  }
}
|
115 | module.exports.FunStream = FunStream
|
116 |
|
/**
 * Object-mode Transform base class for the filter/map stages. The options it
 * is given are only stored on `this.opts`; the underlying Transform is always
 * created with `{objectMode: true}`.
 */
class FunTransform extends Transform {
  /**
   * @param {Object} [opts] options kept on `this.opts` (empty object if falsy)
   */
  constructor (opts) {
    const streamOptions = {objectMode: true}
    super(streamOptions)
    this.opts = opts ? opts : {}
  }
}
// Decorate the prototype once so that every subclass instance inherits the
// fun helpers and the wrapped pipe.
funify(FunTransform.prototype)
|
124 |
|
/**
 * Decide whether a user callback should be run through the async code path.
 *
 * Precedence: a native `async function` is always async; otherwise an
 * explicit `opts.async` (anything but null/undefined) is returned as given;
 * otherwise the callback counts as async when it declares more parameters
 * than `args`.
 *
 * @param {Function} fun the user callback
 * @param {number} args number of parameters the sync form of the callback takes
 * @param {Object} [opts] options; only `async` is read
 * @returns {*} `true`, the value of `opts.async`, or the arity comparison
 */
function isAsync (fun, args, opts) {
  const declaredAsync = fun.constructor.name === 'AsyncFunction'
  if (declaredAsync) return true

  const hasExplicitMode = Boolean(opts) && opts.async != null
  return hasExplicitMode ? opts.async : fun.length > args
}
|
130 |
|
/**
 * Factory for a filter stage: FilterStreamAsync when `isAsync` says the
 * predicate is asynchronous, FilterStreamSync otherwise.
 *
 * @param {Function} filterWith predicate
 * @param {Object} [opts] options handed to `isAsync` and the stage constructor
 * @returns {FilterStreamAsync|FilterStreamSync} the new stage
 */
function FilterStream (filterWith, opts) {
  const Stage = isAsync(filterWith, 1, opts) ? FilterStreamAsync : FilterStreamSync
  return new Stage(filterWith, opts)
}
|
138 |
|
/**
 * Filter stage for asynchronous predicates. Each predicate is called with
 * `(data, callback)` and may answer through the node-style callback or by
 * returning a thenable. Consecutive async filters are folded into this one
 * stage instead of adding further streams.
 */
class FilterStreamAsync extends FunTransform {
  /**
   * @param {Function} filterWith first predicate
   * @param {Object} [opts] options passed to FunTransform
   */
  constructor (filterWith, opts) {
    super(opts)
    this.filters = [filterWith]
  }

  _transform (data, encoding, next) {
    this._runFilters(data, true, 0, next)
  }

  /**
   * Run the predicates one after another, starting at index `nextFilter`.
   *
   * @param {*} data the item under test
   * @param {*} keep verdict of the previous predicate; falsy drops the item
   * @param {number} nextFilter index of the predicate to run next
   * @param {Function} next transform callback, called exactly when the item
   *   has been dropped, pushed, or has failed
   */
  _runFilters (data, keep, nextFilter, next) {
    if (!keep) return next()

    const exhausted = nextFilter >= this.filters.length
    if (exhausted) {
      // Every predicate accepted the item.
      this.push(data)
      return next()
    }

    try {
      const advance = (err, verdict) => {
        if (err) return next(err)
        this._runFilters(data, verdict, nextFilter + 1, next)
      }

      const pending = this.filters[nextFilter](data, advance)
      if (pending && pending.then) {
        return pending.then(verdict => advance(null, verdict), advance)
      }
    } catch (ex) {
      return next(ex)
    }
  }

  /**
   * Add another predicate. Async predicates are appended to this stage; sync
   * ones fall through to the inherited `filter`.
   *
   * @param {Function} filterWith predicate
   * @param {Object} [opts] options handed to `isAsync` / the inherited filter
   * @returns {Object} this stage, or whatever the inherited `filter` returns
   */
  filter (filterWith, opts) {
    if (!isAsync(filterWith, 1, opts)) return super.filter(filterWith, opts)
    this.filters.push(filterWith)
    return this
  }
}
|
177 |
|
/**
 * Filter stage for synchronous predicates. An item is pushed downstream only
 * when every predicate returns a truthy value; consecutive sync filters are
 * folded into this one stage.
 */
class FilterStreamSync extends FunTransform {
  /**
   * @param {Function} filterWith first predicate
   * @param {Object} [opts] options passed to FunTransform
   */
  constructor (filterWith, opts) {
    super(opts)
    this.filters = [filterWith]
  }

  _transform (data, encoding, next) {
    try {
      const count = this.filters.length
      let accepted = true
      // Stop at the first predicate that rejects the item.
      for (let i = 0; accepted && i < count; i++) {
        const accepts = this.filters[i]
        if (!accepts(data)) accepted = false
      }
      if (accepted) this.push(data, encoding)
      next()
    } catch (ex) {
      next(ex)
    }
  }

  /**
   * Add another predicate. Sync predicates are appended to this stage; async
   * ones fall through to the inherited `filter`.
   *
   * @param {Function} filterWith predicate
   * @param {Object} [opts] options handed to `isAsync` / the inherited filter
   * @returns {Object} this stage, or whatever the inherited `filter` returns
   */
  filter (filterWith, opts) {
    if (isAsync(filterWith, 1, opts)) return super.filter(filterWith, opts)
    this.filters.push(filterWith)
    return this
  }
}
|
202 |
|
/**
 * Factory for a map stage: MapStreamAsync when `isAsync` says the mapper is
 * asynchronous, MapStreamSync otherwise.
 *
 * @param {Function} mapWith mapping function
 * @param {Object} [opts] options handed to `isAsync` and the stage constructor
 * @returns {MapStreamAsync|MapStreamSync} the new stage
 */
function MapStream (mapWith, opts) {
  const Stage = isAsync(mapWith, 1, opts) ? MapStreamAsync : MapStreamSync
  return new Stage(mapWith, opts)
}
|
210 |
|
/**
 * Map stage for asynchronous mappers. Each mapper is called with
 * `(data, callback)` and may answer through the node-style callback or by
 * returning a thenable. Consecutive async maps are folded into this one stage.
 */
class MapStreamAsync extends FunTransform {
  /**
   * @param {Function} mapWith first mapper
   * @param {Object} [opts] options passed to FunTransform
   */
  constructor (mapWith, opts) {
    super(opts)
    this.maps = [mapWith]
  }

  _transform (data, encoding, next) {
    this._runMaps(data, 0, next)
  }

  /**
   * Feed `data` through the mappers one after another, starting at index
   * `nextMap`, and push the final value.
   *
   * @param {*} data current value
   * @param {number} nextMap index of the mapper to run next
   * @param {Function} next transform callback
   */
  _runMaps (data, nextMap, next) {
    try {
      const finished = nextMap >= this.maps.length
      if (finished) {
        this.push(data)
        return next()
      }

      const proceed = (err, mapped) => {
        if (err) return next(err)
        this._runMaps(mapped, nextMap + 1, next)
      }

      const pending = this.maps[nextMap](data, proceed)
      if (pending && pending.then) {
        return pending.then(mapped => proceed(null, mapped), proceed)
      }
    } catch (ex) {
      next(ex)
    }
  }

  /**
   * Add another mapper. Async mappers are appended to this stage; sync ones
   * fall through to the inherited `map`.
   *
   * @param {Function} mapWith mapper
   * @param {Object} [opts] options handed to `isAsync` / the inherited map
   * @returns {Object} this stage, or whatever the inherited `map` returns
   */
  map (mapWith, opts) {
    if (!isAsync(mapWith, 1, opts)) return super.map(mapWith, opts)
    this.maps.push(mapWith)
    return this
  }
}
|
248 |
|
/**
 * Map stage for synchronous mappers. Every item is passed through the mappers
 * in the order they were added and the final value is pushed downstream;
 * consecutive sync maps are folded into this one stage.
 */
class MapStreamSync extends FunTransform {
  /**
   * @param {Function} mapWith first mapper
   * @param {Object} [opts] options passed to FunTransform
   */
  constructor (mapWith, opts) {
    super(opts)
    this.maps = [mapWith]
  }

  _transform (data, encoding, next) {
    try {
      const count = this.maps.length
      let value = data
      for (let i = 0; i < count; i++) {
        const mapper = this.maps[i]
        value = mapper(value)
      }
      this.push(value)
      next()
    } catch (ex) {
      next(ex)
    }
  }

  /**
   * Add another mapper. Sync mappers are appended to this stage; async ones
   * fall through to the inherited `map`.
   *
   * @param {Function} mapWith mapper
   * @param {Object} [opts] options handed to `isAsync` / the inherited map
   * @returns {Object} this stage, or whatever the inherited `map` returns
   */
  map (mapWith, opts) {
    if (isAsync(mapWith, 1, opts)) return super.map(mapWith, opts)
    this.maps.push(mapWith)
    return this
  }
}
|
271 |
|
/**
 * Factory for a reduce sink: ReduceStreamAsync when `isAsync` says the
 * reducer (whose sync form takes two parameters) is asynchronous,
 * ReduceStreamSync otherwise. `opts` is only consulted for that decision.
 *
 * @param {Function} reduceWith reducer
 * @param {*} [initial] initial accumulator
 * @param {Object} [opts] options handed to `isAsync`
 * @returns {ReduceStreamAsync|ReduceStreamSync} the new sink
 */
function ReduceStream (reduceWith, initial, opts) {
  const Sink = isAsync(reduceWith, 2, opts) ? ReduceStreamAsync : ReduceStreamSync
  return new Sink(reduceWith, initial)
}
|
/**
 * Writable sink that folds every item into an accumulator using an
 * asynchronous reducer. The reducer is called as
 * `reduceWith(acc, data, callback)` and may answer through the node-style
 * callback or by returning a thenable.
 *
 * Emits 'result' with the final accumulator once the stream has finished,
 * i.e. after every pending write has completed. If a write fails the stream
 * errors instead and no 'result' is emitted.
 */
class ReduceStreamAsync extends Writable {
  /**
   * @param {Function} reduceWith reducer `(acc, data, callback)`
   * @param {*} [initial] initial accumulator; when null/undefined the first
   *   item written becomes the accumulator
   */
  constructor (reduceWith, initial) {
    super({objectMode: true})
    this.reduceWith = reduceWith
    this.acc = initial
    // Report on 'finish' rather than inside end(): end() returns before
    // asynchronous reducers have processed the buffered writes.
    this.once('finish', () => this.emit('result', this.acc))
  }

  _write (data, encoding, next) {
    if (this.acc == null) {
      this.acc = data
      return next()
    }

    const handleResult = (err, value) => {
      if (err) return next(err)
      // Store the reduced value delivered by the callback/promise, not the
      // reducer's own return value.
      this.acc = value
      next()
    }

    try {
      const result = this.reduceWith(this.acc, data, handleResult)
      if (result && typeof result.then === 'function') {
        result.then(value => handleResult(null, value), handleResult)
      }
    } catch (ex) {
      // A reducer that throws synchronously fails the write instead of
      // escaping into whoever called write().
      next(ex)
    }
  }
}
|
303 |
|
/**
 * Synchronous reduce sink built on MiniSyncSink. Every written item is folded
 * into the accumulator with `reduceWith(acc, data)`; `end()` emits 'result'
 * with the final accumulator.
 */
class ReduceStreamSync extends MiniSyncSink {
  /**
   * @param {Function} reduceWith reducer `(acc, data) => newAcc`
   * @param {*} [initial] initial accumulator; when null/undefined the first
   *   item written becomes the accumulator
   */
  constructor (reduceWith, initial) {
    super()
    this.reduceWith = reduceWith
    this.acc = initial
  }

  /**
   * Fold one item into the accumulator.
   *
   * @param {*} data the item
   * @param {*} [encoding] unused
   * @param {Function} [next] called after a successful write
   * @returns {boolean} true on success; false when the reducer threw, in
   *   which case the exception is emitted as an 'error' event
   */
  write (data, encoding, next) {
    if (this.acc == null) {
      this.acc = data
    } else {
      try {
        this.acc = this.reduceWith(this.acc, data)
      } catch (ex) {
        // The event name must be 'error'; emitting the exception itself as
        // the event name reaches no listener and hides the failure.
        this.emit('error', ex)
        return false
      }
    }
    if (next) next()
    return true
  }

  /** End the sink and emit 'result' with the final accumulator. */
  end () {
    super.end()
    this.emit('result', this.acc)
  }
}
|
329 |
|
/**
 * Factory for the sink used by forEach: a ConsumeStreamAsync when `isAsync`
 * says the consumer is asynchronous, otherwise a MiniSyncSink whose `write`
 * option is the consumer itself.
 *
 * @param {Function} consumeWith called for each item
 * @param {Object} [opts] options handed to `isAsync`
 * @returns {ConsumeStreamAsync|MiniSyncSink} the new sink
 */
function ConsumeStream (consumeWith, opts) {
  return isAsync(consumeWith, 1, opts)
    ? new ConsumeStreamAsync({consumeWith: consumeWith})
    : new MiniSyncSink({write: consumeWith})
}
|
337 |
|
/**
 * Writable sink that hands every item to an asynchronous consumer. The
 * consumer is called as `consumeWith(data, callback)` and may signal
 * completion through the node-style callback or by returning a thenable.
 */
class ConsumeStreamAsync extends Writable {
  /**
   * @param {Object} opts
   * @param {Function} opts.consumeWith consumer `(data, callback)`
   */
  constructor (opts) {
    super({objectMode: true})
    this.consumeWith = opts.consumeWith
  }

  _write (data, encoding, next) {
    try {
      const result = this.consumeWith(data, next)
      if (result && typeof result.then === 'function') {
        // The resolved value carries no meaning for a sink; only completion
        // or failure is reported.
        result.then(() => next(), next)
      }
    } catch (ex) {
      // Report a synchronous throw through the write callback, as the
      // transform stages do, instead of letting it escape from write().
      next(ex)
    }
  }
}
|