UNPKG

8.97 kBJavaScriptView Raw
1var tape = require('tape')
2var pull = require('pull-stream')
3var pushable = require('pull-pushable')
4var mux = require('../')
5
6module.exports = function(serializer, buffers) {
7
// `b` normalises a value before it is compared in an assertion.
// With the `buffers` flag set, Buffers are round-tripped through JSON so that
// both sides use the same representation (Buffer#toJSON changed between
// node 0.10 and 0.12); any other value is returned untouched.
// Without the flag, `b` is the identity function.
var b
if (buffers) {
  b = function (value) {
    if (!Buffer.isBuffer(value)) return value
    return JSON.parse(JSON.stringify(value))
  }
} else {
  b = function (value) { return value }
}
19
// Manifest handed to mux() by most tests below: method name -> call type.
var client = {
  // callback-style calls
  hello: 'async',
  goodbye: 'async',

  // calls that return a readable stream
  stuff: 'source',
  bstuff: 'source',

  // call that returns a writable stream
  things: 'sink',

  // call that returns a { source, sink } pair
  suchstreamwow: 'duplex'
}
28
// Makes two async calls from A to B over a connected stream pair:
// `hello` must return a greeting string, `goodbye` must echo back a Buffer.
tape('async', function (t) {
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    hello: function (a, cb) {
      cb(null, 'hello, ' + a)
    },
    goodbye: function (b, cb) {
      cb(null, b)
    }
  })

  // connect A <-> B, logging every item that travels in either direction
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  A.hello('world', function (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')

    // Buffer.from replaces the deprecated `new Buffer(array)` constructor
    var buf = Buffer.from([0, 1, 2, 3, 4])
    A.goodbye(buf, function (err, buf2) {
      if (err) throw err
      // compare through b() so both sides use the same Buffer representation
      console.log(b(buf2), b(buf))
      t.deepEqual(b(buf2), b(buf))
      t.end()
    })
  })
})
60
// Reads two source streams from B via A: `stuff` yields numbers scaled by the
// call argument, `bstuff` yields a fixed list of Buffers.
tape('source', function (t) {
  // Buffer.from replaces the deprecated `new Buffer(array)` constructor
  var expected = [
    Buffer.from([0, 1]),
    Buffer.from([2, 3]),
    Buffer.from([4, 5])
  ]

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    stuff: function (b) {
      return pull.values([1, 2, 3, 4, 5].map(function (a) {
        return a * b
      }))
    },
    bstuff: function () {
      return pull.values(expected)
    }
  })

  // connect A <-> B, logging every item that travels in either direction
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  pull(A.stuff(5), pull.collect(function (err, ary) {
    if (err) throw err
    console.log(ary)
    t.deepEqual(ary, [5, 10, 15, 20, 25])

    pull(A.bstuff(), pull.collect(function (err, ary) {
      if (err) throw err
      // compare through b() so both sides use the same Buffer representation
      console.log(ary.map(b), expected.map(b))
      t.deepEqual(ary.map(b), expected.map(b))
      t.end()
    }))
  }))
})
98
// Calls two 'sync' handlers on B from A: one returns a value, one throws.
// The returned value and the thrown error's message must both reach A's
// callbacks.
tape('sync', function (t) {
  // local manifest; shadows the shared `client` for this test only
  var client = {
    syncOk: 'sync',
    syncErr: 'sync'
  }

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    syncOk: function (a) {
      return { okay: a }
    },
    syncErr: function (b) {
      throw new Error('test error:' + b)
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  A.syncOk(true, function (err, value) {
    // surface unexpected failures, as the other tests in this file do
    if (err) throw err
    t.deepEqual(value, { okay: true })

    A.syncErr('blah', function (err) {
      // assert the error exists before reading its message
      t.ok(err)
      t.equal(err.message, 'test error:blah')
      t.end()
    })
  })
})
128
// Pipes five values from A into B's `things` sink; B checks both the call
// argument and the values it collected, then ends the test.
tape('sink', function (t) {
  // B-side handler: returns a collecting sink that performs the assertions
  function things (someParam) {
    return pull.collect(function (err, values) {
      if (err) throw err
      t.equal(someParam, 5)
      t.deepEqual(values, [1, 2, 3, 4, 5])
      t.end()
    })
  }

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({ things: things })

  // connect A <-> B, logging every item that travels in either direction
  var s = A.createStream()
  pull(
    s,
    pull.through(console.log),
    B.createStream(),
    pull.through(console.log),
    s
  )

  var input = pull.values([1, 2, 3, 4, 5])
  pull(input, A.things(5))
})
147
// Opens a duplex stream from A to B and loops it back on itself on the A
// side, so the values B pushes (0..4) come back into B's sink in order.
tape('duplex', function (t) {
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    suchstreamwow: function (someParam) {
      // the call argument must arrive intact
      t.equal(someParam, 5)

      // pull.values / pull.collect are not used here: pull.values sends 'end'
      // onto the stream, which closes the muxrpc stream immediately, and the
      // stream has to stay open for the drain below to receive the values.
      var nextValue = 0
      var p = pushable()
      var i = 0
      while (i < 5) {
        p.push(i)
        i += 1
      }

      var sink = pull.drain(function (value) {
        console.log('************', nextValue, value)
        t.equal(nextValue, value)
        nextValue += 1
        if (nextValue === 5) {
          t.end()
        }
      })

      return { source: p, sink: sink }
    }
  })

  // connect A <-> B, tagging the logged items with their direction
  var s = A.createStream()
  pull(
    s,
    pull.through(console.log.bind(console, 'IN')),
    B.createStream(),
    pull.through(console.log.bind(console, 'OUT')),
    s
  )

  // echo: whatever the duplex emits is written straight back into it
  var dup = A.suchstreamwow(5)
  pull(dup, dup)
})
181
// A pending async call must receive an error when the stream's sink is fed
// a source that ends on its first read.
tape('async - error1', function (t) {
  // source that answers every read with end (true)
  function endedSource (abort, cb) {
    cb(true)
  }

  var A = mux(client, null)()
  var s = A.createStream()

  A.hello('world', function (err, value) {
    t.ok(err)
    t.end()
  })

  s.sink(endedSource)
})
194
// A pending async call must receive an error when the stream's source is
// aborted from the outside.
tape('async - error2', function (t) {
  function noop () {}

  var A = mux(client, null)()
  var s = A.createStream()

  A.hello('world', function (err, value) {
    console.log('CB!')
    t.ok(err)
    t.end()
  })

  // abort the outgoing side of the stream
  s.source(true, noop)
})
208
// An async call issued before createStream() must still be answered once the
// stream is created and connected to the other side.
tape('buffer calls before stream is created', function (t) {
  var A = mux(client, null)()
  var B = mux(null, client)({
    hello: function (a, cb) {
      cb(null, 'hello, ' + a)
    }
  })

  // the call is made while A has no stream yet
  A.hello('world', function (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')
    t.end()
  })

  // only now create the stream and connect A <-> B
  var s = A.createStream()
  pull(s, B.createStream(), s)
})
228
229// tape('async - error, and reconnect', function (t) {
230// var A = mux(client, null) ()
231//
232// var s = A.createStream()
233//
234// A.hello('world', function (err, value) {
235// t.ok(err)
236//
237// var B = mux(null, client) ({
238// hello: function (a, cb) {
239// cb(null, 'hello, '+a)
240// },
241// })
242//
243// var s = A.createStream()
244//
245// pull(s, B.createStream(), s)
246//
247// A.hello('world', function (err, value) {
248// t.notOk(err)
249// t.equal(value, 'hello, world')
250// t.end()
251// })
252// })
253//
254// s.sink(function (abort, cb) { cb(true) })
255// })
256
// The callback given to createStream() must receive the very error object
// that an erroring source writes into the stream's sink.
tape('recover error written to outer stream', function (t) {
  var err = new Error('testing errors')
  var A = mux(client, null)()

  function onClose (_err) {
    t.equal(_err, err)
    t.end()
  }

  var s = A.createStream(onClose)
  pull(pull.error(err), s.sink)
})
269
// The callback given to createStream() must receive the very error object
// used to abort the stream's source.
tape('recover error when outer stream aborted', function (t) {
  var err = new Error('testing errors')
  var A = mux(client, null)()

  function onClose (_err) {
    t.equal(_err, err)
    t.end()
  }

  var s = A.createStream(onClose)
  s.source(err, function () {})
})
281
// When the sink is fed an empty source, the createStream() callback must be
// called with null.
tape('cb when stream is ended', function (t) {
  var A = mux(client, null)()

  function onClose (_err) {
    t.equal(_err, null)
    t.end()
  }

  var s = A.createStream(onClose)
  pull(pull.empty(), s.sink)
})
293
// When the source is aborted with `true`, the createStream() callback must be
// called with null.
tape('cb when stream is aborted', function (t) {
  var A = mux(client, null)()

  function onClose (_err) {
    t.equal(_err, null)
    t.end()
  }

  var s = A.createStream(onClose)
  s.source(true, function () {})
})
304
// Manifest with methods grouped under a `salutations` namespace; used by the
// 'nested methods' test.
var client2 = {
  salutations: { hello: 'async', goodbye: 'async' }
}
311
// Same round trip as the 'async' test, but the methods are reached through
// the nested `salutations` namespace declared in `client2`.
tape('nested methods', function (t) {
  var A = mux(client2, null, serializer)()
  var B = mux(null, client2, serializer)({
    salutations: {
      hello: function (a, cb) {
        cb(null, 'hello, ' + a)
      },
      goodbye: function (b, cb) {
        cb(null, b)
      }
    }
  })

  // connect A <-> B, logging every item that travels in either direction
  var s = A.createStream()
  pull(s, pull.through(console.log), B.createStream(), pull.through(console.log), s)

  A.salutations.hello('world', function (err, value) {
    if (err) throw err
    console.log(value)
    t.equal(value, 'hello, world')

    // Buffer.from replaces the deprecated `new Buffer(array)` constructor
    var buf = Buffer.from([0, 1, 2, 3, 4])
    A.salutations.goodbye(buf, function (err, buf2) {
      if (err) throw err
      // compare through b() so both sides use the same Buffer representation
      t.deepEqual(b(buf2), b(buf))
      t.end()
    })
  })
})
342
// Calls the `things` sink with a trailing callback on the A side: B checks
// how many values it collected, and A's callback ends the test if it
// receives no error.
tape('sink', function (t) {
  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    things: function (len) {
      return pull.collect(function (err, ary) {
        // without this guard a failed stream would crash on `ary.length`
        // (ary is undefined on error) instead of reporting the real error
        if (err) throw err
        t.equal(ary.length, len)
      })
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  pull(pull.values([1, 2, 3]), A.things(3, function (err) {
    if (err) throw err
    t.end()
  }))
})
360
// B's `things` sink aborts its input with an error on the first read; the
// callback passed to A.things must receive an error with the same message.
tape('sink - abort', function (t) {
  var err = new Error('test abort error')

  // sink that aborts whatever source it is given, using `err`
  function abortingSink (read) {
    read(err, function () {})
  }

  var A = mux(client, null, serializer)()
  var B = mux(null, client, serializer)({
    things: function (len) {
      return abortingSink
    }
  })

  var s = A.createStream()
  pull(s, B.createStream(), s)

  var onDone = function (_err) {
    t.ok(_err)
    t.equal(_err.message, err.message)
    t.end()
  }

  pull(pull.values([1, 2, 3]), A.things(3, onDone))
})
382
383}
384
385//see ./jsonb.js for tests with serialization.
// When this file is executed directly (not require()d by another module),
// run the suite with a serializer that returns its argument unchanged.
// `require.main === module` is the documented entry-point check;
// `module.parent` is deprecated.
if (require.main === module) {
  module.exports(function (e) { return e })
}