UNPKG

3.21 kBJavaScriptView Raw
1var Stream = require('stream')
2// through
3//
4// a stream that does nothing but re-emit the input.
5// useful for aggregating a series of changing but not ending streams into one stream)
6
7exports = module.exports = through
8through.through = through
9
10exports.from = from
11
12//create a readable writable stream.
13
14function through (write, end) {
15 write = write || function (data) { this.emit('data', data) }
16 end = (
17 'sync'== end || !end
18 //use sync end. (default)
19 ? function () { this.emit('end') }
20 : 'async' == end || end === true
21 //use async end.
22 //must eventually call drain if paused.
23 //else will not end.
24 ? function () {
25 if(!this.paused)
26 return this.emit('end')
27 var self = this
28 this.once('drain', function () {
29 self.emit('end')
30 })
31 }
32 //use custom end function
33 : end
34 )
35 var ended = false, destroyed = false
36 var stream = new Stream()
37 stream.readable = stream.writable = true
38 stream.paused = false
39 stream.write = function (data) {
40 write.call(this, data)
41 return !stream.paused
42 }
43 //this will be registered as the first 'end' listener
44 //must call destroy next tick, to make sure we're after any
45 //stream piped from here.
46 stream.on('end', function () {
47 stream.readable = false
48 if(!stream.writable)
49 process.nextTick(function () {
50 stream.destroy()
51 })
52 })
53
54 stream.end = function (data) {
55 if(ended) throw new Error('cannot call end twice')
56 ended = true
57 if(arguments.length) stream.write(data)
58 this.writable = false
59 end.call(this)
60 if(!this.readable)
61 this.destroy()
62 }
63 stream.destroy = function () {
64 if(destroyed) return
65 destroyed = true
66 ended = true
67 stream.writable = stream.readable = false
68 stream.emit('close')
69 }
70 stream.pause = function () {
71 stream.paused = true
72 }
73 stream.resume = function () {
74 if(stream.paused) {
75 stream.paused = false
76 stream.emit('drain')
77 }
78 }
79 return stream
80}
81
82//create a readable stream.
83
84function from (source) {
85 if(Array.isArray(source))
86 return from (function (i) {
87 if(source.length)
88 this.emit('data', source.shift())
89 else
90 this.emit('end')
91 return true
92 })
93
94 var s = new Stream(), i = 0, ended = false, started = false
95 s.readable = true
96 s.writable = false
97 s.paused = false
98 s.pause = function () {
99 started = true
100 s.paused = true
101 }
102 function next () {
103 var n = 0, r = false
104 if(ended) return
105 while(!ended && !s.paused && source.call(s, i, function () {
106 if(!n++ && !s.ended && !s.paused)
107 next()
108 }))
109 ;
110 }
111 s.resume = function () {
112 started = true
113 s.paused = false
114 next()
115 }
116 s.on('end', function () {
117 ended = true
118 s.readable = false
119 process.nextTick(s.destroy)
120 })
121 s.destroy = function () {
122 ended = true
123 s.emit('close')
124 }
125 /*
126 by default, the stream will start emitting at nextTick
127 if you want, you can pause it, after pipeing.
128 you can also resume before next tick, and that will also
129 work.
130 */
131 process.nextTick(function () {
132 if(!started) s.resume()
133 })
134 return s
135}
136
137
138//create a writable stream.
139function to () {
140
141}