1 | 'use strict';
|
2 |
|
3 | const {
|
4 | Duplex: DuplexStream,
|
5 | Readable: ReadableStream,
|
6 | Writable: WritableStream,
|
7 | } = require('stream');
|
8 |
|
9 | const {
|
10 | CHANNEL_EXTENDED_DATATYPE: { STDERR },
|
11 | } = require('./protocol/constants.js');
|
12 | const { bufferSlice } = require('./protocol/utils.js');
|
13 |
|
14 | const PACKET_SIZE = 32 * 1024;
|
15 | const MAX_WINDOW = 2 * 1024 * 1024;
|
16 | const WINDOW_THRESHOLD = MAX_WINDOW / 2;
|
17 |
|
18 | class ClientStderr extends ReadableStream {
|
19 | constructor(channel, streamOpts) {
|
20 | super(streamOpts);
|
21 |
|
22 | this._channel = channel;
|
23 | }
|
24 | _read(n) {
|
25 | if (this._channel._waitChanDrain) {
|
26 | this._channel._waitChanDrain = false;
|
27 | if (this._channel.incoming.window <= WINDOW_THRESHOLD)
|
28 | windowAdjust(this._channel);
|
29 | }
|
30 | }
|
31 | }
|
32 |
|
33 | class ServerStderr extends WritableStream {
|
34 | constructor(channel) {
|
35 | super({ highWaterMark: MAX_WINDOW });
|
36 |
|
37 | this._channel = channel;
|
38 | }
|
39 |
|
40 | _write(data, encoding, cb) {
|
41 | const channel = this._channel;
|
42 | const protocol = channel._client._protocol;
|
43 | const outgoing = channel.outgoing;
|
44 | const packetSize = outgoing.packetSize;
|
45 | const id = outgoing.id;
|
46 | let window = outgoing.window;
|
47 | const len = data.length;
|
48 | let p = 0;
|
49 |
|
50 | if (outgoing.state !== 'open')
|
51 | return;
|
52 |
|
53 | while (len - p > 0 && window > 0) {
|
54 | let sliceLen = len - p;
|
55 | if (sliceLen > window)
|
56 | sliceLen = window;
|
57 | if (sliceLen > packetSize)
|
58 | sliceLen = packetSize;
|
59 |
|
60 | if (p === 0 && sliceLen === len)
|
61 | protocol.channelExtData(id, data, STDERR);
|
62 | else
|
63 | protocol.channelExtData(id, bufferSlice(data, p, p + sliceLen), STDERR);
|
64 |
|
65 | p += sliceLen;
|
66 | window -= sliceLen;
|
67 | }
|
68 |
|
69 | outgoing.window = window;
|
70 |
|
71 | if (len - p > 0) {
|
72 | if (window === 0)
|
73 | channel._waitWindow = true;
|
74 | if (p > 0)
|
75 | channel._chunkErr = bufferSlice(data, p, len);
|
76 | else
|
77 | channel._chunkErr = data;
|
78 | channel._chunkcbErr = cb;
|
79 | return;
|
80 | }
|
81 |
|
82 | cb();
|
83 | }
|
84 | }
|
85 |
|
86 | class Channel extends DuplexStream {
|
87 | constructor(client, info, opts) {
|
88 | const streamOpts = {
|
89 | highWaterMark: MAX_WINDOW,
|
90 | allowHalfOpen: (!opts || (opts && opts.allowHalfOpen !== false)),
|
91 | emitClose: false,
|
92 | };
|
93 | super(streamOpts);
|
94 | this.allowHalfOpen = streamOpts.allowHalfOpen;
|
95 |
|
96 | const server = !!(opts && opts.server);
|
97 |
|
98 | this.server = server;
|
99 | this.type = info.type;
|
100 | this.subtype = undefined;
|
101 |
|
102 | |
103 |
|
104 |
|
105 |
|
106 |
|
107 |
|
108 |
|
109 |
|
110 |
|
111 | this.incoming = info.incoming;
|
112 | this.outgoing = info.outgoing;
|
113 | this._callbacks = [];
|
114 |
|
115 | this._client = client;
|
116 | this._hasX11 = false;
|
117 | this._exit = {
|
118 | code: undefined,
|
119 | signal: undefined,
|
120 | dump: undefined,
|
121 | desc: undefined,
|
122 | };
|
123 |
|
124 | this.stdin = this.stdout = this;
|
125 |
|
126 | if (server)
|
127 | this.stderr = new ServerStderr(this);
|
128 | else
|
129 | this.stderr = new ClientStderr(this, streamOpts);
|
130 |
|
131 |
|
132 | this._waitWindow = false;
|
133 |
|
134 |
|
135 | this._waitChanDrain = false;
|
136 |
|
137 | this._chunk = undefined;
|
138 | this._chunkcb = undefined;
|
139 | this._chunkErr = undefined;
|
140 | this._chunkcbErr = undefined;
|
141 |
|
142 | this.on('finish', onFinish)
|
143 | .on('prefinish', onFinish);
|
144 |
|
145 | this.on('end', onEnd).on('close', onEnd);
|
146 | }
|
147 |
|
148 | _read(n) {
|
149 | if (this._waitChanDrain) {
|
150 | this._waitChanDrain = false;
|
151 | if (this.incoming.window <= WINDOW_THRESHOLD)
|
152 | windowAdjust(this);
|
153 | }
|
154 | }
|
155 |
|
156 | _write(data, encoding, cb) {
|
157 | const protocol = this._client._protocol;
|
158 | const outgoing = this.outgoing;
|
159 | const packetSize = outgoing.packetSize;
|
160 | const id = outgoing.id;
|
161 | let window = outgoing.window;
|
162 | const len = data.length;
|
163 | let p = 0;
|
164 |
|
165 | if (outgoing.state !== 'open')
|
166 | return;
|
167 |
|
168 | while (len - p > 0 && window > 0) {
|
169 | let sliceLen = len - p;
|
170 | if (sliceLen > window)
|
171 | sliceLen = window;
|
172 | if (sliceLen > packetSize)
|
173 | sliceLen = packetSize;
|
174 |
|
175 | if (p === 0 && sliceLen === len)
|
176 | protocol.channelData(id, data);
|
177 | else
|
178 | protocol.channelData(id, bufferSlice(data, p, p + sliceLen));
|
179 |
|
180 | p += sliceLen;
|
181 | window -= sliceLen;
|
182 | }
|
183 |
|
184 | outgoing.window = window;
|
185 |
|
186 | if (len - p > 0) {
|
187 | if (window === 0)
|
188 | this._waitWindow = true;
|
189 | if (p > 0)
|
190 | this._chunk = bufferSlice(data, p, len);
|
191 | else
|
192 | this._chunk = data;
|
193 | this._chunkcb = cb;
|
194 | return;
|
195 | }
|
196 |
|
197 | cb();
|
198 | }
|
199 |
|
200 | eof() {
|
201 | if (this.outgoing.state === 'open') {
|
202 | this.outgoing.state = 'eof';
|
203 | this._client._protocol.channelEOF(this.outgoing.id);
|
204 | }
|
205 | }
|
206 |
|
207 | close() {
|
208 | if (this.outgoing.state === 'open' || this.outgoing.state === 'eof') {
|
209 | this.outgoing.state = 'closing';
|
210 | this._client._protocol.channelClose(this.outgoing.id);
|
211 | }
|
212 | }
|
213 |
|
214 | destroy() {
|
215 | this.end();
|
216 | this.close();
|
217 | }
|
218 |
|
219 |
|
220 | setWindow(rows, cols, height, width) {
|
221 | if (this.server)
|
222 | throw new Error('Client-only method called in server mode');
|
223 |
|
224 | if (this.type === 'session'
|
225 | && (this.subtype === 'shell' || this.subtype === 'exec')
|
226 | && this.writable
|
227 | && this.outgoing.state === 'open') {
|
228 | this._client._protocol.windowChange(this.outgoing.id,
|
229 | rows,
|
230 | cols,
|
231 | height,
|
232 | width);
|
233 | }
|
234 | }
|
235 |
|
236 | signal(signalName) {
|
237 | if (this.server)
|
238 | throw new Error('Client-only method called in server mode');
|
239 |
|
240 | if (this.type === 'session'
|
241 | && this.writable
|
242 | && this.outgoing.state === 'open') {
|
243 | this._client._protocol.signal(this.outgoing.id, signalName);
|
244 | }
|
245 | }
|
246 |
|
247 | exit(statusOrSignal, coreDumped, msg) {
|
248 | if (!this.server)
|
249 | throw new Error('Server-only method called in client mode');
|
250 |
|
251 | if (this.type === 'session'
|
252 | && this.writable
|
253 | && this.outgoing.state === 'open') {
|
254 | if (typeof statusOrSignal === 'number') {
|
255 | this._client._protocol.exitStatus(this.outgoing.id, statusOrSignal);
|
256 | } else {
|
257 | this._client._protocol.exitSignal(this.outgoing.id,
|
258 | statusOrSignal,
|
259 | coreDumped,
|
260 | msg);
|
261 | }
|
262 | }
|
263 | }
|
264 |
|
265 | }
|
266 |
|
267 | function onFinish() {
|
268 | this.eof();
|
269 | if (this.server || !this.allowHalfOpen)
|
270 | this.close();
|
271 | this.writable = false;
|
272 | }
|
273 |
|
274 | function onEnd() {
|
275 | this.readable = false;
|
276 | }
|
277 |
|
278 | function windowAdjust(self) {
|
279 | if (self.outgoing.state === 'closed')
|
280 | return;
|
281 | const amt = MAX_WINDOW - self.incoming.window;
|
282 | if (amt <= 0)
|
283 | return;
|
284 | self.incoming.window += amt;
|
285 | self._client._protocol.channelWindowAdjust(self.outgoing.id, amt);
|
286 | }
|
287 |
|
288 | module.exports = {
|
289 | Channel,
|
290 | MAX_WINDOW,
|
291 | PACKET_SIZE,
|
292 | windowAdjust,
|
293 | WINDOW_THRESHOLD,
|
294 | };
|