UNPKG

1.67 kBJavaScriptView Raw
1"use strict";
2
3var stream = require("stream");
4
5function DuplexWrapper(options, writable, readable) {
6 if (typeof readable === "undefined") {
7 readable = writable;
8 writable = options;
9 options = null;
10 }
11
12 stream.Duplex.call(this, options);
13
14 if (typeof readable.read !== "function") {
15 readable = (new stream.Readable(options)).wrap(readable);
16 }
17
18 this._writable = writable;
19 this._readable = readable;
20 this._waiting = false;
21
22 var self = this;
23
24 writable.once("finish", function() {
25 self.end();
26 });
27
28 this.once("finish", function() {
29 writable.end();
30 });
31
32 readable.on("readable", function() {
33 if (self._waiting) {
34 self._waiting = false;
35 self._read();
36 }
37 });
38
39 readable.once("end", function() {
40 self.push(null);
41 });
42
43 if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) {
44 writable.on("error", function(err) {
45 self.emit("error", err);
46 });
47
48 readable.on("error", function(err) {
49 self.emit("error", err);
50 });
51 }
52}
53
54DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}});
55
56DuplexWrapper.prototype._write = function _write(input, encoding, done) {
57 this._writable.write(input, encoding, done);
58};
59
60DuplexWrapper.prototype._read = function _read() {
61 var buf;
62 var reads = 0;
63 while ((buf = this._readable.read()) !== null) {
64 this.push(buf);
65 reads++;
66 }
67 if (reads === 0) {
68 this._waiting = true;
69 }
70};
71
72module.exports = function duplex2(options, writable, readable) {
73 return new DuplexWrapper(options, writable, readable);
74};
75
76module.exports.DuplexWrapper = DuplexWrapper;