// Source: UNPKG file view (7.17 kB, JavaScript)
1var tape = require('tape')
2var through = require('through2')
3var concat = require('concat-stream')
4var stream = require('readable-stream')
5var net = require('net')
6var duplexify = require('./')
7
// Shared 'hello world' payload as a Buffer. Buffer.from is used when it exists
// and is not the very same function as Uint8Array.from; otherwise the legacy
// Buffer constructor is used instead.
var HELLO_WORLD
if (Buffer.from && Buffer.from !== Uint8Array.from) {
  HELLO_WORLD = Buffer.from('hello world')
} else {
  HELLO_WORLD = new Buffer('hello world')
}
11
// One through stream serves as both the writable and the readable half:
// whatever is written to the duplex must come back out unchanged, and the
// duplex must emit 'finish'.
tape('passthrough', function(t) {
  t.plan(2)

  var echo = through()
  var duplex = duplexify(echo, echo)

  function onFinish() {
    t.ok(true, 'should finish')
  }

  function onCollected(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }

  duplex.end('hello world')
  duplex.on('finish', onFinish)
  duplex.pipe(concat(onCollected))
})
26
// Same as the plain passthrough case, but end() is called twice. The plan of
// two assertions means 'finish' may be reported only once and the payload
// must still arrive intact.
tape('passthrough + double end', function(t) {
  t.plan(2)

  var echo = through()
  var duplex = duplexify(echo, echo)

  function onFinish() {
    t.ok(true, 'should finish')
  }

  function onCollected(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }

  duplex.end('hello world')
  duplex.end()

  duplex.on('finish', onFinish)
  duplex.pipe(concat(onCollected))
})
43
// The underlying object-mode transform (highWaterMark 1) hands each chunk on
// only after a 100 ms timer, so writes complete asynchronously. Both chunks
// must still be delivered, in order, and the duplex must finish.
tape('async passthrough + end', function(t) {
  t.plan(2)

  function delayedForward(data, enc, cb) {
    setTimeout(function() {
      cb(null, data)
    }, 100)
  }

  var slowEcho = through.obj({highWaterMark:1}, delayedForward)
  var duplex = duplexify(slowEcho, slowEcho)

  duplex.write('hello ')
  duplex.write('world')
  duplex.end()

  duplex.on('finish', function() {
    t.ok(true, 'should finish')
  })

  duplex.pipe(concat(function(collected) {
    t.same(collected.toString(), 'hello world', 'same in as out')
  }))
})
66
// Separate writable and readable halves in object mode: values written to the
// duplex must reach the writable side in order, and values pushed into the
// readable side must surface as 'data' events in order. One assertion per
// value, plus one each for 'end' and 'finish'.
tape('duplex', function(t) {
  var expectedReads = ['read-a', 'read-b', 'read-c']
  var expectedWrites = ['write-a', 'write-b', 'write-c']

  t.plan(expectedReads.length + expectedWrites.length + 2)

  var readSide = through.obj()
  var writeSide = through.obj(function(data, enc, cb) {
    t.same(data, expectedWrites.shift(), 'onwrite should match')
    cb()
  })

  var duplex = duplexify.obj(writeSide, readSide)

  // Iterate over copies: the handlers shift() entries off the expectation
  // arrays while the values are being fed in.
  var readsToFeed = expectedReads.slice()
  for (var r = 0; r < readsToFeed.length; r++) {
    readSide.write(readsToFeed[r])
  }
  readSide.end()

  var writesToFeed = expectedWrites.slice()
  for (var w = 0; w < writesToFeed.length; w++) {
    duplex.write(writesToFeed[w])
  }
  duplex.end()

  duplex.on('data', function(data) {
    t.same(data, expectedReads.shift(), 'ondata should match')
  })
  duplex.on('end', function() {
    t.ok(true, 'should end')
  })
  duplex.on('finish', function() {
    t.ok(true, 'should finish')
  })
})
101
// The duplex is created with no underlying streams. Data is written and the
// duplex is ended first; the writable half is attached after 50 ms and the
// readable half a further 50 ms later. Everything written up front must
// still come out the other end.
tape('async', function(t) {
  var duplex = duplexify()
  var lateStream = through()

  function onCollected(data) {
    t.same(data.toString(), 'i was async', 'same in as out')
    t.end()
  }

  function attachReadable() {
    duplex.setReadable(lateStream)
  }

  function attachWritable() {
    duplex.setWritable(lateStream)
    setTimeout(attachReadable, 50)
  }

  duplex.pipe(concat(onCollected))

  duplex.write('i')
  duplex.write(' was ')
  duplex.end('async')

  setTimeout(attachWritable, 50)
})
122
// destroy() must call destroy on the writable half and emit 'close'. With a
// plan of two assertions, the second destroy() and the trailing end() must
// not trigger either one again.
tape('destroy', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var duplex = duplexify(writeSide, readSide)

  // Replace destroy on the writable half with a spy.
  writeSide.destroy = function() {
    t.ok(true, 'write destroyed')
  }

  duplex.on('close', function() {
    t.ok(true, 'close emitted')
  })

  duplex.destroy()
  duplex.destroy() // should only work once
  duplex.end()
})
142
// destroy() must reach both underlying streams and emit 'close'; the plan of
// three assertions means each of those happens exactly once even though
// destroy() is called twice.
tape('destroy both', function(t) {
  t.plan(3)

  var writeSide = through()
  var readSide = through()
  var duplex = duplexify(writeSide, readSide)

  // Replace destroy on each half with a spy.
  writeSide.destroy = function() {
    t.ok(true, 'write destroyed')
  }
  readSide.destroy = function() {
    t.ok(true, 'read destroyed')
  }

  duplex.on('close', function() {
    t.ok(true, 'close emitted')
  })

  duplex.destroy()
  duplex.destroy() // should only work once
})
165
// An 'error' emitted by the readable half must surface on the duplex,
// followed by 'close'. The write-side error emitted afterwards must not be
// reported: the plan allows one error assertion and one close assertion.
tape('bubble read errors', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var duplex = duplexify(writeSide, readSide)

  function onError(err) {
    t.same(err.message, 'read-error', 'received read error')
  }

  function onClose() {
    t.ok(true, 'close emitted')
  }

  duplex.on('error', onError)
  duplex.on('close', onClose)

  readSide.emit('error', new Error('read-error'))
  writeSide.emit('error', new Error('write-error')) // only emit first error
})
183
// Mirror of the read-error case: an 'error' emitted by the writable half must
// surface on the duplex, followed by 'close', and the later read-side error
// must not be reported.
tape('bubble write errors', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var duplex = duplexify(writeSide, readSide)

  function onError(err) {
    t.same(err.message, 'write-error', 'received write error')
  }

  function onClose() {
    t.ok(true, 'close emitted')
  }

  duplex.on('error', onError)
  duplex.on('close', onClose)

  writeSide.emit('error', new Error('write-error'))
  readSide.emit('error', new Error('read-error')) // only emit first error
})
201
// The underlying writable fails every write by passing an error to its
// callback. That error must be emitted on the duplex, and it must arrive
// before 'close'.
tape('bubble errors from write()', function(t) {
  t.plan(3)

  var sawError = false

  var failingSink = new stream.Writable({
    write: function(chunk, enc, next) {
      next(new Error('write-error'))
    }
  })
  var duplex = duplexify(failingSink)

  duplex.on('error', function(err) {
    sawError = true
    t.same(err.message, 'write-error', 'received write error')
  })

  duplex.on('close', function() {
    t.pass('close emitted')
    t.ok(sawError, 'error was emitted before close')
  })

  duplex.end('123')
})
222
// The underlying writable has highWaterMark 0 and a write implementation that
// never invokes its callback, so the written chunk is never acknowledged.
// Destroying the duplex with an error in that state must emit the error and
// then 'close'.
tape('destroy while waiting for drain', function(t) {
  t.plan(3)

  var sawError = false

  var stalledSink = new stream.Writable({
    highWaterMark: 0,
    write: function() {}
  })
  var duplex = duplexify(stalledSink)

  duplex.on('error', function(err) {
    sawError = true
    t.same(err.message, 'destroy-error', 'received destroy error')
  })

  duplex.on('close', function() {
    t.pass('close emitted')
    t.ok(sawError, 'error was emitted before close')
  })

  duplex.write('123')
  duplex.destroy(new Error('destroy-error'))
})
243
// Swaps the underlying streams while the duplex is in use. The first chunk
// goes through a plain passthrough; both halves are then replaced by an
// upper-casing transform, and the next two chunks must arrive upper-cased.
tape('reset writable / readable', function(t) {
  t.plan(3)

  function toUpperCase(data, enc, cb) {
    cb(null, data.toString().toUpperCase())
  }

  var plain = through()
  var shouting = through(toUpperCase)
  var duplex = duplexify(plain, plain)

  // Third chunk: second value written after the swap.
  function onThirdChunk(data) {
    t.same(data.toString(), 'HI')
    t.end()
  }

  // Second chunk: first value written after the swap.
  function onSecondChunk(data) {
    t.same(data.toString(), 'HELLO')
    duplex.once('data', onThirdChunk)
  }

  // First chunk: arrives via the passthrough, then both halves are replaced.
  function onFirstChunk(data) {
    t.same(data.toString(), 'hello')
    duplex.setWritable(shouting)
    duplex.setReadable(shouting)
    duplex.once('data', onSecondChunk)
    duplex.write('hello')
    duplex.write('hi')
  }

  duplex.once('data', onFirstChunk)
  duplex.write('hello')
})
271
// Corking the duplex from inside its 'prefinish' handler must hold back
// 'finish' until uncork() is called 100 ms later; the flag records that the
// uncork happened before 'finish' fired.
tape('cork', function(t) {
  var echo = through()
  var duplex = duplexify(echo, echo)
  var uncorked = false

  function release() {
    uncorked = true
    duplex.uncork()
  }

  duplex.on('prefinish', function() {
    duplex.cork()
    setTimeout(release, 100)
  })

  duplex.on('finish', function() {
    t.ok(uncorked)
    t.end()
  })

  duplex.end()
})
290
// 'prefinish' must fire at most once per duplex: the handler asserts that it
// has not run before, then records that it has.
tape('prefinish not twice', function(t) {
  var echo = through()
  var duplex = duplexify(echo, echo)
  var seenPrefinish = false

  function onPrefinish() {
    t.ok(!seenPrefinish, 'only prefinish once')
    seenPrefinish = true
  }

  function onFinish() {
    t.end()
  }

  duplex.on('prefinish', onPrefinish)
  duplex.on('finish', onFinish)

  duplex.end()
})
307
// A 'close' event on the underlying stream must be forwarded by the duplex.
tape('close', function(t) {
  var passthrough = through()
  var dup = duplexify(passthrough, passthrough)

  // Attach the listener BEFORE triggering the event. Listeners added after an
  // emit never see that emit, so registering afterwards would make the test
  // depend on the wrapper forwarding 'close' at some later point.
  dup.on('close', function() {
    t.ok(true, 'should forward close')
    t.end()
  })

  passthrough.emit('close')
})
318
// Round-trips HELLO_WORLD over a real TCP connection. Both the client socket
// and the accepted server-side socket are wrapped by duplexify, and the first
// chunk the server side receives must equal the payload.
tape('works with node native streams (net)', function(t) {
  t.plan(1)

  // Server side: wrap the accepted socket, check the first chunk, shut down.
  function onConnection(socket) {
    var serverDup = duplexify(socket, socket)

    serverDup.once('data', function(chunk) {
      t.same(chunk, HELLO_WORLD)
      server.close()
      socket.end()
      t.end()
    })
  }

  // Client side: connect to the port the server reports and send the payload.
  function onListening() {
    var clientSocket = net.connect(server.address().port)
    var clientDup = duplexify(clientSocket, clientSocket)

    clientDup.write(HELLO_WORLD)
  }

  var server = net.createServer(onConnection)

  // Listen on port 0; the actual port is read back from server.address().
  server.listen(0, onListening)
})