UNPKG

8.77 kBJavaScriptView Raw
1'use strict'
2module.exports = fun
3// module.exports.FunStream = FunStream (modern classes, boo)
4
5try {
6 module.exports.Promise = require('bluebird')
7} catch (ex) {
8 module.exports.Promise = Promise
9}
10
11const MiniSyncSink = require('./mini-sync-sink')
12const Transform = require('stream').Transform
13const Writable = require('stream').Writable
14const PassThrough = require('stream').PassThrough
15
16function fun (stream, opts) {
17 if (stream == null) {
18 return new FunStream()
19 }
20 if (Array.isArray(stream)) {
21 return funary(stream, opts)
22 }
23 if (typeof stream === 'object') {
24 if ('pause' in stream) {
25 return funify(stream, opts)
26 }
27 if ('write' in stream) {
28 return stream // write streams can't be fun
29 }
30 if (opts == null) {
31 return new FunStream(stream)
32 }
33 }
34 throw new Error(`funstream invalid arguments, expected: fun([stream | array], [opts]), got: fun(${[].map.call(arguments, arg => typeof arg).join(', ')})`)
35}
36
37function funify (stream, opts) {
38 if (stream instanceof FunStream || stream.isFun) return stream
39 stream.isFun = true
40 stream.opts = opts || {}
41 stream.async = stream.opts.async
42
43 stream.filter = FunStream.prototype.filter
44 stream.map = FunStream.prototype.map
45 stream.reduce = FunStream.prototype.reduce
46 stream.forEach = FunStream.prototype.forEach
47 stream.sync = FunStream.prototype.sync
48 stream.async = FunStream.prototype.async
49
50 const originalPipe = stream.pipe
51 stream.pipe = function (into, opts) {
52 this.on('error', (err, stream) => into.emit('error', err, stream || this))
53 return funify(originalPipe.call(this, into, opts), this.opts)
54 }
55 return stream
56}
57
58function funary (array, opts) {
59 const stream = new FunStream(opts)
60 let index = 0
61 setImmediate(sendArray)
62 return stream
63
64 function sendArray () {
65 while (index < array.length) {
66 if (!stream.write(array[index++])) {
67 stream.once('drain', sendArray)
68 return
69 }
70 }
71 stream.end()
72 }
73}
74
75class FunStream extends PassThrough {
76 constructor (opts) {
77 super(Object.assign({objectMode: true}, opts))
78 this.opts = opts || {}
79 }
80 async () {
81 this.opts.async = true
82 return this
83 }
84 sync () {
85 this.opts.async = false
86 return this
87 }
88 pipe (into, opts) {
89 this.on('error', (err, stream) => into.emit('error', err, stream || this))
90 return funify(super.pipe(into, opts), this.opts)
91 }
92 filter (filterWith, opts) {
93 const filter = FilterStream(filterWith, opts ? Object.assign(this.opts, opts) : this.opts)
94 return this.pipe(filter)
95 }
96 map (mapWith, opts) {
97 const map = MapStream(mapWith, opts ? Object.assign(this.opts, opts) : this.opts)
98 return this.pipe(map)
99 }
100 reduce (reduceWith, initial, opts) {
101 return new module.exports.Promise((resolve, reject) => {
102 const reduce = this.pipe(ReduceStream(reduceWith, initial, opts ? Object.assign(this.opts, opts) : this.opts))
103 reduce.once('error', reject)
104 reduce.once('result', resolve)
105 })
106 }
107 forEach (consumeWith, opts) {
108 return new module.exports.Promise((resolve, reject) => {
109 const consume = this.pipe(ConsumeStream(consumeWith, opts ? Object.assign(this.opts, opts) : this.opts))
110 consume.once('error', reject)
111 consume.once('finish', resolve)
112 })
113 }
114}
115module.exports.FunStream = FunStream
116
117class FunTransform extends Transform {
118 constructor (opts) {
119 super({objectMode: true})
120 this.opts = opts || {}
121 }
122}
123funify(FunTransform.prototype)
124
125function isAsync (fun, args, opts) {
126 if (fun.constructor.name === 'AsyncFunction') return true
127 if (opts && opts.async != null) return opts.async
128 return fun.length > args
129}
130
131function FilterStream (filterWith, opts) {
132 if (isAsync(filterWith, 1, opts)) {
133 return new FilterStreamAsync(filterWith, opts)
134 } else {
135 return new FilterStreamSync(filterWith, opts)
136 }
137}
138
139class FilterStreamAsync extends FunTransform {
140 constructor (filterWith, opts) {
141 super(opts)
142 this.filters = [filterWith]
143 }
144 _transform (data, encoding, next) {
145 this._runFilters(data, true, 0, next)
146 }
147 _runFilters (data, keep, nextFilter, next) {
148 if (!keep) return next()
149 if (nextFilter >= this.filters.length) {
150 this.push(data)
151 return next()
152 }
153 try {
154 const handleResult = (err, keep) => {
155 if (err) {
156 return next(err)
157 } else {
158 this._runFilters(data, keep, nextFilter + 1, next)
159 }
160 }
161
162 const result = this.filters[nextFilter](data, handleResult)
163 if (result && result.then) return result.then(keep => handleResult(null, keep), handleResult)
164 } catch (ex) {
165 return next(ex)
166 }
167 }
168 filter (filterWith, opts) {
169 if (isAsync(filterWith, 1, opts)) {
170 this.filters.push(filterWith)
171 return this
172 } else {
173 return super.filter(filterWith, opts)
174 }
175 }
176}
177
178class FilterStreamSync extends FunTransform {
179 constructor (filterWith, opts) {
180 super(opts)
181 this.filters = [filterWith]
182 }
183 _transform (data, encoding, next) {
184 try {
185 if (this.filters.every(fn => fn(data))) {
186 this.push(data, encoding)
187 }
188 next()
189 } catch (ex) {
190 next(ex)
191 }
192 }
193 filter (filterWith, opts) {
194 if (isAsync(filterWith, 1, opts)) {
195 return super.filter(filterWith, opts)
196 } else {
197 this.filters.push(filterWith)
198 return this
199 }
200 }
201}
202
203function MapStream (mapWith, opts) {
204 if (isAsync(mapWith, 1, opts)) {
205 return new MapStreamAsync(mapWith, opts)
206 } else {
207 return new MapStreamSync(mapWith, opts)
208 }
209}
210
211class MapStreamAsync extends FunTransform {
212 constructor (mapWith, opts) {
213 super(opts)
214 this.maps = [mapWith]
215 }
216 _transform (data, encoding, next) {
217 this._runMaps(data, 0, next)
218 }
219 _runMaps (data, nextMap, next) {
220 try {
221 if (nextMap >= this.maps.length) {
222 this.push(data)
223 return next()
224 }
225 const handleResult = (err, value) => {
226 if (err) {
227 return next(err)
228 } else {
229 this._runMaps(value, nextMap + 1, next)
230 }
231 }
232 const result = this.maps[nextMap](data, handleResult)
233 if (result && result.then) return result.then(keep => handleResult(null, keep), handleResult)
234
235 } catch (ex) {
236 next(ex)
237 }
238 }
239 map (mapWith, opts) {
240 if (isAsync(mapWith, 1, opts)) {
241 this.maps.push(mapWith)
242 return this
243 } else {
244 return super.map(mapWith, opts)
245 }
246 }
247}
248
249class MapStreamSync extends FunTransform {
250 constructor (mapWith, opts) {
251 super(opts)
252 this.maps = [mapWith]
253 }
254 _transform (data, encoding, next) {
255 try {
256 this.push(this.maps.reduce((data, fn) => fn(data), data))
257 next()
258 } catch (ex) {
259 next(ex)
260 }
261 }
262 map (mapWith, opts) {
263 if (isAsync(mapWith, 1, opts)) {
264 return super.map(mapWith, opts)
265 } else {
266 this.maps.push(mapWith)
267 return this
268 }
269 }
270}
271
272function ReduceStream (reduceWith, initial, opts) {
273 if (isAsync(reduceWith, 2, opts)) {
274 return new ReduceStreamAsync(reduceWith, initial)
275 } else {
276 return new ReduceStreamSync(reduceWith, initial)
277 }
278}
279class ReduceStreamAsync extends Writable {
280 constructor (reduceWith, initial) {
281 super({objectMode: true})
282 this.reduceWith = reduceWith
283 this.acc = initial
284 }
285 _write (data, encoding, next) {
286 if (this.acc == null) {
287 this.acc = data
288 next()
289 } else {
290 const handleResult = (err, value) => {
291 this.acc = result
292 next(err)
293 }
294 const result = this.reduceWith(this.acc, data, handleResult)
295 if (result && result.then) return result.then(keep => handleResult(null, keep), handleResult)
296 }
297 }
298 end () {
299 super.end()
300 this.emit('result', this.acc)
301 }
302}
303
304class ReduceStreamSync extends MiniSyncSink {
305 constructor (reduceWith, initial) {
306 super()
307 this.reduceWith = reduceWith
308 this.acc = initial
309 }
310 write (data, encoding, next) {
311 if (this.acc == null) {
312 this.acc = data
313 } else {
314 try {
315 this.acc = this.reduceWith(this.acc, data)
316 } catch (ex) {
317 this.emit(ex)
318 return false
319 }
320 }
321 if (next) next()
322 return true
323 }
324 end () {
325 super.end()
326 this.emit('result', this.acc)
327 }
328}
329
330function ConsumeStream (consumeWith, opts) {
331 if (isAsync(consumeWith, 1, opts)) {
332 return new ConsumeStreamAsync({consumeWith: consumeWith})
333 } else {
334 return new MiniSyncSink({write: consumeWith})
335 }
336}
337
338class ConsumeStreamAsync extends Writable {
339 constructor (opts) {
340 super({objectMode: true})
341 this.consumeWith = opts.consumeWith
342 }
343 _write (data, encoding, next) {
344 const result = this.consumeWith(data, next)
345 if (result && result.then) return result.then(keep => next(null, keep), next)
346 }
347}