1 | "use strict";
|
2 |
|
3 | var stream = require("stream");
|
4 |
|
/**
 * Duplex stream that presents a separate writable stream and readable stream
 * as a single object: chunks written to the wrapper are forwarded to
 * `writable`, and chunks read from the wrapper are pulled from `readable`.
 *
 * Accepts either `(options, writable, readable)` or `(writable, readable)`.
 *
 * @constructor
 * @param {Object} [options] - Passed to `stream.Duplex`, and to the
 *   `stream.Readable` used to wrap `readable` when it has no `read` function.
 *   When `options.bubbleErrors` is omitted or truthy, "error" events from both
 *   inner streams are re-emitted on the wrapper.
 * @param {Object} writable - Stream that receives everything written to the
 *   wrapper.
 * @param {Object} readable - Stream the wrapper reads from. If it has no
 *   `read` function it is adapted with `stream.Readable#wrap`.
 */
function DuplexWrapper(options, writable, readable) {
  // Two-argument call: shift the arguments over and run without options.
  if (typeof readable === "undefined") {
    readable = writable;
    writable = options;
    options = null;
  }

  stream.Duplex.call(this, options);

  // A source without read() cannot be pulled from directly, so adapt it.
  if (typeof readable.read !== "function") {
    readable = (new stream.Readable(options)).wrap(readable);
  }

  this._writable = writable;
  this._readable = readable;
  // True when the last _read() obtained nothing from the inner readable and
  // the wrapper is waiting for its next "readable" event.
  this._waiting = false;

  var self = this;

  // Keep the writable halves in step: whichever finishes first ends the other.
  writable.once("finish", function() {
    self.end();
  });

  this.once("finish", function() {
    writable.end();
  });

  // Retry a read that previously came up empty once data is available again.
  readable.on("readable", function() {
    if (self._waiting) {
      self._waiting = false;
      self._read();
    }
  });

  // Signal end-of-stream on the wrapper when the inner readable ends.
  readable.once("end", function() {
    self.push(null);
  });

  if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) {
    writable.on("error", function(err) {
      self.emit("error", err);
    });

    readable.on("error", function(err) {
      self.emit("error", err);
    });
  }
}

DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}});

/**
 * Forwards a written chunk to the inner writable stream.
 *
 * @param {*} input - Chunk to write.
 * @param {string} encoding - Encoding passed along with the chunk.
 * @param {Function} done - Handed to the inner `write()` as its callback.
 */
DuplexWrapper.prototype._write = function _write(input, encoding, done) {
  this._writable.write(input, encoding, done);
};

/**
 * Pulls chunks from the inner readable stream and pushes them onto the
 * wrapper. Pulling stops when the inner stream has nothing more to give or
 * when `push()` returns false, so the inner stream is not drained past the
 * wrapper's own buffer limit. If nothing at all was available, `_waiting` is
 * set so the inner stream's next "readable" event triggers another attempt.
 */
DuplexWrapper.prototype._read = function _read() {
  var buf;
  var reads = 0;
  while ((buf = this._readable.read()) !== null) {
    reads++;
    // push() returning false means the wrapper's buffer is full; leave the
    // remaining data in the inner stream until _read() is called again.
    if (!this.push(buf)) {
      break;
    }
  }
  if (reads === 0) {
    this._waiting = true;
  }
};
|
71 |
|
/**
 * Factory form of the DuplexWrapper constructor: builds a wrapper without the
 * caller having to use `new`. Arguments are handed to DuplexWrapper unchanged.
 *
 * @param {Object} [options] - First argument for DuplexWrapper.
 * @param {Object} writable - Second argument for DuplexWrapper.
 * @param {Object} readable - Third argument for DuplexWrapper.
 * @returns {DuplexWrapper} The newly constructed wrapper.
 */
function duplex2(options, writable, readable) {
  return new DuplexWrapper(options, writable, readable);
}

// The factory is the module's export; the constructor is exposed as a
// property on it.
module.exports = duplex2;
module.exports.DuplexWrapper = DuplexWrapper;
|