UNPKG

4.57 kBJavaScriptView Raw
1var tape = require('tape')
2var through = require('through2')
3var pumpify = require('./')
4var stream = require('stream')
5var duplexify = require('duplexify')
6
7tape('basic', function(t) {
8 t.plan(3)
9
10 var pipeline = pumpify(
11 through(function(data, enc, cb) {
12 t.same(data.toString(), 'hello')
13 cb(null, data.toString().toUpperCase())
14 }),
15 through(function(data, enc, cb) {
16 t.same(data.toString(), 'HELLO')
17 cb(null, data.toString().toLowerCase())
18 })
19 )
20
21 pipeline.write('hello')
22 pipeline.on('data', function(data) {
23 t.same(data.toString(), 'hello')
24 t.end()
25 })
26})
27
28tape('3 times', function(t) {
29 t.plan(4)
30
31 var pipeline = pumpify(
32 through(function(data, enc, cb) {
33 t.same(data.toString(), 'hello')
34 cb(null, data.toString().toUpperCase())
35 }),
36 through(function(data, enc, cb) {
37 t.same(data.toString(), 'HELLO')
38 cb(null, data.toString().toLowerCase())
39 }),
40 through(function(data, enc, cb) {
41 t.same(data.toString(), 'hello')
42 cb(null, data.toString().toUpperCase())
43 })
44 )
45
46 pipeline.write('hello')
47 pipeline.on('data', function(data) {
48 t.same(data.toString(), 'HELLO')
49 t.end()
50 })
51})
52
53tape('destroy', function(t) {
54 var test = through()
55 test.destroy = function() {
56 t.ok(true)
57 t.end()
58 }
59
60 var pipeline = pumpify(through(), test)
61
62 pipeline.destroy()
63})
64
65tape('close', function(t) {
66 var test = through()
67 var pipeline = pumpify(through(), test)
68
69 pipeline.on('error', function(err) {
70 t.same(err.message, 'lol')
71 t.end()
72 })
73
74 test.emit('error', new Error('lol'))
75})
76
77tape('end waits for last one', function(t) {
78 var ran = false
79
80 var a = through()
81 var b = through()
82 var c = through(function(data, enc, cb) {
83 setTimeout(function() {
84 ran = true
85 cb()
86 }, 100)
87 })
88
89 var pipeline = pumpify(a, b, c)
90
91 pipeline.write('foo')
92 pipeline.end(function() {
93 t.ok(ran)
94 t.end()
95 })
96
97 t.ok(!ran)
98})
99
100tape('always wait for finish', function(t) {
101 var a = new stream.Readable()
102 a._read = function() {}
103 a.push('hello')
104
105 var pipeline = pumpify(a, through(), through())
106 var ran = false
107
108 pipeline.on('finish', function() {
109 t.ok(ran)
110 t.end()
111 })
112
113 setTimeout(function() {
114 ran = true
115 a.push(null)
116 }, 100)
117})
118
119tape('async', function(t) {
120 var pipeline = pumpify()
121
122 t.plan(4)
123
124 pipeline.write('hello')
125 pipeline.on('data', function(data) {
126 t.same(data.toString(), 'HELLO')
127 t.end()
128 })
129
130 setTimeout(function() {
131 pipeline.setPipeline(
132 through(function(data, enc, cb) {
133 t.same(data.toString(), 'hello')
134 cb(null, data.toString().toUpperCase())
135 }),
136 through(function(data, enc, cb) {
137 t.same(data.toString(), 'HELLO')
138 cb(null, data.toString().toLowerCase())
139 }),
140 through(function(data, enc, cb) {
141 t.same(data.toString(), 'hello')
142 cb(null, data.toString().toUpperCase())
143 })
144 )
145 }, 100)
146})
147
148tape('early destroy', function(t) {
149 var a = through()
150 var b = through()
151 var c = through()
152
153 b.destroy = function() {
154 t.ok(true)
155 t.end()
156 }
157
158 var pipeline = pumpify()
159
160 pipeline.destroy()
161 setTimeout(function() {
162 pipeline.setPipeline(a, b, c)
163 }, 100)
164})
165
166tape('preserves error', function (t) {
167 var a = through()
168 var b = through(function (data, enc, cb) {
169 cb(new Error('stop'))
170 })
171 var c = through()
172 var s = pumpify()
173
174 s.on('error', function (err) {
175 t.same(err.message, 'stop')
176 t.end()
177 })
178
179 s.setPipeline(a, b, c)
180 s.resume()
181 s.write('hi')
182})
183
184tape('preserves error again', function (t) {
185 var ws = new stream.Writable()
186 var rs = new stream.Readable({highWaterMark: 16})
187
188 ws._write = function (data, enc, cb) {
189 cb(null)
190 }
191
192 rs._read = function () {
193 process.nextTick(function () {
194 rs.push('hello world')
195 })
196 }
197
198 var pumpifyErr = pumpify(
199 through(),
200 through(function(chunk, _, cb) {
201 cb(new Error('test'))
202 }),
203 ws
204 )
205
206 rs.pipe(pumpifyErr)
207 .on('error', function (err) {
208 t.ok(err)
209 t.ok(err.message !== 'premature close', 'does not close with premature close')
210 t.end()
211 })
212})
213
214tape('returns error from duplexify', function (t) {
215 var a = through()
216 var b = duplexify()
217 var s = pumpify()
218
219 s.setPipeline(a, b)
220
221 s.on('error', function (err) {
222 t.same(err.message, 'stop')
223 t.end()
224 })
225
226 s.write('data')
227 // Test passes if `.end()` is not called
228 s.end()
229
230 b.setWritable(through())
231
232 setImmediate(function () {
233 b.destroy(new Error('stop'))
234 })
235})