UNPKG

4.08 kBJavaScriptView Raw
1var pull = require('pull-stream')
2var mux = require('../')
3var tape = require('tape')
4
5function delay(fun) {
6 return function (a, b) {
7 setImmediate(function () {
8 fun(a, b)
9 })
10 }
11}
12
13var client = {
14 hello : 'async',
15 goodbye: 'async',
16 stuff : 'source',
17 bstuff : 'source',
18 things : 'sink',
19 echo : 'duplex',
20 suchstreamwow: 'duplex'
21}
22
23module.exports = function (codec) {
24
25tape('outer stream ends after close', function (t) {
26
27 t.plan(4)
28
29 var A = mux(client, null, codec) ()
30 var B = mux(null, client, codec) ({
31 hello: function (a, cb) {
32 delay(cb)(null, 'hello, '+a)
33 },
34 goodbye: function(b, cb) {
35 delay(cb)(null, b)
36 }
37 })
38
39
40 A.hello('jim', function (err, value) {
41 if(err) throw err
42 console.log(value)
43 t.equal(value, 'hello, jim')
44 })
45
46 A.goodbye('bbb', function (err, value) {
47 if(err) throw err
48 console.log(value)
49 t.equal(value, 'bbb')
50 })
51
52 var bs = B.createStream()
53
54 var as = A.createStream()
55 pull(as, bs, as)
56
57 A.on('closed', function () {
58 t.ok(true)
59 })
60
61 A.close(function (err) {
62 t.notOk(err)
63 })
64
65})
66
67tape('close after uniplex streams end', function (t) {
68 t.plan(6)
69
70 var A = mux(client, null, codec) ()
71 var B = mux(null, client, codec) ({
72 stuff: function () {
73 t.ok(true)
74 return pull.values([1, 2, 3, 4, 5])
75 }
76 })
77
78 pull(A.stuff(), pull.collect(function (err, ary) {
79 t.deepEqual(ary, [1, 2, 3, 4, 5])
80 }))
81
82 var bs = B.createStream()
83 var as = A.createStream()
84 pull(as, bs, as)
85
86 B.on('closed', function () {
87 console.log('B emits "closed"')
88 t.ok(true)
89 })
90
91 A.on('closed', function () {
92 console.log('A emits "closed"')
93 t.ok(true)
94 })
95
96 B.close(function (err) {
97 console.log('B CLOSE')
98 t.notOk(err, 'bs is closed')
99 })
100
101 A.close(function (err) {
102 console.log('A CLOSE')
103 t.notOk(err, 'as is closed')
104 })
105})
106
107tape('close after uniplex streams end 2', function (t) {
108 t.plan(4)
109
110 var A = mux(client, null, codec) ()
111 var B = mux(null, client, codec) ({
112 things: function () {
113 t.ok(true)
114 return pull.collect(function (err, ary) {
115 t.deepEqual(ary, [1, 2, 3, 4, 5])
116 })
117 }
118 })
119
120 pull(pull.values([1, 2, 3, 4, 5]), A.things())
121
122 var bs = B.createStream()
123 var as = A.createStream()
124
125 pull(as, bs, as)
126
127 B.close(function (err) {
128 console.log('B CLOSE')
129 t.notOk(err, 'bs is closed')
130 })
131
132 A.close(function (err) {
133 console.log('A CLOSE')
134 t.notOk(err, 'as is closed')
135 })
136})
137
138tape('close after both sides of a duplex stream ends', function (t) {
139
140 t.plan(8)
141
142 var A = mux(client, null, codec) ()
143 var B = mux(null, client, codec) ({
144 echo: function () {
145 return pull.through(console.log, function () {
146 t.ok(true)
147 })
148 }
149 })
150
151 var bs = B.createStream()
152 var as = A.createStream()
153
154 pull(
155 pull.values([1, 2, 3, 4, 5]),
156 A.echo(),
157 pull.collect(function (err, ary) {
158 if(err) throw err
159 t.deepEqual(ary, [1,2,3,4,5])
160 })
161 )
162
163 pull(as, bs, as)
164
165 t.notOk(B.closed)
166 t.notOk(A.closed)
167
168 B.on('closed', function () {
169 t.ok(true)
170 })
171
172 A.on('closed', function () {
173 t.ok(true)
174 })
175
176 B.close(function (err) {
177 console.log('B CLOSE')
178 t.notOk(err, 'bs is closed')
179 })
180
181 A.close(function (err) {
182 console.log('A CLOSE', err)
183 t.notOk(err, 'as is closed')
184 })
185
186
187})
188
189tape('closed is emitted when stream disconnects', function (t) {
190 t.plan(2)
191 var A = mux(client, null) ()
192 A.on('closed', function (err) {
193 console.log('EMIT CLOSED')
194 t.notOk(err)
195 })
196 pull(pull.empty(), A.createStream(function (err) {
197 console.log(err)
198 t.notOk(err) //end of parent stream
199 }), pull.drain())
200})
201
202tape('closed is emitted with error when stream errors', function (t) {
203 t.plan(2)
204 var A = mux(client, null, codec) ()
205 A.on('closed', function (err) {
206 t.notOk(err)
207 })
208 pull(pull.empty(), A.createStream(function (err) {
209 console.log(err)
210 t.notOk(err) //end of parent stream
211 }), pull.drain())
212})
213
214}
215
216if(!module.parent)
217 module.exports(function (e) { return e })