// Retrieved from UNPKG (file size: 7.12 kB, JavaScript).

'use strict';

// Node.js core stream base classes, aliased so they cannot be confused with
// the channel-specific classes defined in this file.
const {
  Duplex: DuplexStream,
  Readable: ReadableStream,
  Writable: WritableStream,
} = require('stream');

// Extended-data type code passed to `channelExtData()` for stderr output.
const {
  CHANNEL_EXTENDED_DATATYPE: { STDERR },
} = require('./protocol/constants.js');
const { bufferSlice } = require('./protocol/utils.js');
13
// Exported packet size value (32 KiB); not otherwise used in this file.
const PACKET_SIZE = 32 * 1024;
// Size the incoming window is topped back up to by `windowAdjust()`; also
// used as the `highWaterMark` of the channel streams (2 MiB).
const MAX_WINDOW = 2 * 1024 * 1024;
// The `_read()` implementations only request a window adjustment once the
// incoming window has dropped to this value or below.
const WINDOW_THRESHOLD = MAX_WINDOW / 2;
17
/**
 * Readable stream used as the `stderr` property of a client-mode channel.
 *
 * The stream itself holds no protocol state; it only keeps a reference to the
 * owning channel so that a read request can clear the channel's
 * `_waitChanDrain` flag and, if needed, top up the incoming window.
 */
class ClientStderr extends ReadableStream {
  /**
   * @param {object} channel - Owning channel; its `_waitChanDrain` flag and
   *   `incoming.window` are consulted by `_read()`.
   * @param {object} streamOpts - Options forwarded unchanged to the Readable
   *   constructor.
   */
  constructor(channel, streamOpts) {
    super(streamOpts);

    this._channel = channel;
  }

  /**
   * Readable implementation hook. Does nothing unless the channel is flagged
   * as waiting for the readable side to drain.
   *
   * @param {number} n - Requested size (unused).
   */
  _read(n) {
    const channel = this._channel;
    if (!channel._waitChanDrain)
      return;

    channel._waitChanDrain = false;
    // Only ask for more window once it has shrunk to the threshold.
    if (channel.incoming.window <= WINDOW_THRESHOLD)
      windowAdjust(channel);
  }
}
32
/**
 * Writable stream used as the `stderr` property of a server-mode channel.
 *
 * Written data is sent through the owning channel's protocol object as
 * extended data of type `STDERR`, limited by the channel's outgoing window
 * and packet size.
 */
class ServerStderr extends WritableStream {
  /**
   * @param {object} channel - Owning channel; provides `_client._protocol`
   *   and the `outgoing` state, and receives any unsent remainder.
   */
  constructor(channel) {
    super({ highWaterMark: MAX_WINDOW });

    this._channel = channel;
  }

  /**
   * Writable implementation hook.
   *
   * Sends as much of `data` as the outgoing window allows, in slices no
   * larger than the outgoing packet size. If anything is left over, the
   * remainder and `cb` are stored on the channel (`_chunkErr`, `_chunkcbErr`)
   * and `cb` is not called here.
   *
   * @param {Buffer} data - Bytes to send.
   * @param {string} encoding - Unused.
   * @param {Function} cb - Called once all of `data` has been handed to the
   *   protocol object.
   */
  _write(data, encoding, cb) {
    const channel = this._channel;
    const protocol = channel._client._protocol;
    const outgoing = channel.outgoing;
    const packetSize = outgoing.packetSize;
    const id = outgoing.id;
    const len = data.length;
    let window = outgoing.window;
    let p = 0;

    // NOTE(review): `cb` is not invoked on this path, so the Writable never
    // sees this write complete -- confirm this is intended.
    if (outgoing.state !== 'open')
      return;

    while (len - p > 0 && window > 0) {
      let sliceLen = len - p;
      if (sliceLen > window)
        sliceLen = window;
      if (sliceLen > packetSize)
        sliceLen = packetSize;

      // Skip slicing when the whole buffer goes out in a single message.
      if (p === 0 && sliceLen === len)
        protocol.channelExtData(id, data, STDERR);
      else
        protocol.channelExtData(id, bufferSlice(data, p, p + sliceLen), STDERR);

      p += sliceLen;
      window -= sliceLen;
    }

    outgoing.window = window;

    if (len - p > 0) {
      if (window === 0)
        channel._waitWindow = true;
      // Park the unsent remainder together with its callback.
      if (p > 0)
        channel._chunkErr = bufferSlice(data, p, len);
      else
        channel._chunkErr = data;
      channel._chunkcbErr = cb;
      return;
    }

    cb();
  }
}
85
/**
 * Duplex stream for a single channel.
 *
 * Writes are forwarded to the client's protocol object as channel data,
 * limited by the outgoing window and packet size. The channel is also exposed
 * as its own `stdin` and `stdout`; `stderr` is a separate stream whose class
 * depends on whether the channel runs in server or client mode.
 */
