1 | var pull = require('pull-stream')
|
2 | var mux = require('../')
|
3 | var tape = require('tape')
|
4 |
|
5 | function delay(fun) {
|
6 | return function (a, b) {
|
7 | setImmediate(function () {
|
8 | fun(a, b)
|
9 | })
|
10 | }
|
11 | }
|
12 |
|
13 | var client = {
|
14 | hello : 'async',
|
15 | goodbye: 'async',
|
16 | stuff : 'source',
|
17 | bstuff : 'source',
|
18 | things : 'sink',
|
19 | echo : 'duplex',
|
20 | suchstreamwow: 'duplex'
|
21 | }
|
22 |
|
23 | module.exports = function (codec) {
|
24 |
|
25 | tape('outer stream ends after close', function (t) {
|
26 |
|
27 | t.plan(4)
|
28 |
|
29 | var A = mux(client, null, codec) ()
|
30 | var B = mux(null, client, codec) ({
|
31 | hello: function (a, cb) {
|
32 | delay(cb)(null, 'hello, '+a)
|
33 | },
|
34 | goodbye: function(b, cb) {
|
35 | delay(cb)(null, b)
|
36 | }
|
37 | })
|
38 |
|
39 |
|
40 | A.hello('jim', function (err, value) {
|
41 | if(err) throw err
|
42 | console.log(value)
|
43 | t.equal(value, 'hello, jim')
|
44 | })
|
45 |
|
46 | A.goodbye('bbb', function (err, value) {
|
47 | if(err) throw err
|
48 | console.log(value)
|
49 | t.equal(value, 'bbb')
|
50 | })
|
51 |
|
52 | var bs = B.createStream()
|
53 |
|
54 | var as = A.createStream()
|
55 | pull(as, bs, as)
|
56 |
|
57 | A.on('closed', function () {
|
58 | t.ok(true)
|
59 | })
|
60 |
|
61 | A.close(function (err) {
|
62 | t.notOk(err)
|
63 | })
|
64 |
|
65 | })
|
66 |
|
67 | tape('close after uniplex streams end', function (t) {
|
68 | t.plan(6)
|
69 |
|
70 | var A = mux(client, null, codec) ()
|
71 | var B = mux(null, client, codec) ({
|
72 | stuff: function () {
|
73 | t.ok(true)
|
74 | return pull.values([1, 2, 3, 4, 5])
|
75 | }
|
76 | })
|
77 |
|
78 | pull(A.stuff(), pull.collect(function (err, ary) {
|
79 | t.deepEqual(ary, [1, 2, 3, 4, 5])
|
80 | }))
|
81 |
|
82 | var bs = B.createStream()
|
83 | var as = A.createStream()
|
84 | pull(as, bs, as)
|
85 |
|
86 | B.on('closed', function () {
|
87 | console.log('B emits "closed"')
|
88 | t.ok(true)
|
89 | })
|
90 |
|
91 | A.on('closed', function () {
|
92 | console.log('A emits "closed"')
|
93 | t.ok(true)
|
94 | })
|
95 |
|
96 | B.close(function (err) {
|
97 | console.log('B CLOSE')
|
98 | t.notOk(err, 'bs is closed')
|
99 | })
|
100 |
|
101 | A.close(function (err) {
|
102 | console.log('A CLOSE')
|
103 | t.notOk(err, 'as is closed')
|
104 | })
|
105 | })
|
106 |
|
107 | tape('close after uniplex streams end 2', function (t) {
|
108 | t.plan(4)
|
109 |
|
110 | var A = mux(client, null, codec) ()
|
111 | var B = mux(null, client, codec) ({
|
112 | things: function () {
|
113 | t.ok(true)
|
114 | return pull.collect(function (err, ary) {
|
115 | t.deepEqual(ary, [1, 2, 3, 4, 5])
|
116 | })
|
117 | }
|
118 | })
|
119 |
|
120 | pull(pull.values([1, 2, 3, 4, 5]), A.things())
|
121 |
|
122 | var bs = B.createStream()
|
123 | var as = A.createStream()
|
124 |
|
125 | pull(as, bs, as)
|
126 |
|
127 | B.close(function (err) {
|
128 | console.log('B CLOSE')
|
129 | t.notOk(err, 'bs is closed')
|
130 | })
|
131 |
|
132 | A.close(function (err) {
|
133 | console.log('A CLOSE')
|
134 | t.notOk(err, 'as is closed')
|
135 | })
|
136 | })
|
137 |
|
138 | tape('close after both sides of a duplex stream ends', function (t) {
|
139 |
|
140 | t.plan(8)
|
141 |
|
142 | var A = mux(client, null, codec) ()
|
143 | var B = mux(null, client, codec) ({
|
144 | echo: function () {
|
145 | return pull.through(console.log, function () {
|
146 | t.ok(true)
|
147 | })
|
148 | }
|
149 | })
|
150 |
|
151 | var bs = B.createStream()
|
152 | var as = A.createStream()
|
153 |
|
154 | pull(
|
155 | pull.values([1, 2, 3, 4, 5]),
|
156 | A.echo(),
|
157 | pull.collect(function (err, ary) {
|
158 | if(err) throw err
|
159 | t.deepEqual(ary, [1,2,3,4,5])
|
160 | })
|
161 | )
|
162 |
|
163 | pull(as, bs, as)
|
164 |
|
165 | t.notOk(B.closed)
|
166 | t.notOk(A.closed)
|
167 |
|
168 | B.on('closed', function () {
|
169 | t.ok(true)
|
170 | })
|
171 |
|
172 | A.on('closed', function () {
|
173 | t.ok(true)
|
174 | })
|
175 |
|
176 | B.close(function (err) {
|
177 | console.log('B CLOSE')
|
178 | t.notOk(err, 'bs is closed')
|
179 | })
|
180 |
|
181 | A.close(function (err) {
|
182 | console.log('A CLOSE', err)
|
183 | t.notOk(err, 'as is closed')
|
184 | })
|
185 |
|
186 |
|
187 | })
|
188 |
|
189 | tape('closed is emitted when stream disconnects', function (t) {
|
190 | t.plan(2)
|
191 | var A = mux(client, null) ()
|
192 | A.on('closed', function (err) {
|
193 | console.log('EMIT CLOSED')
|
194 | t.notOk(err)
|
195 | })
|
196 | pull(pull.empty(), A.createStream(function (err) {
|
197 | console.log(err)
|
198 | t.notOk(err)
|
199 | }), pull.drain())
|
200 | })
|
201 |
|
202 | tape('closed is emitted with error when stream errors', function (t) {
|
203 | t.plan(2)
|
204 | var A = mux(client, null, codec) ()
|
205 | A.on('closed', function (err) {
|
206 | t.notOk(err)
|
207 | })
|
208 | pull(pull.empty(), A.createStream(function (err) {
|
209 | console.log(err)
|
210 | t.notOk(err)
|
211 | }), pull.drain())
|
212 | })
|
213 |
|
214 | }
|
215 |
|
216 | if(!module.parent)
|
217 | module.exports(function (e) { return e })
|