1 | var once = require('once');
|
/**
 * Do-nothing function. Accepts any arguments, ignores them and returns
 * `undefined`.
 */
function noop() {}
|
3 |
|
/**
 * Replaces `stream.end` with a wrapper that first invokes `callback` (with no
 * arguments) and then delegates to the original `end`, forwarding the
 * receiver and every argument.
 *
 * The wrapper does not return the original method's result.
 *
 * @param {Object} stream - Object whose `end` method gets wrapped in place.
 * @param {Function} callback - Invoked right before the original `end` runs.
 */
function patch(stream, callback) {
  var originalEnd = stream.end;

  function patchedEnd() {
    var forwarded = arguments;
    // Notify first, then let the real `end` do its work.
    callback();
    originalEnd.apply(this, forwarded);
  }

  stream.end = patchedEnd;
}
|
11 |
|
/**
 * Watches one stream and reports, at most once, when it is done or has failed.
 *
 * `reading` / `writing` say which sides of the stream are still expected to
 * complete; they are cleared as 'end' / 'finish' are observed. `callback` is
 * wrapped with `once`, so only the first outcome is reported:
 *   - 'error'  -> callback(err)
 *   - 'finish' -> callback() if the readable side is no longer pending
 *   - 'end'    -> callback() if the writable side is no longer pending
 *   - 'close'  -> callback(new Error('stream closed')) if a side was still
 *                 pending when the stream closed
 *
 * @param {Object} stream - Event emitter with `on`, and optionally `destroy`.
 * @param {boolean} reading - Whether the readable side is expected to end.
 * @param {boolean} writing - Whether the writable side is expected to finish.
 * @param {Function} callback - Completion handler, called with an optional error.
 * @returns {Function} Function that destroys the stream unless it already
 *   closed, was already destroyed through this function, has nothing pending,
 *   or has no `destroy` method.
 */
function destroyer(stream, reading, writing, callback) {
  var done = once(callback);
  var isDestroyed = false;
  var isClosed = false;

  function handleFinish() {
    writing = false;
    if (reading) return;
    done();
  }

  function handleEnd() {
    reading = false;
    if (writing) return;
    done();
  }

  function handleClose() {
    isClosed = true;

    // Both sides already completed: a close is expected, nothing to report.
    if (!(reading || writing)) return;

    // The readable side has been flagged as ended by its internal state even
    // though 'end' has not been observed here; do not treat that as a failure.
    if (reading && stream._readableState && stream._readableState.ended) return;

    done(new Error('stream closed'));
  }

  stream.on('error', done);
  stream.on('finish', handleFinish);
  stream.on('end', handleEnd);
  stream.on('close', handleClose);

  // Writable streams that do not expose `_writableState` get their `end`
  // method wrapped so that calling it counts as finishing.
  // NOTE(review): presumably these are old-style streams that never emit
  // 'finish' on their own - confirm.
  if (writing && stream.writable && !stream._writableState) {
    patch(stream, handleFinish);
  }

  return function() {
    var nothingPending = !reading && !writing;
    if (isClosed || isDestroyed || nothingPending || !stream.destroy) return;
    isDestroyed = true;
    stream.destroy();
  };
}
|
47 |
|
/**
 * Invokes `fn` with no arguments and discards its result. Any extra arguments
 * passed to `call` itself (for example by `Array.prototype.forEach`) are not
 * forwarded.
 *
 * @param {Function} fn - Function to invoke.
 */
function call(fn) {
  fn();
}
|
51 |
|
/**
 * Pipes `from` into `to` and hands back whatever `from.pipe` returns, so it
 * can serve as a reducer over a list of streams.
 *
 * @param {Object} from - Source object exposing a `pipe` method.
 * @param {Object} to - Destination passed as the only argument to `from.pipe`.
 * @returns {*} The return value of `from.pipe(to)`.
 */
function pipe(from, to) {
  var piped = from.pipe(to);
  return piped;
}
|
55 |
|
/**
 * Tells whether `fn` can stand in for an optional callback: either it is
 * falsy (treated as "no callback supplied") or it is an actual function.
 *
 * @param {*} fn - Candidate value.
 * @returns {boolean} `true` for any falsy value or any function, else `false`.
 */
function functionish(fn) {
  if (!fn) return true;
  return typeof fn === 'function';
}
|
59 |
|
/**
 * Pipes the given streams together in order and wires up teardown so that a
 * failure in any stream destroys all of them.
 *
 * Accepted call shapes:
 *   pump(a, b, c, callback)
 *   pump(a, b, c)              // callback defaults to a no-op
 *   pump([a, b, c], callback)  // streams supplied as an array
 *
 * A trailing falsy argument is treated as an omitted callback and dropped.
 *
 * `callback` is invoked when the last stream in the chain reports completion
 * or failure; it receives the first error reported by any stream, or
 * `undefined` when none was reported.
 *
 * NOTE(review): when exactly one stream is supplied it is treated as
 * readable-only, and the completion path below returns early for readable
 * streams, so `callback` is never invoked in that case.
 *
 * @returns {*} The result of the last `pipe` call in the chain, or the single
 *   stream itself when only one was supplied.
 * @throws {TypeError} When no streams are supplied.
 */
var pump = function() {
  var streams = Array.prototype.slice.call(arguments);
  var callback = functionish(streams[streams.length - 1]) && streams.pop() || noop;

  if (Array.isArray(streams[0])) streams = streams[0];

  // Fail with a descriptive message instead of the opaque "Reduce of empty
  // array with no initial value" that the reduce below would otherwise raise.
  if (!streams.length) throw new TypeError('pump requires at least one stream');

  var error;
  // Every stream except the last one is expected to end as a readable; a lone
  // stream is still counted as readable.
  var readables = Math.max(streams.length - 1, 1);
  var destroys = streams.map(function(stream, i) {
    var reading = i < readables;
    var writing = i > 0;
    return destroyer(stream, reading, writing, function(err) {
      // Remember only the first error; later ones are consequences of teardown.
      if (!error) error = err;
      if (err) destroys.forEach(call);
      // Only the final (non-readable) stream decides when the chain is done.
      if (reading) return;
      destroys.forEach(call);
      callback(error);
    });
  });

  return streams.reduce(pipe);
};
|
82 |
|
83 | module.exports = pump; |
\ | No newline at end of file |