1 | var tape = require('tape')
|
2 | var through = require('through2')
|
3 | var concat = require('concat-stream')
|
4 | var stream = require('readable-stream')
|
5 | var net = require('net')
|
6 | var duplexify = require('./')
|
7 |
|
// Buffer fixture used by the net test. Buffer.from is only used when it exists
// and is not the very same function as Uint8Array.from; otherwise the Buffer
// constructor is used instead.
var HELLO_WORLD
if (Buffer.from && Buffer.from !== Uint8Array.from) {
  HELLO_WORLD = Buffer.from('hello world')
} else {
  HELLO_WORLD = new Buffer('hello world')
}
|
11 |
|
// One through stream serves as both the writable and the readable side:
// whatever is written to the duplex should come back out, and 'finish'
// should still fire.
tape('passthrough', function(t) {
  t.plan(2)

  var echo = through()
  var dup = duplexify(echo, echo)

  function onfinish() {
    t.ok(true, 'should finish')
  }

  function oncollected(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }

  dup.end('hello world')
  dup.on('finish', onfinish)
  dup.pipe(concat(oncollected))
})
|
26 |
|
// Same as the plain passthrough test, but end() is called a second time after
// end(data). The plan of 2 leaves no room for a second 'finish' assertion.
tape('passthrough + double end', function(t) {
  t.plan(2)

  var echo = through()
  var dup = duplexify(echo, echo)

  function onfinish() {
    t.ok(true, 'should finish')
  }

  function oncollected(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }

  dup.end('hello world')
  dup.end()

  dup.on('finish', onfinish)
  dup.pipe(concat(oncollected))
})
|
43 |
|
// The inner stream hands every chunk back only after a delay, so end() is
// called while writes are still in flight. Both chunks must still arrive, in
// order, and 'finish' must be emitted.
tape('async passthrough + end', function(t) {
  t.plan(2)

  // Object-mode transform (highWaterMark 1) that echoes each chunk 100ms later.
  function delayedEcho(chunk, enc, done) {
    setTimeout(function() {
      done(null, chunk)
    }, 100)
  }

  var slow = through.obj({highWaterMark:1}, delayedEcho)
  var dup = duplexify(slow, slow)

  dup.write('hello ')
  dup.write('world')
  dup.end()

  dup.on('finish', function() {
    t.ok(true, 'should finish')
  })

  dup.pipe(concat(function(collected) {
    t.same(collected.toString(), 'hello world', 'same in as out')
  }))
})
|
66 |
|
// Independent writable and readable sides in object mode: chunks written to
// the duplex must reach the writable in order, and chunks written to the
// readable must be emitted by the duplex in order.
tape('duplex', function(t) {
  var fromReadable = ['read-a', 'read-b', 'read-c']
  var toWritable = ['write-a', 'write-b', 'write-c']

  // One assertion per chunk in each direction, plus 'end' and 'finish'.
  t.plan(fromReadable.length+toWritable.length+2)

  var source = through.obj()
  var sink = through.obj(function(chunk, enc, next) {
    t.same(chunk, toWritable.shift(), 'onwrite should match')
    next()
  })

  var dup = duplexify.obj(sink, source)

  // Iterate over copies: the expectation arrays themselves are consumed with
  // shift() by the assertions.
  var readChunks = fromReadable.slice()
  for (var i = 0; i < readChunks.length; i++) {
    source.write(readChunks[i])
  }
  source.end()

  var writeChunks = toWritable.slice()
  for (var j = 0; j < writeChunks.length; j++) {
    dup.write(writeChunks[j])
  }
  dup.end()

  function ondata(chunk) {
    t.same(chunk, fromReadable.shift(), 'ondata should match')
  }

  function onend() {
    t.ok(true, 'should end')
  }

  function onfinish() {
    t.ok(true, 'should finish')
  }

  dup.on('data', ondata)
  dup.on('end', onend)
  dup.on('finish', onfinish)
})
|
101 |
|
// The duplex is created with no inner streams. Data is written and the duplex
// is ended first; the writable side is attached 50ms later and the readable
// side another 50ms after that. The collected output must still be complete.
tape('async', function(t) {
  var dup = duplexify()
  var inner = through()

  function attachReadable() {
    dup.setReadable(inner)
  }

  function attachWritable() {
    dup.setWritable(inner)
    setTimeout(attachReadable, 50)
  }

  dup.pipe(concat(function(collected) {
    t.same(collected.toString(), 'i was async', 'same in as out')
    t.end()
  }))

  dup.write('i')
  dup.write(' was ')
  dup.end('async')

  setTimeout(attachWritable, 50)
})
|
122 |
|
// destroy() is called twice and followed by end(). The plan of 2 budgets one
// assertion for the stubbed write.destroy and one for 'close', so the repeated
// destroy() and the trailing end() must not trigger additional ones.
tape('destroy', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var dup = duplexify(writeSide, readSide)

  // Replace destroy on the writable side so the call can be observed.
  writeSide.destroy = function() {
    t.ok(true, 'write destroyed')
  }

  function onclose() {
    t.ok(true, 'close emitted')
  }

  dup.on('close', onclose)

  dup.destroy()
  dup.destroy()
  dup.end()
})
|
142 |
|
// Both inner streams have destroy stubbed. With a plan of 3, two destroy()
// calls on the duplex may yield only one assertion from each stub plus one
// 'close' assertion.
tape('destroy both', function(t) {
  t.plan(3)

  var writeSide = through()
  var readSide = through()
  var dup = duplexify(writeSide, readSide)

  writeSide.destroy = function() {
    t.ok(true, 'write destroyed')
  }

  readSide.destroy = function() {
    t.ok(true, 'read destroyed')
  }

  function onclose() {
    t.ok(true, 'close emitted')
  }

  dup.on('close', onclose)

  dup.destroy()
  dup.destroy()
})
|
165 |
|
// An error emitted on the readable side must surface on the duplex, followed
// by 'close'. The handler asserts the message is 'read-error', so the write
// error emitted afterwards must not reach it.
tape('bubble read errors', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var dup = duplexify(writeSide, readSide)

  function onerror(err) {
    t.same(err.message, 'read-error', 'received read error')
  }

  function onclose() {
    t.ok(true, 'close emitted')
  }

  dup.on('error', onerror)
  dup.on('close', onclose)

  readSide.emit('error', new Error('read-error'))
  writeSide.emit('error', new Error('write-error'))
})
|
183 |
|
// Mirror of the read-error test: the writable side errors first, so the
// duplex must report 'write-error' and then 'close'. The read error emitted
// afterwards must not reach the handler.
tape('bubble write errors', function(t) {
  t.plan(2)

  var writeSide = through()
  var readSide = through()
  var dup = duplexify(writeSide, readSide)

  function onerror(err) {
    t.same(err.message, 'write-error', 'received write error')
  }

  function onclose() {
    t.ok(true, 'close emitted')
  }

  dup.on('error', onerror)
  dup.on('close', onclose)

  writeSide.emit('error', new Error('write-error'))
  readSide.emit('error', new Error('read-error'))
})
|
201 |
|
// The inner writable fails every write through its callback. The duplex must
// emit that error, and must do so before it emits 'close'.
tape('bubble errors from write()', function(t) {
  t.plan(3)

  var sawError = false

  // Writable whose write implementation always reports 'write-error'.
  var failing = new stream.Writable({
    write: function(chunk, enc, done) {
      done(new Error('write-error'))
    }
  })
  var dup = duplexify(failing)

  dup.on('error', function(err) {
    sawError = true
    t.same(err.message, 'write-error', 'received write error')
  })

  dup.on('close', function() {
    t.pass('close emitted')
    t.ok(sawError, 'error was emitted before close')
  })

  dup.end('123')
})
|
222 |
|
// The inner writable has a highWaterMark of 0 and a write implementation that
// never calls back. After a write, destroy(err) must still surface the error
// and then 'close'.
tape('destroy while waiting for drain', function(t) {
  t.plan(3)

  var sawError = false

  // Writable that accepts a chunk and never acknowledges it.
  var stalled = new stream.Writable({
    highWaterMark: 0,
    write: function() {}
  })
  var dup = duplexify(stalled)

  dup.on('error', function(err) {
    sawError = true
    t.same(err.message, 'destroy-error', 'received destroy error')
  })

  dup.on('close', function() {
    t.pass('close emitted')
    t.ok(sawError, 'error was emitted before close')
  })

  dup.write('123')
  dup.destroy(new Error('destroy-error'))
})
|
243 |
|
// Swaps both inner streams while the duplex is in use. The first chunk goes
// through a plain passthrough; after the swap, writes go through an
// upper-casing transform and must come out upper-cased.
//
// The test is driven purely by t.plan(3): tape ends a planned test once the
// planned number of assertions has run, so no explicit t.end() is needed
// (matching the other planned tests in this file).
tape('reset writable / readable', function(t) {
  t.plan(3)

  var toUpperCase = function(data, enc, cb) {
    cb(null, data.toString().toUpperCase())
  }

  var passthrough = through()
  var upper = through(toUpperCase)
  var dup = duplexify(passthrough, passthrough)

  dup.once('data', function(data) {
    // First chunk came through the passthrough unchanged.
    t.same(data.toString(), 'hello')

    // Replace both sides with the upper-casing transform.
    dup.setWritable(upper)
    dup.setReadable(upper)

    dup.once('data', function(data) {
      t.same(data.toString(), 'HELLO')
      dup.once('data', function(data) {
        t.same(data.toString(), 'HI')
      })
    })

    dup.write('hello')
    dup.write('hi')
  })

  dup.write('hello')
})
|
271 |
|
// Corks the duplex from inside its 'prefinish' handler and uncorks it 100ms
// later. 'finish' asserts that the flag set by the timer is already true,
// i.e. that 'finish' was not observed before the uncork timer ran.
tape('cork', function(t) {
  var pt = through()
  var dup = duplexify(pt, pt)
  var uncorked = false

  function release() {
    uncorked = true
    dup.uncork()
  }

  dup.on('prefinish', function() {
    dup.cork()
    setTimeout(release, 100)
  })

  dup.on('finish', function() {
    t.ok(uncorked)
    t.end()
  })

  dup.end()
})
|
290 |
|
// Ending the duplex must not emit 'prefinish' more than once: every emission
// asserts that none has been seen before.
tape('prefinish not twice', function(t) {
  var pt = through()
  var dup = duplexify(pt, pt)
  var seen = false

  function onprefinish() {
    t.ok(!seen, 'only prefinish once')
    seen = true
  }

  function onfinish() {
    t.end()
  }

  dup.on('prefinish', onprefinish)
  dup.on('finish', onfinish)

  dup.end()
})
|
307 |
|
// A 'close' emitted by the inner stream should be forwarded by the duplex.
tape('close', function(t) {
  var pt = through()
  var dup = duplexify(pt, pt)

  function onclose() {
    t.ok(true, 'should forward close')
    t.end()
  }

  // NOTE: the listener is attached only after the inner stream has emitted
  // 'close', so this passes only if the duplex emits its own 'close' after
  // this synchronous emit() call has returned. Keep this order.
  pt.emit('close')
  dup.on('close', onclose)
})
|
318 |
|
// Wraps real net sockets on both ends of a local TCP connection. The client
// writes HELLO_WORLD through its duplex and the server must receive the same
// bytes through its own.
//
// The test is driven by t.plan(1): tape ends a planned test once the planned
// assertion has run, so the data handler only tears down the connection and
// does not call t.end() (matching the other planned tests in this file).
tape('works with node native streams (net)', function(t) {
  t.plan(1)

  var server = net.createServer(function(socket) {
    var dup = duplexify(socket, socket)

    dup.once('data', function(chunk) {
      t.same(chunk, HELLO_WORLD)
      // Stop listening and end the server side of the connection.
      server.close()
      socket.end()
    })
  })

  // Port 0 requests any free port; the actual one is read from address().
  server.listen(0, function () {
    var socket = net.connect(server.address().port)
    var dup = duplexify(socket, socket)

    dup.write(HELLO_WORLD)
  })
})
|