1 | var tape = require('tape')
|
2 | var pull = require('pull-stream')
|
3 | var pushable = require('pull-pushable')
|
4 | var mux = require('../')
|
5 |
|
6 | module.exports = function(serializer, buffers) {
|
7 |
|
// Normalizes a value before it is handed to an assertion.
// When `buffers` is falsy every value passes through untouched. When it is
// truthy, Buffers are round-tripped through JSON (so they are compared in
// their JSON form) and all other values pass through untouched.
var b = !buffers
  ? function (value) { return value }
  : function (value) {
    if (!Buffer.isBuffer(value)) return value
    return JSON.parse(JSON.stringify(value))
  }
|
19 |
|
// Method table handed to mux() by most tests in this file: each key is a
// method name and each value names the kind of call it is.
var client = {
  hello: 'async',
  goodbye: 'async',
  stuff: 'source',
  bstuff: 'source',
  things: 'sink',
  suchstreamwow: 'duplex'
}
|
28 |
|
// Calls two async methods from A on B: `hello` with a string and `goodbye`
// with a Buffer, checking each reply.
tape('async', function (t) {
  // A only calls the methods in `client`; B only implements them.
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    hello: function (a, cb) {
      cb(null, 'hello, ' + a)
    },
    goodbye: function (payload, cb) {
      // Echo the argument straight back.
      cb(null, payload)
    }
  })

  // Connect A -> B -> A, logging whatever flows in each direction.
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  A.hello('world', function (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')

    // Buffer.from replaces the deprecated `new Buffer(array)` constructor.
    var buf = Buffer.from([0, 1, 2, 3, 4])
    A.goodbye(buf, function (err, buf2) {
      if (err) throw err
      // Both sides go through `b` so they are compared in the same form.
      console.log(b(buf2), b(buf))
      t.deepEqual(b(buf2), b(buf))
      t.end()
    })
  })
})
|
60 |
|
// Reads two source streams from B through A: `stuff` (numbers scaled by the
// call argument) and `bstuff` (a fixed list of Buffers).
tape('source', function (t) {
  // Buffer.from replaces the deprecated `new Buffer(array)` constructor.
  var expected = [
    Buffer.from([0, 1]),
    Buffer.from([2, 3]),
    Buffer.from([4, 5])
  ]

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    stuff: function (factor) {
      return pull.values([1, 2, 3, 4, 5].map(function (a) {
        return a * factor
      }))
    },
    bstuff: function () {
      return pull.values(expected)
    }
  })

  // Connect A -> B -> A, logging whatever flows in each direction.
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  pull(A.stuff(5), pull.collect(function (err, ary) {
    if (err) throw err
    console.log(ary)
    t.deepEqual(ary, [5, 10, 15, 20, 25])

    pull(A.bstuff(), pull.collect(function (err, ary) {
      if (err) throw err
      // Both lists go through `b` so they are compared in the same form.
      console.log(ary.map(b), expected.map(b))
      t.deepEqual(ary.map(b), expected.map(b))
      t.end()
    }))
  }))
})
|
98 |
|
// Calls two 'sync' methods on B from A: one that returns a value and one
// that throws, checking that A's callback gets the value and the error.
tape('sync', function (t) {
  // Test-local method table (named so it does not shadow the outer `client`).
  var manifest = {
    syncOk: 'sync',
    syncErr: 'sync'
  }

  var A = mux(manifest, null, serializer)()
  var B = mux(null, manifest, serializer)({
    syncOk: function (a) {
      return { okay: a }
    },
    syncErr: function (b) {
      throw new Error('test error:' + b)
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  A.syncOk(true, function (err, value) {
    // Same error handling as the other tests in this file; previously an
    // error here was ignored.
    if (err) throw err
    t.deepEqual(value, { okay: true })

    A.syncErr('blah', function (err) {
      // Assert the error exists before reading its message.
      t.ok(err)
      t.equal(err.message, 'test error:blah')
      t.end()
    })
  })
})
|
128 |
|
// Pipes five values from A into B's `things` sink and checks that B sees
// both the call argument and every value.
tape('sink', function (t) {
  // B-side implementation: collect everything written, then assert.
  function things (someParam) {
    function onCollected (err, values) {
      if (err) throw err
      t.equal(someParam, 5)
      t.deepEqual(values, [1, 2, 3, 4, 5])
      t.end()
    }
    return pull.collect(onCollected)
  }

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({ things: things })

  // Connect A -> B -> A, logging whatever flows in each direction.
  var s = A.createStream()
  pull(
    s,
    pull.through(console.log),
    B.createStream(),
    pull.through(console.log),
    s
  )

  pull(pull.values([1, 2, 3, 4, 5]), A.things(5))
})
|
147 |
|
// Opens the duplex method `suchstreamwow` from A and pipes it into itself:
// B pushes 0..4 on its source side and expects to read the same sequence
// back on its sink side.
tape('duplex', function (t) {
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    suchstreamwow: function (someParam) {
      t.equal(someParam, 5)

      var seen = 0
      var outgoing = pushable()
      for (var n = 0; n < 5; n += 1) {
        outgoing.push(n)
      }

      // Each received value must equal the count of values seen so far;
      // the test ends after the fifth one.
      function check (value) {
        console.log('************', seen, value)
        t.equal(seen, value)
        seen += 1
        if (seen === 5) t.end()
      }

      return {
        source: outgoing,
        sink: pull.drain(check)
      }
    }
  })

  // Connect A -> B -> A, tagging the log output per direction.
  var s = A.createStream()
  var logIn = pull.through(console.log.bind(console, 'IN'))
  var logOut = pull.through(console.log.bind(console, 'OUT'))
  pull(s, logIn, B.createStream(), logOut, s)

  var dup = A.suchstreamwow(5)
  pull(dup, dup)
})
|
181 |
|
// A pending async call must receive an error when the stream's sink is fed
// a source that answers `true` on its first read.
tape('async - error1', function (t) {
  var A = mux(client, null)()
  var s = A.createStream()

  function onReply (err, value) {
    t.ok(err)
    t.end()
  }
  A.hello('world', onReply)

  // Hand-written source: every read is answered with `true`.
  s.sink(function (abort, cb) {
    cb(true)
  })
})
|
194 |
|
// A pending async call must receive an error when the stream's source is
// called with `true` as its first argument.
tape('async - error2', function (t) {
  var A = mux(client, null)()
  var s = A.createStream()

  function onReply (err, value) {
    console.log('CB!')
    t.ok(err)
    t.end()
  }
  A.hello('world', onReply)

  function noop () {}
  s.source(true, noop)
})
|
208 |
|
// A call made on A before A.createStream() must still be answered once the
// streams are connected afterwards.
tape('buffer calls before stream is created', function (t) {
  var A = mux(client, null)()
  var B = mux(null, client)({
    hello: function (a, cb) {
      cb(null, 'hello, ' + a)
    }
  })

  // Issue the call first ...
  function onReply (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')
    t.end()
  }
  A.hello('world', onReply)

  // ... and only then create and connect the streams.
  var s = A.createStream()
  pull(s, B.createStream(), s)
})
|
228 |
|
229 |
|
230 |
|
231 |
|
232 |
|
233 |
|
234 |
|
235 |
|
236 |
|
237 |
|
238 |
|
239 |
|
240 |
|
241 |
|
242 |
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
253 |
|
254 |
|
255 |
|
256 |
|
// The callback given to createStream() must receive the exact error that an
// erroring source delivers to the stream's sink.
tape('recover error written to outer stream', function (t) {
  var A = mux(client, null)()
  var err = new Error('testing errors')

  function streamCb (_err) {
    t.equal(_err, err)
    t.end()
  }
  var s = A.createStream(streamCb)

  pull(pull.error(err), s.sink)
})
|
269 |
|
// The callback given to createStream() must receive the exact error that is
// passed as the first argument to the stream's source.
tape('recover error when outer stream aborted', function (t) {
  var A = mux(client, null)()
  var err = new Error('testing errors')

  function streamCb (_err) {
    t.equal(_err, err)
    t.end()
  }
  var s = A.createStream(streamCb)

  function noop () {}
  s.source(err, noop)
})
|
281 |
|
// The callback given to createStream() must be called with `null` when an
// empty source is piped into the stream's sink.
tape('cb when stream is ended', function (t) {
  var A = mux(client, null)()

  function streamCb (_err) {
    t.equal(_err, null)
    t.end()
  }
  var s = A.createStream(streamCb)

  pull(pull.empty(), s.sink)
})
|
293 |
|
// The callback given to createStream() must be called with `null` when the
// stream's source is called with `true` as its first argument.
tape('cb when stream is aborted', function (t) {
  var A = mux(client, null)()

  function streamCb (_err) {
    t.equal(_err, null)
    t.end()
  }
  var s = A.createStream(streamCb)

  function noop () {}
  s.source(true, noop)
})
|
304 |
|
// Method table with one level of nesting: both async methods live under the
// `salutations` key. Used by the 'nested methods' test.
var client2 = {
  salutations: {
    hello: 'async',
    goodbye: 'async'
  }
}
|
311 |
|
// Same round trip as the 'async' test, but the methods are reached through
// the nested `salutations` object described by `client2`.
tape('nested methods', function (t) {
  var A = mux(client2, null, serializer)()
  var B = mux(null, client2, serializer)({
    salutations: {
      hello: function (a, cb) {
        cb(null, 'hello, ' + a)
      },
      goodbye: function (payload, cb) {
        // Echo the argument straight back.
        cb(null, payload)
      }
    }
  })

  // Connect A -> B -> A, logging whatever flows in each direction.
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  A.salutations.hello('world', function (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')

    // Buffer.from replaces the deprecated `new Buffer(array)` constructor.
    var buf = Buffer.from([0, 1, 2, 3, 4])
    A.salutations.goodbye(buf, function (err, buf2) {
      if (err) throw err
      // Both sides go through `b` so they are compared in the same form.
      t.deepEqual(b(buf2), b(buf))
      t.end()
    })
  })
})
|
342 |
|
// Pipes three values into B's `things` sink and passes a callback to
// A.things(); the test ends from that callback.
tape('sink', function (t) {
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    things: function (len) {
      return pull.collect(function (err, ary) {
        // Same error handling as the first 'sink' test; previously a stream
        // error was ignored here.
        if (err) throw err
        t.equal(ary.length, len)
      })
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  pull(pull.values([1, 2, 3]), A.things(3, function (err) {
    if (err) throw err
    t.end()
  }))
})
|
360 |
|
// When B's sink calls its source with an error instead of reading, the
// callback given to A.things() must receive an error with the same message.
tape('sink - abort', function (t) {
  var err = new Error('test abort error')

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    things: function (len) {
      // Hand-written sink: its only read passes `err` as the first argument.
      return function (read) {
        read(err, function () {})
      }
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  function onDone (_err) {
    t.ok(_err)
    t.equal(_err.message, err.message)
    t.end()
  }
  pull(pull.values([1, 2, 3]), A.things(3, onDone))
})
|
382 |
|
383 | }
|
384 |
|
385 |
|
// Run the suite with an identity serializer (and `buffers` left undefined)
// when this file is executed directly rather than required.
// `require.main === module` is the documented check for that; it replaces
// the deprecated `module.parent`.
if (require.main === module) {
  module.exports(function (e) { return e })
}
|