1 | var Stream = require('stream')
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
// Public API.
// The module itself is the `through` factory; it is also reachable as
// `.through` so both `require(...)` and `require(...).through` work.
module.exports = through
exports = module.exports
through.through = through

// `from` is attached as a property of the exported function.
exports.from = from
|
11 |
|
12 |
|
13 |
|
/**
 * Create a readable + writable stream whose behaviour is defined by two
 * small callbacks.
 *
 * @param {Function} [write] Called as `write.call(stream, data)` for every
 *   `stream.write(data)`. Defaults to re-emitting the chunk as a 'data' event.
 * @param {Function|string|boolean} [end] Called as `end.call(stream)` when
 *   `stream.end()` is invoked.
 *   - falsy or 'sync': emit 'end' immediately.
 *   - 'async' or true: emit 'end' immediately unless the stream is paused,
 *     in which case wait for the next 'drain' event.
 *   - anything else is used as the end handler itself.
 * @returns {Stream} The stream, with `write`, `end`, `destroy`, `pause` and
 *   `resume` installed on it.
 * @throws {Error} From `stream.end()` if the stream was already ended or
 *   destroyed ('cannot call end twice').
 */
function through (write, end) {
  write = write || function (data) { this.emit('data', data) }

  if (end === 'sync' || !end) {
    end = function () { this.emit('end') }
  } else if (end === 'async' || end === true) {
    end = function () {
      if (!this.paused) return this.emit('end')
      // Paused: hold back 'end' until the consumer resumes.
      var self = this
      this.once('drain', function () {
        self.emit('end')
      })
    }
  }

  var ended = false
  var destroyed = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // All methods below use the closed-over `stream` rather than `this`, so
  // they keep working when passed around detached (e.g. as event handlers).

  stream.write = function (data) {
    write.call(stream, data)
    // false tells the producer to wait for 'drain'.
    return !stream.paused
  }

  stream.on('end', function () {
    stream.readable = false
    // Once both sides are finished, tear down on the next tick.
    if (!stream.writable) {
      process.nextTick(function () {
        stream.destroy()
      })
    }
  })

  stream.end = function (data) {
    if (ended) throw new Error('cannot call end twice')
    ended = true
    if (arguments.length) stream.write(data)
    stream.writable = false
    end.call(stream)
    // If 'end' was already emitted synchronously, destroy right away.
    if (!stream.readable) stream.destroy()
  }

  stream.destroy = function () {
    if (destroyed) return
    destroyed = true
    ended = true
    stream.writable = stream.readable = false
    stream.emit('close')
  }

  stream.pause = function () {
    stream.paused = true
  }

  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('drain')
    }
  }

  return stream
}
|
81 |
|
82 |
|
83 |
|
/**
 * Create a readable stream from an array or from a generator-style function.
 *
 * @param {Array|Function} source
 *   - Array: each element is emitted as a 'data' event, then 'end'. The
 *     caller's array is copied first and is not modified.
 *   - Function: called as `source.call(stream, index, callback)`, where
 *     `index` counts the calls starting at 0. It should emit 'data' / 'end'
 *     on `this`. Returning a truthy value asks to be called again
 *     synchronously; otherwise the source continues later by invoking
 *     `callback`.
 * @returns {Stream} A readable stream with `pause`, `resume` and `destroy`.
 *   It starts flowing on the next tick unless `pause()` or `resume()` was
 *   called first.
 */
function from (source) {
  if (Array.isArray(source)) {
    // Work on a copy so the caller's array is not drained by shift().
    var items = source.slice()
    return from(function () {
      if (items.length) this.emit('data', items.shift())
      else this.emit('end')
      return true
    })
  }

  var s = new Stream()
  var i = 0
  var ended = false
  var started = false
  s.readable = true
  s.writable = false
  s.paused = false

  s.pause = function () {
    started = true
    s.paused = true
  }

  function next () {
    // Shared by every callback handed out during this run, so that only the
    // first callback invocation restarts the loop.
    var calls = 0
    if (ended) return
    while (!ended && !s.paused && source.call(s, i++, function () {
      if (!calls++ && !ended && !s.paused) next()
    })) {
      // Keep pulling while the source answers synchronously.
    }
  }

  s.resume = function () {
    started = true
    s.paused = false
    next()
  }

  s.on('end', function () {
    ended = true
    s.readable = false
    process.nextTick(s.destroy)
  })

  s.destroy = function () {
    ended = true
    s.emit('close')
  }

  // Auto-start unless the consumer already took control of the flow.
  process.nextTick(function () {
    if (!started) s.resume()
  })

  return s
}
|
136 |
|
137 |
|
138 |
|
// Empty placeholder: takes no arguments, does nothing and returns undefined.
// It is not among the export assignments visible in this file.
// NOTE(review): presumably meant to become the writable counterpart of
// `from` — confirm intent before implementing or removing.
function to () {

}
|