UNPKG

13.3 kBJavaScriptView Raw
1var inherits = require('util').inherits;
2var DuplexStream = require('stream').Duplex;
3var ReadableStream = require('stream').Readable;
4var WritableStream = require('stream').Writable;
5
6var STDERR = require('ssh2-streams').constants.CHANNEL_EXTENDED_DATATYPE.STDERR;
7
8var PACKET_SIZE = 32 * 1024;
9var MAX_WINDOW = 1 * 1024 * 1024;
10var WINDOW_THRESHOLD = MAX_WINDOW / 2;
11var CUSTOM_EVENTS = [
12 'CHANNEL_EOF',
13 'CHANNEL_CLOSE',
14 'CHANNEL_DATA',
15 'CHANNEL_EXTENDED_DATA',
16 'CHANNEL_WINDOW_ADJUST',
17 'CHANNEL_SUCCESS',
18 'CHANNEL_FAILURE',
19 'CHANNEL_REQUEST'
20];
21var CUSTOM_EVENTS_LEN = CUSTOM_EVENTS.length;
22
23function Channel(info, client, opts) {
24 var streamOpts = {
25 highWaterMark: MAX_WINDOW,
26 allowHalfOpen: (!opts || (opts && opts.allowHalfOpen !== false))
27 };
28
29 this.allowHalfOpen = streamOpts.allowHalfOpen;
30
31 DuplexStream.call(this, streamOpts);
32
33 var self = this;
34 var server = opts && opts.server;
35
36 this.server = server;
37 this.type = info.type;
38 this.subtype = undefined;
39 /*
40 incoming and outgoing contain these properties:
41 {
42 id: undefined,
43 window: undefined,
44 packetSize: undefined,
45 state: 'closed'
46 }
47 */
48 var incoming = this.incoming = info.incoming;
49 var incomingId = incoming.id;
50 var outgoing = this.outgoing = info.outgoing;
51 var callbacks = this._callbacks = [];
52 var exitCode;
53 var exitSignal;
54 var exitDump;
55 var exitDesc;
56 var exitLang;
57
58 this._client = client;
59 this._hasX11 = false;
60
61 var channels = client._channels;
62 var sshstream = client._sshstream;
63
64 function ondrain() {
65 if (self._waitClientDrain) {
66 self._waitClientDrain = false;
67 if (!self._waitWindow) {
68 if (self._chunk)
69 self._write(self._chunk, null, self._chunkcb);
70 else if (self._chunkcb)
71 self._chunkcb();
72 else if (self._chunkErr)
73 self.stderr._write(self._chunkErr, null, self._chunkcbErr);
74 else if (self._chunkcbErr)
75 self._chunkcbErr();
76 }
77 }
78 }
79 client._sock.on('drain', ondrain);
80
81 sshstream.once('CHANNEL_EOF:' + incomingId, function() {
82 if (incoming.state !== 'open')
83 return;
84 incoming.state = 'eof';
85
86 if (self.readable)
87 self.push(null);
88 if (!server && self.stderr.readable)
89 self.stderr.push(null);
90 }).once('CHANNEL_CLOSE:' + incomingId, function() {
91 if (incoming.state === 'closed')
92 return;
93 incoming.state = 'closed';
94
95 if (self.readable)
96 self.push(null);
97 if (server && self.stderr.writable)
98 self.stderr.end();
99 else if (!server && self.stderr.readable)
100 self.stderr.push(null);
101
102 if (outgoing.state === 'open' || outgoing.state === 'eof')
103 self.close();
104 if (outgoing.state === 'closing')
105 outgoing.state = 'closed';
106
107 delete channels[incomingId];
108
109 var state = self._writableState;
110 client._sock.removeListener('drain', ondrain);
111 if (!state.ending && !state.finished)
112 self.end();
113
114 // Take care of any outstanding channel requests
115 self._callbacks = [];
116 for (var i = 0; i < callbacks.length; ++i)
117 callbacks[i](true);
118 callbacks = self._callbacks;
119
120 if (!server) {
121 // align more with node child processes, where the close event gets the
122 // same arguments as the exit event
123 if (!self.readable) {
124 if (exitCode === null) {
125 self.emit('close', exitCode, exitSignal, exitDump, exitDesc,
126 exitLang);
127 } else
128 self.emit('close', exitCode);
129 } else {
130 self.once('end', function() {
131 if (exitCode === null) {
132 self.emit('close', exitCode, exitSignal, exitDump, exitDesc,
133 exitLang);
134 } else
135 self.emit('close', exitCode);
136 });
137 }
138
139 if (!self.stderr.readable)
140 self.stderr.emit('close');
141 else {
142 self.stderr.once('end', function() {
143 self.stderr.emit('close');
144 });
145 }
146 } else { // Server mode
147 if (!self.readable)
148 self.emit('close');
149 else {
150 self.once('end', function() {
151 self.emit('close');
152 });
153 }
154 }
155
156 for (var i = 0; i < CUSTOM_EVENTS_LEN; ++i)
157 sshstream.removeAllListeners(CUSTOM_EVENTS[i] + ':' + incomingId);
158 }).on('CHANNEL_DATA:' + incomingId, function(data) {
159 // the remote party should not be sending us data if there is no window
160 // space available ...
161 // TODO: raise error on data with not enough window
162 if (incoming.window === 0)
163 return;
164
165 incoming.window -= data.length;
166
167 if (!self.push(data)) {
168 self._waitChanDrain = true;
169 return;
170 }
171
172 if (incoming.window <= WINDOW_THRESHOLD)
173 windowAdjust(self);
174 }).on('CHANNEL_WINDOW_ADJUST:' + incomingId, function(amt) {
175 // the server is allowing us to send `amt` more bytes of data
176 outgoing.window += amt;
177
178 if (self._waitWindow) {
179 self._waitWindow = false;
180 if (!self._waitClientDrain) {
181 if (self._chunk)
182 self._write(self._chunk, null, self._chunkcb);
183 else if (self._chunkcb)
184 self._chunkcb();
185 else if (self._chunkErr)
186 self.stderr._write(self._chunkErr, null, self._chunkcbErr);
187 else if (self._chunkcbErr)
188 self._chunkcbErr();
189 }
190 }
191 }).on('CHANNEL_SUCCESS:' + incomingId, function() {
192 if (server) {
193 sshstream._kalast = Date.now();
194 sshstream._kacnt = 0;
195 } else
196 client._resetKA();
197 if (callbacks.length)
198 callbacks.shift()(false);
199 }).on('CHANNEL_FAILURE:' + incomingId, function() {
200 if (server) {
201 sshstream._kalast = Date.now();
202 sshstream._kacnt = 0;
203 } else
204 client._resetKA();
205 if (callbacks.length)
206 callbacks.shift()(true);
207 }).on('CHANNEL_REQUEST:' + incomingId, function(info) {
208 if (!server) {
209 if (info.request === 'exit-status') {
210 self.emit('exit', exitCode = info.code);
211 return;
212 } else if (info.request === 'exit-signal') {
213 self.emit('exit',
214 exitCode = null,
215 exitSignal = 'SIG' + info.signal,
216 exitDump = info.coredump,
217 exitDesc = info.description,
218 exitLang = info.lang);
219 return;
220 }
221 }
222
223 // keepalive request? OpenSSH will send one as a channel request if there
224 // is a channel open
225
226 if (info.wantReply)
227 sshstream.channelFailure(outgoing.id);
228 });
229
230 this.stdin = this.stdout = this;
231
232 if (server)
233 this.stderr = new ServerStderr(this);
234 else {
235 this.stderr = new ReadableStream(streamOpts);
236 this.stderr._read = function(n) {
237 if (self._waitChanDrain) {
238 self._waitChanDrain = false;
239 if (incoming.window <= WINDOW_THRESHOLD)
240 windowAdjust(self);
241 }
242 };
243
244 sshstream.on('CHANNEL_EXTENDED_DATA:' + incomingId,
245 function(type, data) {
246 // the remote party should not be sending us data if there is no window
247 // space available ...
248 // TODO: raise error on data with not enough window
249 if (incoming.window === 0)
250 return;
251
252 incoming.window -= data.length;
253
254 if (!self.stderr.push(data)) {
255 self._waitChanDrain = true;
256 return;
257 }
258
259 if (incoming.window <= WINDOW_THRESHOLD)
260 windowAdjust(self);
261 }
262 );
263 }
264
265 // outgoing data
266 this._waitClientDrain = false; // Client stream-level backpressure
267 this._waitWindow = false; // SSH-level backpressure
268
269 // incoming data
270 this._waitChanDrain = false; // Channel Readable side backpressure
271
272 this._chunk = undefined;
273 this._chunkcb = undefined;
274 this._chunkErr = undefined;
275 this._chunkcbErr = undefined;
276
277 function onFinish() {
278 self.eof();
279 if (server || (!server && !self.allowHalfOpen))
280 self.close();
281 self.writable = false;
282 }
283 this.on('finish', onFinish)
284 .on('prefinish', onFinish); // for node v0.11+
285 function onEnd() {
286 self.readable = false;
287 }
288 this.on('end', onEnd)
289 .on('close', onEnd);
290}
291inherits(Channel, DuplexStream);
292
293Channel.prototype.eof = function() {
294 var ret = true;
295 var outgoing = this.outgoing;
296
297 if (outgoing.state === 'open') {
298 outgoing.state = 'eof';
299 ret = this._client._sshstream.channelEOF(outgoing.id);
300 }
301
302 return ret;
303};
304
305Channel.prototype.close = function() {
306 var ret = true;
307 var outgoing = this.outgoing;
308
309 if (outgoing.state === 'open' || outgoing.state === 'eof') {
310 outgoing.state = 'closing';
311 ret = this._client._sshstream.channelClose(outgoing.id);
312 }
313
314 return ret;
315};
316
317Channel.prototype._read = function(n) {
318 if (this._waitChanDrain) {
319 this._waitChanDrain = false;
320 if (this.incoming.window <= WINDOW_THRESHOLD)
321 windowAdjust(this);
322 }
323};
324
325Channel.prototype._write = function(data, encoding, cb) {
326 var sshstream = this._client._sshstream;
327 var outgoing = this.outgoing;
328 var packetSize = outgoing.packetSize;
329 var id = outgoing.id;
330 var window = outgoing.window;
331 var len = data.length;
332 var p = 0;
333 var ret;
334 var buf;
335 var sliceLen;
336
337 if (outgoing.state !== 'open')
338 return;
339
340 while (len - p > 0 && window > 0) {
341 sliceLen = len - p;
342 if (sliceLen > window)
343 sliceLen = window;
344 if (sliceLen > packetSize)
345 sliceLen = packetSize;
346
347 ret = sshstream.channelData(id, data.slice(p, p + sliceLen));
348
349 p += sliceLen;
350 window -= sliceLen;
351
352 if (!ret) {
353 this._waitClientDrain = true;
354 this._chunk = undefined;
355 this._chunkcb = cb;
356 break;
357 }
358 }
359
360 outgoing.window = window;
361
362 if (len - p > 0) {
363 if (window === 0)
364 this._waitWindow = true;
365 if (p > 0) {
366 // partial
367 buf = Buffer.allocUnsafe(len - p);
368 data.copy(buf, 0, p);
369 this._chunk = buf;
370 } else
371 this._chunk = data;
372 this._chunkcb = cb;
373 return;
374 }
375
376 if (!this._waitClientDrain)
377 cb();
378};
379
380Channel.prototype.destroy = function() {
381 this.end();
382};
383
384// session type-specific methods
385Channel.prototype.setWindow = function(rows, cols, height, width) {
386 if (this.server)
387 throw new Error('Client-only method called in server mode');
388
389 if (this.type === 'session'
390 && (this.subtype === 'shell' || this.subtype === 'exec')
391 && this.writable
392 && this.outgoing.state === 'open') {
393 return this._client._sshstream.windowChange(this.outgoing.id,
394 rows,
395 cols,
396 height,
397 width);
398 }
399
400 return true;
401};
402Channel.prototype.signal = function(signalName) {
403 if (this.server)
404 throw new Error('Client-only method called in server mode');
405
406 if (this.type === 'session'
407 && this.writable
408 && this.outgoing.state === 'open')
409 return this._client._sshstream.signal(this.outgoing.id, signalName);
410
411 return true;
412};
413Channel.prototype.exit = function(name, coreDumped, msg) {
414 if (!this.server)
415 throw new Error('Server-only method called in client mode');
416
417 if (this.type === 'session'
418 && this.writable
419 && this.outgoing.state === 'open') {
420 if (typeof name === 'number')
421 return this._client._sshstream.exitStatus(this.outgoing.id, name);
422 else {
423 return this._client._sshstream.exitSignal(this.outgoing.id,
424 name,
425 coreDumped,
426 msg);
427 }
428 }
429
430 return true;
431};
432
433Channel.MAX_WINDOW = MAX_WINDOW;
434Channel.PACKET_SIZE = PACKET_SIZE;
435
436function windowAdjust(self) {
437 if (self.outgoing.state === 'closed')
438 return true;
439 var amt = MAX_WINDOW - self.incoming.window;
440 if (amt <= 0)
441 return true;
442 self.incoming.window += amt;
443 return self._client._sshstream.channelWindowAdjust(self.outgoing.id, amt);
444}
445
446function ServerStderr(channel) {
447 WritableStream.call(this, { highWaterMark: MAX_WINDOW });
448 this._channel = channel;
449}
450inherits(ServerStderr, WritableStream);
451
452ServerStderr.prototype._write = function(data, encoding, cb) {
453 var channel = this._channel;
454 var sshstream = channel._client._sshstream;
455 var outgoing = channel.outgoing;
456 var packetSize = outgoing.packetSize;
457 var id = outgoing.id;
458 var window = outgoing.window;
459 var len = data.length;
460 var p = 0;
461 var ret;
462 var buf;
463 var sliceLen;
464
465 if (channel.outgoing.state !== 'open')
466 return;
467
468 while (len - p > 0 && window > 0) {
469 sliceLen = len - p;
470 if (sliceLen > window)
471 sliceLen = window;
472 if (sliceLen > packetSize)
473 sliceLen = packetSize;
474
475 ret = sshstream.channelExtData(id, data.slice(p, p + sliceLen), STDERR);
476
477 p += sliceLen;
478 window -= sliceLen;
479
480 if (!ret) {
481 channel._waitClientDrain = true;
482 channel._chunkErr = undefined;
483 channel._chunkcbErr = cb;
484 break;
485 }
486 }
487
488 outgoing.window = window;
489
490 if (len - p > 0) {
491 if (window === 0)
492 channel._waitWindow = true;
493 if (p > 0) {
494 // partial
495 buf = Buffer.allocUnsafe(len - p);
496 data.copy(buf, 0, p);
497 channel._chunkErr = buf;
498 } else
499 channel._chunkErr = data;
500 channel._chunkcbErr = cb;
501 return;
502 }
503
504 if (!channel._waitClientDrain)
505 cb();
506};
507
508module.exports = Channel;