1 |
|
2 | var mux = require('../')
|
3 | var tape = require('tape')
|
4 | var pull = require('pull-stream')
|
5 | var Abortable = require('pull-abortable')
|
6 |
|
/**
 * Identity function: hands back its argument untouched.
 *
 * Passed as the `serializer` argument when this file runs its own tests.
 *
 * @param {*} e - any value
 * @returns {*} the same value (same reference for objects)
 */
function id (e) {
  return e
}
|
8 |
|
/**
 * Builds a pass-through stage (a function that takes a `read` function and
 * returns a new `read` function) that reports abort activity.
 *
 * @param {Function} [onAbort] - called with the abort value whenever the
 *   downstream side calls read with a truthy `abort`, before the request is
 *   forwarded upstream.
 * @param {Function} [onAborted] - called with the `end` value whenever the
 *   upstream callback fires with a truthy `end`, before the downstream
 *   callback is invoked.
 * @returns {Function} function (read) -> function (abort, cb)
 */
function abortStream(onAbort, onAborted) {
  return function through (source) {
    return function readable (abort, cb) {
      // Report the abort request first, then pass it upstream unchanged.
      if (onAbort && abort) {
        onAbort(abort)
      }

      source(abort, function answered (end, data) {
        // Report termination (normal end or error) before telling downstream.
        if (onAborted && end) {
          onAborted(end)
        }
        cb(end, data)
      })
    }
  }
}
|
20 |
|
21 | module.exports = function (serializer) {
|
22 | tape('stream abort', function (t) {
|
23 | t.plan(2)
|
24 |
|
25 | var client = {
|
26 | drainAbort: 'sink'
|
27 | }
|
28 |
|
29 | var A = mux(client, null, serializer) ()
|
30 | var B = mux(null, client, serializer) ({
|
31 | drainAbort: function (n) {
|
32 | return pull(
|
33 | pull.take(3),
|
34 | pull.through(console.log),
|
35 | pull.collect(function (err, ary) {
|
36 | console.log(ary)
|
37 | t.deepEqual(ary, [1, 2, 3])
|
38 | })
|
39 | )
|
40 | }
|
41 | })
|
42 |
|
43 | var as = A.createStream()
|
44 | var bs = B.createStream()
|
45 |
|
46 | pull(as, bs, as)
|
47 |
|
48 | var sent = []
|
49 |
|
50 | pull(
|
51 | pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) {
|
52 | t.ok(sent.length < 10, 'sent is correct')
|
53 | t.end()
|
54 | }),
|
55 | pull.asyncMap(function (data, cb) {
|
56 | setImmediate(function () {
|
57 | cb(null, data)
|
58 | })
|
59 | }),
|
60 | pull.through(sent.push.bind(sent)),
|
61 | A.drainAbort(3)
|
62 | )
|
63 |
|
64 | })
|
65 | }
|
66 |
|
67 | module.exports = function (serializer) {
|
68 | tape('stream abort', function (t) {
|
69 | t.plan(2)
|
70 | var abortable = Abortable()
|
71 | var client = {
|
72 | drainAbort: 'sink'
|
73 | }
|
74 |
|
75 | var A = mux(client, null, serializer) ()
|
76 | var B = mux(null, client, serializer) ({
|
77 | drainAbort: function (n) {
|
78 | return pull(
|
79 | pull.through(function () {
|
80 | if(--n) return
|
81 | abortable.abort()
|
82 | }),
|
83 | pull.collect(function (err, ary) {
|
84 | console.log(ary)
|
85 | t.deepEqual(ary, [1, 2, 3])
|
86 | })
|
87 | )
|
88 | }
|
89 | })
|
90 |
|
91 | var as = A.createStream()
|
92 | var bs = B.createStream()
|
93 |
|
94 | pull(as, abortable, bs, as)
|
95 |
|
96 | var sent = []
|
97 |
|
98 | pull(
|
99 | pull.values([1,2,3,4,5,6,7,8,9,10], function (abort) {
|
100 | console.log(abort)
|
101 | t.ok(sent.length < 10, 'sent is correct')
|
102 | t.end()
|
103 | }),
|
104 | pull.asyncMap(function (data, cb) {
|
105 | setImmediate(function () {
|
106 | cb(null, data)
|
107 | })
|
108 | }),
|
109 | pull.through(sent.push.bind(sent)),
|
110 | A.drainAbort(3)
|
111 | )
|
112 |
|
113 | })
|
114 | }
|
115 |
|
116 |
|
117 | module.exports(id)
|