UNPKG

2.48 kBJavaScriptView Raw
1
2var mux = require('../')
3var tape = require('tape')
4var pull = require('pull-stream')
5var Abortable = require('pull-abortable')
6
7function id (e) { return e }
8
9function abortStream(onAbort, onAborted) {
10 return function (read) {
11 return function (abort, cb) {
12 if(abort && onAbort) onAbort(abort)
13 read(abort, function (end, data) {
14 if(end && onAborted) onAborted(end)
15 cb(end, data)
16 })
17 }
18 }
19}
20
21module.exports = function (serializer) {
22 tape('stream abort', function (t) {
23 t.plan(2)
24
25 var client = {
26 drainAbort: 'sink'
27 }
28
29 var A = mux(client, null, serializer) ()
30 var B = mux(null, client, serializer) ({
31 drainAbort: function (n) {
32 return pull(
33 pull.take(3),
34 pull.through(console.log),
35 pull.collect(function (err, ary) {
36 console.log(ary)
37 t.deepEqual(ary, [1, 2, 3])
38 })
39 )
40 }
41 })
42
43 var as = A.createStream()
44 var bs = B.createStream()
45
46 pull(as, bs, as)
47
48 var sent = []
49
50 pull(
51 pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) {
52 t.ok(sent.length < 10, 'sent is correct')
53 t.end()
54 }),
55 pull.asyncMap(function (data, cb) {
56 setImmediate(function () {
57 cb(null, data)
58 })
59 }),
60 pull.through(sent.push.bind(sent)),
61 A.drainAbort(3)
62 )
63
64 })
65}
66
67module.exports = function (serializer) {
68 tape('stream abort', function (t) {
69 t.plan(2)
70 var abortable = Abortable()
71 var client = {
72 drainAbort: 'sink'
73 }
74
75 var A = mux(client, null, serializer) ()
76 var B = mux(null, client, serializer) ({
77 drainAbort: function (n) {
78 return pull(
79 pull.through(function () {
80 if(--n) return
81 abortable.abort()
82 }),
83 pull.collect(function (err, ary) {
84 console.log(ary)
85 t.deepEqual(ary, [1, 2, 3])
86 })
87 )
88 }
89 })
90
91 var as = A.createStream()
92 var bs = B.createStream()
93
94 pull(as, abortable, bs, as)
95
96 var sent = []
97
98 pull(
99 pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) {
100 console.log(abort)
101 t.ok(sent.length < 10, 'sent is correct')
102 t.end()
103 }),
104 pull.asyncMap(function (data, cb) {
105 setImmediate(function () {
106 cb(null, data)
107 })
108 }),
109 pull.through(sent.push.bind(sent)),
110 A.drainAbort(3)
111 )
112
113 })
114}
115
116
117module.exports(id)