UNPKG

1.27 kBJavaScriptView Raw
1'use strict'
2
3var values = require('../sources/values')
4var once = require('../sources/once')
5
6//convert a stream of arrays or streams into just a stream.
7module.exports = function flatten () {
8 return function (read) {
9 var _read
10 return function (abort, cb) {
11 if (abort) { //abort the current stream, and then stream of streams.
12 _read ? _read(abort, function(err) {
13 read(err || abort, cb)
14 }) : read(abort, cb)
15 }
16 else if(_read) nextChunk()
17 else nextStream()
18
19 function nextChunk () {
20 _read(null, function (err, data) {
21 if (err === true) nextStream()
22 else if (err) {
23 read(true, function(abortErr) {
24 // TODO: what do we do with the abortErr?
25 cb(err)
26 })
27 }
28 else cb(null, data)
29 })
30 }
31 function nextStream () {
32 _read = null
33 read(null, function (end, stream) {
34 if(end)
35 return cb(end)
36 if(Array.isArray(stream) || stream && 'object' === typeof stream)
37 stream = values(stream)
38 else if('function' != typeof stream)
39 stream = once(stream)
40 _read = stream
41 nextChunk()
42 })
43 }
44 }
45 }
46}
47