1 | 'use strict'
|
2 | var pull = require('pull-stream')
|
3 |
|
4 |
|
5 | function once (fn) {
|
6 | var done = false
|
7 | return function (err, val) {
|
8 | if(done) return
|
9 | done = true
|
10 | fn(err, val)
|
11 | }
|
12 | }
|
13 |
|
14 | module.exports = function (weird, _done) {
|
15 | var buffer = [], ended = false, waiting, abort
|
16 |
|
17 | var done = once(function (err, v) {
|
18 | _done && _done(err, v)
|
19 |
|
20 | weird = null
|
21 | _done = null
|
22 | waiting = null
|
23 |
|
24 | if(abort) abort(err || true, function () {})
|
25 | })
|
26 |
|
27 | weird.read = function (data, end) {
|
28 | ended = ended || end
|
29 |
|
30 | if(waiting) {
|
31 | var cb = waiting
|
32 | waiting = null
|
33 | cb(ended, data)
|
34 | }
|
35 | else if(!ended) buffer.push(data)
|
36 |
|
37 | if(ended) done(ended !== true ? ended : null)
|
38 | }
|
39 |
|
40 | return {
|
41 | source: function (abort, cb) {
|
42 | if(abort) {
|
43 | weird && weird.write(null, abort)
|
44 | cb(abort); done(abort !== true ? abort : null)
|
45 | }
|
46 | else if(buffer.length) cb(null, buffer.shift())
|
47 | else if(ended) cb(ended)
|
48 | else waiting = cb
|
49 | },
|
50 | sink : function (read) {
|
51 | if(ended) return read(ended, function () {}), abort = null
|
52 | abort = read
|
53 | pull.drain(function (data) {
|
54 |
|
55 | if(ended) return false
|
56 | weird.write(data)
|
57 | }, function (err) {
|
58 | if(weird && !weird.writeEnd) weird.write(null, err || true)
|
59 | done && done(err)
|
60 | })
|
61 | (read)
|
62 | }
|
63 | }
|
64 | }
|
65 |
|
66 | function uniplex (s, done) {
|
67 | return module.exports(s, function (err) {
|
68 | if(!s.writeEnd) s.write(null, err || true)
|
69 | if(done) done(err)
|
70 | })
|
71 | }
|
72 |
|
73 | module.exports.source = function (s) {
|
74 | return uniplex(s).source
|
75 | }
|
76 | module.exports.sink = function (s, done) {
|
77 | return uniplex(s, done).sink
|
78 | }
|
79 |
|
80 | module.exports.duplex = module.exports
|