1 |
|
2 |
|
3 | var tape = require('tape')
|
4 | var pull = require('../')
|
5 |
|
6 |
|
/**
 * Build a `read(abort, cb)` source for the abort tests: it emits `values`
 * in order and then "hangs" instead of ending.
 *
 * Behaviour of the returned function:
 *  - while values remain, every call is answered with the next value
 *    (this check comes first, so `abort` is ignored until the values
 *    are exhausted);
 *  - once exhausted, a non-abort read is parked and never answered
 *    with data;
 *  - once exhausted, an abort read terminates the parked read (if any)
 *    with the abort value, acknowledges the abort through `cb`, and
 *    finally invokes `onAbort`.
 *
 * @param {Array} values - items to emit before hanging
 * @param {Function} [onAbort] - called with no arguments after an abort
 *   has been acknowledged
 * @returns {Function} source function `(abort, cb)`
 */
function hang (values, onAbort) {
  var i = 0
  // Callback of the read that is currently parked. Declared here so it is
  // closure-local rather than an implicit global shared by every source
  // (and a ReferenceError under strict mode).
  var _cb = null
  return function (abort, cb) {
    if(i < values.length)
      cb(null, values[i++])
    else if(!abort)
      _cb = cb
    else {
      // Release the parked read at most once; there may be none if the
      // abort arrives before any read was left hanging.
      var waiting = _cb
      _cb = null
      if(waiting) waiting(abort)
      cb(abort)
      onAbort && onAbort()
    }
  }
}
|
21 |
|
/**
 * Create a pass-through stage that can be aborted from the outside.
 *
 * The returned `reader` is called with an upstream `read(abort, cb)`
 * function and returns a new read function that forwards every call
 * unchanged, remembering the abort value if one passes through.
 *
 * `reader.abort(cb)` aborts the upstream once. If the stream has already
 * been aborted (through the reader or by an earlier `abort()` call), `cb`
 * is called directly with the remembered abort value and the upstream is
 * not touched again.
 *
 * @returns {Function} `reader(read)` with an extra `abort(cb)` method
 */
function abortable () {
  var _read, aborted
  function reader (read) {
    _read = read
    return function (abort, cb) {
      if(abort) aborted = abort
      read(abort, cb)
    }
  }

  /**
   * Abort the upstream source.
   *
   * @param {Function} [cb] - receives the abort result; the default
   *   rethrows anything that is a real error (truthy and not `true`)
   * @throws {TypeError} if called before the reader was given a source
   */
  reader.abort = function (cb) {
    cb = cb || function (err) {
      if(err && err !== true) throw err
    }
    if(aborted)
      cb(aborted)
    else if(typeof _read !== 'function')
      throw new TypeError('abortable: abort() called before a source was connected')
    else {
      // Record the abort before forwarding it so that repeated abort()
      // calls do not send a second abort upstream.
      aborted = true
      _read(true, cb)
    }
  }

  return reader
}
|
43 |
|
/**
 * Register a tape case named 'test abort:' + name.
 *
 * The case pipes `hang([1,2,3])` through `trx` and an `abortable()` stage
 * into a drain. When the drain sees the value 3 it schedules an external
 * abort with `setImmediate`; the case ends when the hanging source
 * reports (through its onAbort hook) that the abort arrived.
 *
 * @param {string} name - suffix for the tape test title
 * @param {Function} trx - through stream under test
 */
function test (name, trx) {
  function run (t) {
    var stopper = abortable()

    // Called by the hanging source once it has processed the abort.
    function finish () {
      t.end()
    }

    function abortLater () {
      stopper.abort()
    }

    // Trigger the abort only after the last real value was consumed.
    function onItem (item) {
      if(item !== 3) return
      setImmediate(abortLater)
    }

    // Completion of the drain is intentionally not checked here.
    function onDone (err) {
    }

    pull(
      hang([1,2,3], finish),
      trx,
      stopper,
      pull.drain(onItem, onDone)
    )
  }

  tape('test abort:'+name, run)
}
|
64 |
|
// Run the abort scenario against each through stream. Every entry holds a
// factory so the through is created right before its test is registered.
var throughCases = [
  ['through', function () { return pull.through() }],
  ['map', function () { return pull.map(function (e) { return e }) }],
  ['take', function () { return pull.take(Boolean) }]
]

throughCases.forEach(function (entry) {
  test(entry[0], entry[1]())
})
|
68 |
|