class Channel extends DuplexStream {
  /**
   * @param {object} client - Owner whose `_protocol` object is used to send
   *   all channel messages.
   * @param {object} info - Supplies `type`, `incoming` and `outgoing`.
   * @param {object} [opts] - Optional settings: a truthy `server` selects
   *   server mode; only an explicit `allowHalfOpen: false` disables half-open
   *   behaviour.
   */
  constructor(client, info, opts) {
    const streamOpts = {
      highWaterMark: MAX_WINDOW,
      allowHalfOpen: (!opts || opts.allowHalfOpen !== false),
      emitClose: false,
    };
    super(streamOpts);
    this.allowHalfOpen = streamOpts.allowHalfOpen;

    const server = !!(opts && opts.server);

    this.server = server;
    this.type = info.type;
    this.subtype = undefined;

    /*
      incoming and outgoing contain these properties:
      {
        id: undefined,
        window: undefined,
        packetSize: undefined,
        state: 'closed'
      }
    */
    this.incoming = info.incoming;
    this.outgoing = info.outgoing;
    this._callbacks = [];

    this._client = client;
    this._hasX11 = false;
    this._exit = {
      code: undefined,
      signal: undefined,
      dump: undefined,
      desc: undefined,
    };

    this.stdin = this.stdout = this;

    if (server)
      this.stderr = new ServerStderr(this);
    else
      this.stderr = new ClientStderr(this, streamOpts);

    // Outgoing data
    this._waitWindow = false; // SSH-level backpressure

    // Incoming data
    this._waitChanDrain = false; // Channel Readable side backpressure

    // Unsent remainder (and its write callback) for the channel itself and
    // for the stderr stream, respectively.
    this._chunk = undefined;
    this._chunkcb = undefined;
    this._chunkErr = undefined;
    this._chunkcbErr = undefined;

    this.on('finish', onFinish)
        .on('prefinish', onFinish); // For node v0.11+

    this.on('end', onEnd).on('close', onEnd);
  }

  /**
   * Readable implementation hook. Clears the drain-wait flag and tops up the
   * incoming window once it has shrunk to the threshold.
   *
   * @param {number} n - Requested size (unused).
   */
  _read(n) {
    if (!this._waitChanDrain)
      return;

    this._waitChanDrain = false;
    if (this.incoming.window <= WINDOW_THRESHOLD)
      windowAdjust(this);
  }

  /**
   * Writable implementation hook.
   *
   * Sends as much of `data` as the outgoing window allows, in slices no
   * larger than the outgoing packet size. Any remainder is stored in
   * `_chunk` together with `cb` in `_chunkcb`, and `cb` is not called here.
   *
   * @param {Buffer} data - Bytes to send.
   * @param {string} encoding - Unused.
   * @param {Function} cb - Called once all of `data` has been handed to the
   *   protocol object.
   */
  _write(data, encoding, cb) {
    const protocol = this._client._protocol;
    const outgoing = this.outgoing;
    const packetSize = outgoing.packetSize;
    const id = outgoing.id;
    const len = data.length;
    let window = outgoing.window;
    let p = 0;

    // NOTE(review): `cb` is not invoked on this path, so the Writable never
    // sees this write complete -- confirm this is intended.
    if (outgoing.state !== 'open')
      return;

    while (len - p > 0 && window > 0) {
      let sliceLen = len - p;
      if (sliceLen > window)
        sliceLen = window;
      if (sliceLen > packetSize)
        sliceLen = packetSize;

      // Skip slicing when the whole buffer goes out in a single message.
      if (p === 0 && sliceLen === len)
        protocol.channelData(id, data);
      else
        protocol.channelData(id, bufferSlice(data, p, p + sliceLen));

      p += sliceLen;
      window -= sliceLen;
    }

    outgoing.window = window;

    if (len - p > 0) {
      if (window === 0)
        this._waitWindow = true;
      // Park the unsent remainder together with its callback.
      if (p > 0)
        this._chunk = bufferSlice(data, p, len);
      else
        this._chunk = data;
      this._chunkcb = cb;
      return;
    }

    cb();
  }

