UNPKG

1.82 kBJavaScriptView Raw
1'use strict'
2var pull = require('pull-stream')
3// wrap pull streams around packet-stream's weird streams.
4
5function once (fn) {
6 var done = false
7 return function (err, val) {
8 if(done) return
9 done = true
10 fn(err, val)
11 }
12}
13
14module.exports = function (weird, _done) {
15 var buffer = [], ended = false, waiting, abort
16
17 var done = once(function (err, v) {
18 _done && _done(err, v)
19 // deallocate
20 weird = null
21 _done = null
22 waiting = null
23
24 if(abort) abort(err || true, function () {})
25 })
26
27 weird.read = function (data, end) {
28 ended = ended || end
29
30 if(waiting) {
31 var cb = waiting
32 waiting = null
33 cb(ended, data)
34 }
35 else if(!ended) buffer.push(data)
36
37 if(ended) done(ended !== true ? ended : null)
38 }
39
40 return {
41 source: function (abort, cb) {
42 if(abort) {
43 weird && weird.write(null, abort)
44 cb(abort); done(abort !== true ? abort : null)
45 }
46 else if(buffer.length) cb(null, buffer.shift())
47 else if(ended) cb(ended)
48 else waiting = cb
49 },
50 sink : function (read) {
51 if(ended) return read(ended, function () {}), abort = null
52 abort = read
53 pull.drain(function (data) {
54 //TODO: make this should only happen on a UNIPLEX stream.
55 if(ended) return false
56 weird.write(data)
57 }, function (err) {
58 if(weird && !weird.writeEnd) weird.write(null, err || true)
59 done && done(err)
60 })
61 (read)
62 }
63 }
64}
65
66function uniplex (s, done) {
67 return module.exports(s, function (err) {
68 if(!s.writeEnd) s.write(null, err || true)
69 if(done) done(err)
70 })
71}
72
73module.exports.source = function (s) {
74 return uniplex(s).source
75}
76module.exports.sink = function (s, done) {
77 return uniplex(s, done).sink
78}
79
80module.exports.duplex = module.exports