UNPKG

742 BJavaScriptView Raw
1const duplexer2 = require('duplexer2');
2const Streamz = require('streamz');
3const toStream = require('./tostream');
4
5module.exports = function(fn) {
6 const inStream = Streamz();
7 const outStream = Streamz();
8
9 if (fn.length > 1)
10 fn(inStream,outStream);
11 else
12 toStream(fn(inStream)).pipe(outStream);
13
14 const stream = duplexer2({objectMode: true},inStream,outStream);
15
16 // Mirror error and promise behaviour from streamz
17 stream.on('error',e => {
18 if (stream._events.error.length < 2) {
19 const pipes = stream._readableState.pipes;
20 if (pipes)
21 [].concat(pipes).forEach(child => child.emit('error',e));
22 else
23 throw e;
24 }
25 });
26
27 stream.promise = Streamz.prototype.promise;
28
29 return stream;
30};
\No newline at end of file