  /**
   * Sends a channel EOF if the outgoing side is still `'open'`, moving it to
   * the `'eof'` state. Does nothing in any other state.
   */
  eof() {
    if (this.outgoing.state === 'open') {
      this.outgoing.state = 'eof';
      this._client._protocol.channelEOF(this.outgoing.id);
    }
  }

  /**
   * Sends a channel close if the outgoing side is `'open'` or `'eof'`, moving
   * it to the `'closing'` state. Does nothing in any other state.
   */
  close() {
    if (this.outgoing.state === 'open' || this.outgoing.state === 'eof') {
      this.outgoing.state = 'closing';
      this._client._protocol.channelClose(this.outgoing.id);
    }
  }

  /**
   * Ends the writable side and closes the channel.
   *
   * @returns {this} The channel, matching the return value of the
   *   `stream.destroy()` method this overrides so calls can be chained.
   */
  destroy() {
    this.end();
    this.close();
    return this;
  }

  // Session type-specific methods =============================================

  /**
   * Sends a window-change request. Only acts on writable, open `session`
   * channels whose subtype is `'shell'` or `'exec'`.
   *
   * @param {number} rows
   * @param {number} cols
   * @param {number} height
   * @param {number} width
   * @throws {Error} When called on a server-mode channel.
   */
  setWindow(rows, cols, height, width) {
    if (this.server)
      throw new Error('Client-only method called in server mode');

    if (this.type === 'session'
        && (this.subtype === 'shell' || this.subtype === 'exec')
        && this.writable
        && this.outgoing.state === 'open') {
      this._client._protocol.windowChange(this.outgoing.id,
                                          rows,
                                          cols,
                                          height,
                                          width);
    }
  }

  /**
   * Sends a signal request. Only acts on writable, open `session` channels.
   *
   * @param {string} signalName - Passed through to the protocol object.
   * @throws {Error} When called on a server-mode channel.
   */
  signal(signalName) {
    if (this.server)
      throw new Error('Client-only method called in server mode');

    if (this.type === 'session'
        && this.writable
        && this.outgoing.state === 'open') {
      this._client._protocol.signal(this.outgoing.id, signalName);
    }
  }

  /**
   * Reports how the remote command ended. Only acts on writable, open
   * `session` channels.
   *
   * @param {number|*} statusOrSignal - A number is sent as an exit status;
   *   anything else is sent as an exit signal.
   * @param {*} coreDumped - Forwarded with an exit signal only.
   * @param {*} msg - Forwarded with an exit signal only.
   * @throws {Error} When called on a client-mode channel.
   */
  exit(statusOrSignal, coreDumped, msg) {
    if (!this.server)
      throw new Error('Server-only method called in client mode');

    if (this.type === 'session'
        && this.writable
        && this.outgoing.state === 'open') {
      if (typeof statusOrSignal === 'number') {
        this._client._protocol.exitStatus(this.outgoing.id, statusOrSignal);
      } else {
        this._client._protocol.exitSignal(this.outgoing.id,
                                          statusOrSignal,
                                          coreDumped,
                                          msg);
      }
    }
  }
}
266
/**
 * Listener the Channel constructor registers for both 'finish' and
 * 'prefinish'; runs with `this` bound to the emitting channel.
 *
 * Always sends EOF. Additionally closes the channel when it is in server
 * mode or when half-open operation is disabled, then marks the stream as no
 * longer writable.
 */
function onFinish() {
  this.eof();
  if (this.server || !this.allowHalfOpen)
    this.close();
  this.writable = false;
}
273
/**
 * Listener the Channel constructor registers for both 'end' and 'close';
 * runs with `this` bound to the emitting channel and marks it as no longer
 * readable.
 */
function onEnd() {
  this.readable = false;
}
277
/**
 * Restores a channel's incoming window to `MAX_WINDOW` and announces the
 * increase through the channel's protocol object.
 *
 * Nothing is sent when the outgoing side is `'closed'` or when the window is
 * already at (or above) `MAX_WINDOW`.
 *
 * @param {object} self - Channel to adjust; reads `outgoing.state`,
 *   `outgoing.id` and `incoming.window`, and updates `incoming.window`.
 */
function windowAdjust(self) {
  if (self.outgoing.state === 'closed')
    return;

  const amt = MAX_WINDOW - self.incoming.window;
  if (amt <= 0)
    return;

  self.incoming.window += amt;
  self._client._protocol.channelWindowAdjust(self.outgoing.id, amt);
}
287
288module.exports = {
289 Channel,
290 MAX_WINDOW,
291 PACKET_SIZE,
292 windowAdjust,
293 WINDOW_THRESHOLD,
294};