1 | 'use strict';
|
2 |
|
3 | const { Duplex } = require('stream');
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
/**
 * Fires the `'close'` event, without arguments, on the given stream.
 *
 * Kept as a standalone function so that it can be handed directly to
 * `process.nextTick()` together with the stream as its argument.
 *
 * @param {Duplex} stream The stream on which `'close'` is emitted
 * @private
 */
function emitClose(stream) {
  const eventName = 'close';

  stream.emit(eventName);
}
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
/**
 * Listener for the `'end'` event. It must be invoked with the stream as
 * `this`.
 *
 * Destroys the stream once its readable side has ended, but only if it has
 * not been destroyed already and its writable side has finished.
 *
 * @private
 */
function duplexOnEnd() {
  // Nothing to do if the stream is already destroyed.
  if (this.destroyed) return;

  // The writable side is still in use: leave the stream alive.
  if (!this._writableState.finished) return;

  this.destroy();
}
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
/**
 * Listener for the `'error'` event. It must be invoked with the stream as
 * `this`.
 *
 * Removes itself from the stream, destroys the stream and, if no other
 * `'error'` listener is left, emits the error again.
 *
 * @param {Error} err The error that was emitted
 * @private
 */
function duplexOnError(err) {
  // Detach first so that this listener is not counted below and is not
  // re-entered by the re-emit.
  this.removeListener('error', duplexOnError);
  this.destroy();

  const remaining = this.listenerCount('error');

  // Someone else is handling the error: nothing more to do.
  if (remaining !== 0) return;

  // No other listener is registered. Re-emit so that the usual behavior of an
  // unhandled `'error'` event (the emitter throws) is not suppressed.
  this.emit('error', err);
}
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
/**
 * Wraps a WebSocket in a `Duplex` stream.
 *
 * - Every `'message'` received by `ws` is pushed to the readable side; `ws` is
 *   paused when `push()` signals backpressure and resumed from `_read()`.
 * - Every chunk written to the stream is forwarded with `ws.send()`.
 * - Ending the writable side closes the websocket with `ws.close()`.
 * - Destroying the stream calls `ws.terminate()`, unless the websocket has
 *   already emitted an `'error'` event.
 *
 * @param {WebSocket} ws The websocket to wrap. It must expose the members
 *     used below: `on`, `once`, `send`, `close`, `terminate`, `pause`,
 *     `resume`, `isPaused`, `readyState`, `CONNECTING`, `CLOSED` and `_socket`
 * @param {Object} [options] Options spread into the `Duplex` constructor.
 *     `autoDestroy`, `emitClose`, `objectMode` and `writableObjectMode` are
 *     always forced to `false`
 * @return {Duplex} The duplex stream
 * @public
 */
function createWebSocketStream(ws, options) {
  // Cleared once `ws` reports an error; `_destroy()` then skips
  // `ws.terminate()`.
  let mayTerminate = true;

  const streamOptions = {
    ...options,
    autoDestroy: false,
    // `'close'` is emitted manually, see `_destroy()`.
    emitClose: false,
    objectMode: false,
    writableObjectMode: false
  };
  const stream = new Duplex(streamOptions);

  const onMessage = (msg, isBinary) => {
    // Only non-binary messages read in object mode are converted to strings;
    // everything else is pushed untouched.
    const asText = !isBinary && stream._readableState.objectMode;
    const payload = asText ? msg.toString() : msg;

    const wantsMore = stream.push(payload);

    // Backpressure: stop reading from the websocket until `_read()` is called.
    if (!wantsMore) ws.pause();
  };

  const onError = (err) => {
    if (stream.destroyed) return;

    // From here on `_destroy()` must not call `ws.terminate()`.
    // NOTE(review): presumably the websocket takes care of closing itself
    // after an error - confirm against the `WebSocket` implementation.
    mayTerminate = false;
    stream.destroy(err);
  };

  const onClose = () => {
    if (stream.destroyed) return;

    // Signal EOF on the readable side.
    stream.push(null);
  };

  ws.on('message', onMessage);
  ws.once('error', onError);
  ws.once('close', onClose);

  stream._destroy = function (err, callback) {
    // `emitClose: false` is passed to the constructor, so `'close'` is emitted
    // by hand on the next tick.
    const scheduleClose = () => {
      process.nextTick(emitClose, stream);
    };

    if (ws.readyState === ws.CLOSED) {
      callback(err);
      scheduleClose();
      return;
    }

    // Set when `callback` has been invoked from the `'error'` listener, so
    // that the `'close'` listener does not invoke it a second time.
    let settled = false;

    ws.once('error', (wsError) => {
      settled = true;
      callback(wsError);
    });

    ws.once('close', () => {
      if (!settled) callback(err);
      scheduleClose();
    });

    if (mayTerminate) ws.terminate();
  };

  stream._final = function (callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Try again once the connection is established.
      ws.once('open', () => {
        stream._final(callback);
      });
      return;
    }

    // No socket is attached to the websocket: `callback` is deliberately not
    // invoked on this path.
    // NOTE(review): presumably this is a failed handshake and the `'error'`
    // event of the websocket will destroy the stream - confirm.
    if (ws._socket === null) return;

    if (!ws._socket._writableState.finished) {
      ws._socket.once('finish', () => {
        // The stream is not destroyed here. The EOF `null` chunk is pushed
        // when the websocket emits `'close'`, and the `'end'` listener of the
        // stream decides whether to destroy it.
        callback();
      });
      ws.close();
      return;
    }

    callback();
    if (stream._readableState.endEmitted) stream.destroy();
  };

  stream._read = function () {
    if (!ws.isPaused) return;

    ws.resume();
  };

  stream._write = function (chunk, encoding, callback) {
    if (ws.readyState !== ws.CONNECTING) {
      ws.send(chunk, callback);
      return;
    }

    // Not connected yet: retry the same write once the websocket is open.
    ws.once('open', () => {
      stream._write(chunk, encoding, callback);
    });
  };

  stream.on('end', duplexOnEnd);
  stream.on('error', duplexOnError);
  return stream;
}
|
158 |
|
159 | module.exports = createWebSocketStream;
|