UNPKG

2.58 kBJavaScriptView Raw
1import EE from 'events';
2import {inherits} from 'util';
3
4import {Duplex} from './duplex.js';
5import {Readable} from './readable.js';
6import {Writable} from './writable.js';
7import {Transform} from './transform.js';
8import {PassThrough} from './passthrough.js';
9inherits(Stream, EE);
10Stream.Readable = Readable;
11Stream.Writable = Writable;
12Stream.Duplex = Duplex;
13Stream.Transform = Transform;
14Stream.PassThrough = PassThrough;
15
16// Backwards-compat with node 0.4.x
17Stream.Stream = Stream;
18
19export default Stream;
20export {Readable,Writable,Duplex,Transform,PassThrough,Stream}
21
22// old-style streams. Note that the pipe method (the only relevant
23// part of this class) is overridden in the Readable class.
24
25function Stream() {
26 EE.call(this);
27}
28
29Stream.prototype.pipe = function(dest, options) {
30 var source = this;
31
32 function ondata(chunk) {
33 if (dest.writable) {
34 if (false === dest.write(chunk) && source.pause) {
35 source.pause();
36 }
37 }
38 }
39
40 source.on('data', ondata);
41
42 function ondrain() {
43 if (source.readable && source.resume) {
44 source.resume();
45 }
46 }
47
48 dest.on('drain', ondrain);
49
50 // If the 'end' option is not supplied, dest.end() will be called when
51 // source gets the 'end' or 'close' events. Only dest.end() once.
52 if (!dest._isStdio && (!options || options.end !== false)) {
53 source.on('end', onend);
54 source.on('close', onclose);
55 }
56
57 var didOnEnd = false;
58 function onend() {
59 if (didOnEnd) return;
60 didOnEnd = true;
61
62 dest.end();
63 }
64
65
66 function onclose() {
67 if (didOnEnd) return;
68 didOnEnd = true;
69
70 if (typeof dest.destroy === 'function') dest.destroy();
71 }
72
73 // don't leave dangling pipes when there are errors.
74 function onerror(er) {
75 cleanup();
76 if (EE.listenerCount(this, 'error') === 0) {
77 throw er; // Unhandled stream error in pipe.
78 }
79 }
80
81 source.on('error', onerror);
82 dest.on('error', onerror);
83
84 // remove all the event listeners that were added.
85 function cleanup() {
86 source.removeListener('data', ondata);
87 dest.removeListener('drain', ondrain);
88
89 source.removeListener('end', onend);
90 source.removeListener('close', onclose);
91
92 source.removeListener('error', onerror);
93 dest.removeListener('error', onerror);
94
95 source.removeListener('end', cleanup);
96 source.removeListener('close', cleanup);
97
98 dest.removeListener('close', cleanup);
99 }
100
101 source.on('end', cleanup);
102 source.on('close', cleanup);
103
104 dest.on('close', cleanup);
105
106 dest.emit('pipe', source);
107
108 // Allow for unix-like usage: A.pipe(B).pipe(C)
109 return dest;
110};