UNPKG

22.9 kBJavaScriptView Raw
1'use strict';
2
3const EventEmitter = require('events');
4const crypto = require('crypto');
5const https = require('https');
6const http = require('http');
7const net = require('net');
8const tls = require('tls');
9const url = require('url');
10
11const PerMessageDeflate = require('./permessage-deflate');
12const EventTarget = require('./event-target');
13const extension = require('./extension');
14const constants = require('./constants');
15const Receiver = require('./receiver');
16const Sender = require('./sender');
17
// Connection state names; the index of each name is its numeric `readyState`.
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];

// Property key used to link a receiver or a socket back to its `WebSocket`.
const { kWebSocket } = constants;

// Accepted values for the `protocolVersion` client option.
const protocolVersions = [8, 13];

// Allow 30 seconds to terminate the connection cleanly.
const closeTimeout = 30 * 1000;
22
/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * When `address` is `null` only the instance state is initialized and no
   * client handshake is started. For any other value the instance is
   * initialized as a client through `initAsClient()`.
   *
   * @param {(String|url.Url|url.URL)} address The URL to which to connect
   * @param {(String|String[])} protocols The subprotocols
   * @param {Object} options Connection options
   */
  constructor (address, protocols, options) {
    super();

    this.readyState = WebSocket.CONNECTING;
    this.protocol = '';

    this._binaryType = constants.BINARY_TYPES[0];
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = '';
    this._closeTimer = null;
    this._closeCode = 1006;
    this._extensions = {};
    this._isServer = true;
    this._receiver = null;
    this._sender = null;
    this._socket = null;

    if (address === null) return;

    let subprotocols = protocols;
    let clientOptions = options;

    if (Array.isArray(protocols)) {
      subprotocols = protocols.join(', ');
    } else if (protocols !== null && typeof protocols === 'object') {
      //
      // Called as `new WebSocket(address, options)`.
      //
      clientOptions = protocols;
      subprotocols = undefined;
    }

    initAsClient.call(this, address, subprotocols, clientOptions);
  }

  get CONNECTING () { return WebSocket.CONNECTING; }
  get CLOSING () { return WebSocket.CLOSING; }
  get CLOSED () { return WebSocket.CLOSED; }
  get OPEN () { return WebSocket.OPEN; }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the
   * required default "blob" type (instead we define a custom "nodebuffer"
   * type).
   *
   * @type {String}
   */
  get binaryType () {
    return this._binaryType;
  }

  set binaryType (type) {
    //
    // Values that are not listed in `constants.BINARY_TYPES` are ignored.
    //
    if (!constants.BINARY_TYPES.includes(type)) return;

    this._binaryType = type;

    //
    // Allow to change `binaryType` on the fly.
    //
    if (this._receiver) this._receiver._binaryType = type;
  }

  /**
   * The sum of the socket `bufferSize` and the bytes buffered by the sender,
   * or `0` when no socket is attached.
   *
   * @type {Number}
   */
  get bufferedAmount () {
    const socket = this._socket;

    if (!socket) return 0;

    //
    // `socket.bufferSize` is `undefined` if the socket is closed.
    //
    const socketBytes = socket.bufferSize || 0;

    return socketBytes + this._sender._bufferedBytes;
  }

  /**
   * The names of the negotiated extensions, joined with a comma.
   *
   * @type {String}
   */
  get extensions () {
    return Object.keys(this._extensions).join(',');
  }

  /**
   * Set up the socket and the internal resources, then mark the connection as
   * open and emit the `'open'` event.
   *
   * @param {net.Socket} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Number} maxPayload The maximum allowed message size
   * @private
   */
  setSocket (socket, head, maxPayload) {
    const receiver = new Receiver(
      this._binaryType,
      this._extensions,
      maxPayload
    );

    this._sender = new Sender(socket, this._extensions);
    this._receiver = receiver;
    this._socket = socket;

    //
    // Back-references used by the listeners below to find this instance.
    //
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;

    const receiverListeners = {
      conclude: receiverOnConclude,
      drain: receiverOnDrain,
      error: receiverOnError,
      message: receiverOnMessage,
      ping: receiverOnPing,
      pong: receiverOnPong
    };

    Object.keys(receiverListeners).forEach((event) => {
      receiver.on(event, receiverListeners[event]);
    });

    socket.setTimeout(0);
    socket.setNoDelay();

    //
    // Put the bytes that came with the upgrade back before the `'data'`
    // listener is attached.
    //
    if (head.length > 0) socket.unshift(head);

    const socketListeners = {
      close: socketOnClose,
      data: socketOnData,
      end: socketOnEnd,
      error: socketOnError
    };

    Object.keys(socketListeners).forEach((event) => {
      socket.on(event, socketListeners[event]);
    });

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Emit the `'close'` event.
   *
   * If a socket was attached, the permessage-deflate extension (when in use)
   * is cleaned up and all the receiver listeners are removed first.
   *
   * @private
   */
  emitClose () {
    this.readyState = WebSocket.CLOSED;

    if (this._socket) {
      const deflate = this._extensions[PerMessageDeflate.extensionName];

      if (deflate) deflate.cleanup();

      this._receiver.removeAllListeners();
    }

    this.emit('close', this._closeCode, this._closeMessage);
  }

  /**
   * Start a closing handshake.
   *
   * A close frame is sent to the peer. The socket is ended once a close frame
   * has been both sent and received, and it is destroyed if that does not
   * happen within `closeTimeout` milliseconds. Calling this method while the
   * connection is still being established aborts the handshake instead.
   *
   * @param {Number} code Status code explaining why the connection is closing
   * @param {String} data A string explaining why the connection is closing
   * @public
   */
  close (code, data) {
    switch (this.readyState) {
      case WebSocket.CLOSED:
        return;
      case WebSocket.CONNECTING:
        return abortHandshake(
          this,
          this._req,
          'WebSocket was closed before the connection was established'
        );
      case WebSocket.CLOSING:
        if (this._closeFrameSent && this._closeFrameReceived) {
          this._socket.end();
        }
        return;
    }

    this.readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      if (err) return;

      this._closeFrameSent = true;

      if (!this._socket.writable) return;

      if (this._closeFrameReceived) this._socket.end();

      //
      // Ensure that the connection is closed even if the closing handshake
      // fails.
      //
      this._closeTimer = setTimeout(
        this._socket.destroy.bind(this._socket),
        closeTimeout
      );
    });
  }

  /**
   * Send a ping.
   *
   * If the connection is not open the error is passed to `cb` when one is
   * given, otherwise it is thrown.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the ping is sent
   * @public
   */
  ping (data, mask, cb) {
    let payload = data;
    let masked = mask;
    let done = cb;

    if (typeof data === 'function') {
      done = data;
      payload = masked = undefined;
    } else if (typeof mask === 'function') {
      done = mask;
      masked = undefined;
    }

    if (this.readyState !== WebSocket.OPEN) {
      const state = this.readyState;
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    if (typeof payload === 'number') payload = payload.toString();
    if (masked === undefined) masked = !this._isServer;
    this._sender.ping(payload || constants.EMPTY_BUFFER, masked, done);
  }

  /**
   * Send a pong.
   *
   * If the connection is not open the error is passed to `cb` when one is
   * given, otherwise it is thrown.
   *
   * @param {*} data The data to send
   * @param {Boolean} mask Indicates whether or not to mask `data`
   * @param {Function} cb Callback which is executed when the pong is sent
   * @public
   */
  pong (data, mask, cb) {
    let payload = data;
    let masked = mask;
    let done = cb;

    if (typeof data === 'function') {
      done = data;
      payload = masked = undefined;
    } else if (typeof mask === 'function') {
      done = mask;
      masked = undefined;
    }

    if (this.readyState !== WebSocket.OPEN) {
      const state = this.readyState;
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    if (typeof payload === 'number') payload = payload.toString();
    if (masked === undefined) masked = !this._isServer;
    this._sender.pong(payload || constants.EMPTY_BUFFER, masked, done);
  }

  /**
   * Send a data message.
   *
   * If the connection is not open the error is passed to `cb` when one is
   * given, otherwise it is thrown. Compression is only used when the
   * permessage-deflate extension has been negotiated.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback which is executed when data is written out
   * @public
   */
  send (data, options, cb) {
    let settings = options;
    let done = cb;

    if (typeof options === 'function') {
      done = options;
      settings = {};
    }

    if (this.readyState !== WebSocket.OPEN) {
      const state = this.readyState;
      const err = new Error(
        `WebSocket is not open: readyState ${state} (${readyStates[state]})`
      );

      if (done) return done(err);
      throw err;
    }

    const payload = typeof data === 'number' ? data.toString() : data;
    const defaults = {
      binary: typeof payload !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true
    };
    const opts = Object.assign(defaults, settings);

    if (!this._extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }

    this._sender.send(payload || constants.EMPTY_BUFFER, opts, done);
  }

  /**
   * Forcibly close the connection.
   *
   * The handshake is aborted if the connection is still being established,
   * otherwise the socket, if any, is destroyed.
   *
   * @public
   */
  terminate () {
    const state = this.readyState;

    if (state === WebSocket.CLOSED) return;

    if (state === WebSocket.CONNECTING) {
      return abortHandshake(
        this,
        this._req,
        'WebSocket was closed before the connection was established'
      );
    }

    const socket = this._socket;

    if (!socket) return;

    this.readyState = WebSocket.CLOSING;
    socket.destroy();
  }
}
357
//
// Expose every ready state as a numeric static property, for example
// `WebSocket.CONNECTING === 0`.
//
readyStates.forEach((stateName, stateIndex) => {
  WebSocket[stateName] = stateIndex;
});
361
//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
for (const method of ['open', 'error', 'close', 'message']) {
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
    /**
     * Return the listener of the event.
     *
     * Only listeners that carry a `_listener` property are considered; the
     * value of that property for the first such listener is returned.
     *
     * @return {(Function|undefined)} The event listener or `undefined`
     * @public
     */
    get () {
      const wrapper = this.listeners(method).find((fn) => fn._listener);

      return wrapper ? wrapper._listener : undefined;
    },
    /**
     * Add a listener for the event, replacing the ones previously added via
     * `addEventListener`.
     *
     * @param {Function} listener The listener to add
     * @public
     */
    set (listener) {
      //
      // Remove only the listeners added via `addEventListener`.
      //
      this.listeners(method)
        .filter((fn) => fn._listener)
        .forEach((fn) => this.removeListener(method, fn));

      this.addEventListener(method, listener);
    }
  });
}
398
//
// Attach the `addEventListener` and `removeEventListener` helpers provided by
// the `./event-target` module.
//
Object.assign(WebSocket.prototype, {
  addEventListener: EventTarget.addEventListener,
  removeEventListener: EventTarget.removeEventListener
});

module.exports = WebSocket;
403
/**
 * Initialize a WebSocket client.
 *
 * Must be called with the `WebSocket` instance as `this`. Builds the HTTP(S)
 * upgrade request from `address` and `options`, sends it and wires the
 * `'error'`, `'response'` and `'upgrade'` listeners that complete or abort
 * the opening handshake.
 *
 * @param {(String|url.Url|url.URL)} address The URL to which to connect
 * @param {String} protocols The subprotocols
 * @param {Object} options Connection options
 * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable permessage-deflate
 * @param {Number} options.handshakeTimeout Timeout in milliseconds for the handshake request
 * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version` header
 * @param {String} options.origin Value of the `Origin` or `Sec-WebSocket-Origin` header
 * @throws {RangeError} If `options.protocolVersion` is not supported
 * @throws {Error} If `address` is not a valid URL
 * @private
 */
function initAsClient (address, protocols, options) {
  //
  // The last object resets the request options that are derived from
  // `address` below, so that user supplied values for them are discarded.
  //
  options = Object.assign({
    protocolVersion: protocolVersions[1],
    perMessageDeflate: true
  }, options, {
    createConnection: undefined,
    socketPath: undefined,
    hostname: undefined,
    protocol: undefined,
    timeout: undefined,
    method: undefined,
    auth: undefined,
    host: undefined,
    path: undefined,
    port: undefined
  });

  if (protocolVersions.indexOf(options.protocolVersion) === -1) {
    throw new RangeError(
      `Unsupported protocol version: ${options.protocolVersion} ` +
      `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  this._isServer = false;

  let parsedUrl;

  if (typeof address === 'object' && address.href !== undefined) {
    parsedUrl = address;
    this.url = address.href;
  } else {
    parsedUrl = url.parse(address);
    this.url = address;
  }

  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';

  if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
    throw new Error(`Invalid URL: ${this.url}`);
  }

  const isSecure = parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  const key = crypto.randomBytes(16).toString('base64');
  const httpObj = isSecure ? https : http;
  const path = parsedUrl.search
    ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
    : parsedUrl.pathname || '/';
  let perMessageDeflate;

  options.createConnection = isSecure ? tlsConnect : netConnect;
  options.port = parsedUrl.port || (isSecure ? 443 : 80);
  //
  // Strip the square brackets from a bracketed (IPv6) hostname.
  //
  options.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  options.headers = Object.assign({
    'Sec-WebSocket-Version': options.protocolVersion,
    'Sec-WebSocket-Key': key,
    'Connection': 'Upgrade',
    'Upgrade': 'websocket'
  }, options.headers);
  options.path = path;

  if (options.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      options.perMessageDeflate !== true ? options.perMessageDeflate : {},
      false
    );
    options.headers['Sec-WebSocket-Extensions'] = extension.format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }
  if (protocols) {
    options.headers['Sec-WebSocket-Protocol'] = protocols;
  }
  if (options.origin) {
    if (options.protocolVersion < 13) {
      options.headers['Sec-WebSocket-Origin'] = options.origin;
    } else {
      options.headers.Origin = options.origin;
    }
  }
  if (parsedUrl.auth) {
    options.auth = parsedUrl.auth;
  } else if (parsedUrl.username || parsedUrl.password) {
    options.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }

  if (isUnixSocket) {
    //
    // The part before the first colon is the socket path, the part after it
    // is the request path.
    //
    const parts = path.split(':');

    if (options.agent == null && process.versions.modules < 57) {
      //
      // Setting `socketPath` in conjunction with `createConnection` without an
      // agent throws an error on Node.js < 8. Work around the issue by using a
      // different property.
      //
      options._socketPath = parts[0];
    } else {
      options.socketPath = parts[0];
    }

    options.path = parts[1];
  }

  let req = this._req = httpObj.get(options);

  if (options.handshakeTimeout) {
    req.setTimeout(
      options.handshakeTimeout,
      () => abortHandshake(this, req, 'Opening handshake has timed out')
    );
  }

  req.on('error', (err) => {
    //
    // `this._req` is set to `null` below and by the `'upgrade'` listener. An
    // error emitted after that point must be ignored instead of crashing on
    // the `aborted` lookup.
    //
    if (!this._req || this._req.aborted) return;

    req = this._req = null;
    this.readyState = WebSocket.CLOSING;
    this.emit('error', err);
    this.emitClose();
  });

  req.on('response', (res) => {
    //
    // A listener of the `'unexpected-response'` event takes over the handling
    // of the response; without one the handshake is aborted.
    //
    if (this.emit('unexpected-response', req, res)) return;

    abortHandshake(this, req, `Unexpected server response: ${res.statusCode}`);
  });

  req.on('upgrade', (res, socket, head) => {
    this.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the `upgrade`
    // event.
    //
    if (this.readyState !== WebSocket.CONNECTING) return;

    req = this._req = null;

    const digest = crypto.createHash('sha1')
      .update(key + constants.GUID, 'binary')
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(this, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    const protList = (protocols || '').split(/, */);
    let protError;

    if (!protocols && serverProt) {
      protError = 'Server sent a subprotocol but none was requested';
    } else if (protocols && !serverProt) {
      protError = 'Server sent no subprotocol';
    } else if (serverProt && protList.indexOf(serverProt) === -1) {
      protError = 'Server sent an invalid subprotocol';
    }

    if (protError) {
      abortHandshake(this, socket, protError);
      return;
    }

    if (serverProt) this.protocol = serverProt;

    if (perMessageDeflate) {
      try {
        const extensions = extension.parse(
          res.headers['sec-websocket-extensions']
        );

        if (extensions[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(
            extensions[PerMessageDeflate.extensionName]
          );
          this._extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
        }
      } catch (err) {
        abortHandshake(this, socket, 'Invalid Sec-WebSocket-Extensions header');
        return;
      }
    }

    this.setSocket(socket, head, 0);
  });
}
605
/**
 * Create a `net.Socket` and initiate a connection.
 *
 * `options.path` is overwritten with the socket path (`socketPath` or
 * `_socketPath`), or with `undefined` when neither is set.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect (options) {
  const socketPath = options.socketPath || options._socketPath;

  options.path = socketPath || undefined;

  return net.connect(options);
}
617
/**
 * Create a `tls.TLSSocket` and initiate a connection.
 *
 * `options.path` is overwritten with the socket path (`socketPath` or
 * `_socketPath`), or with `undefined` when neither is set. `options.host` is
 * used as the server name when `options.servername` is not set.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect (options) {
  const socketPath = options.socketPath || options._socketPath;

  options.path = socketPath || undefined;

  if (!options.servername) options.servername = options.host;

  return tls.connect(options);
}
630
/**
 * Abort the handshake and emit an error.
 *
 * A stream with a `setHeader` property is treated as the handshake request:
 * it is aborted and the `'error'` event is emitted right away. Any other
 * stream is treated as a socket: it is destroyed with the error, and its
 * `'error'` event is forwarded to the `WebSocket`. In both cases
 * `emitClose()` is called once the stream reports that it is done.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
 *     socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake (websocket, stream, message) {
  websocket.readyState = WebSocket.CLOSING;

  //
  // Hide this helper from the stack trace of the error.
  //
  const failure = new Error(message);
  Error.captureStackTrace(failure, abortHandshake);

  if (!stream.setHeader) {
    stream.destroy(failure);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
    return;
  }

  stream.abort();
  stream.once('abort', websocket.emitClose.bind(websocket));
  websocket.emit('error', failure);
}
656
/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * Stops feeding the receiver, records the received close code and reason and
 * answers by calling `close()` on the `WebSocket`.
 *
 * @param {Number} code The status code
 * @param {String} reason The reason for closing
 * @private
 */
function receiverOnConclude (code, reason) {
  const ws = this[kWebSocket];
  const socket = ws._socket;

  socket.removeListener('data', socketOnData);
  socket.resume();

  ws._closeFrameReceived = true;
  ws._closeMessage = reason;
  ws._closeCode = code;

  //
  // With code 1005 `close()` is called without arguments.
  //
  if (code === 1005) {
    ws.close();
    return;
  }

  ws.close(code, reason);
}
677
/**
 * The listener of the `Receiver` `'drain'` event. Resumes the socket of the
 * `WebSocket` the receiver belongs to.
 *
 * @private
 */
function receiverOnDrain () {
  const websocket = this[kWebSocket];

  websocket._socket.resume();
}
686
/**
 * The listener of the `Receiver` `'error'` event.
 *
 * Stops feeding the receiver, stores the status code carried by the error as
 * the close code, emits the error on the `WebSocket` and destroys the socket.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError (err) {
  const ws = this[kWebSocket];
  const socket = ws._socket;

  socket.removeListener('data', socketOnData);

  ws.readyState = WebSocket.CLOSING;
  ws._closeCode = err[constants.kStatusCode];
  ws.emit('error', err);
  socket.destroy();
}
703
/**
 * The listener of the `Receiver` `'finish'` event. `socketOnClose()` also
 * registers it for the `'error'` event of the receiver. Emits the `'close'`
 * event on the `WebSocket` through `emitClose()`.
 *
 * @private
 */
function receiverOnFinish () {
  const websocket = this[kWebSocket];

  websocket.emitClose();
}
712
/**
 * The listener of the `Receiver` `'message'` event. Re-emits the message on
 * the `WebSocket`.
 *
 * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
 * @private
 */
function receiverOnMessage (data) {
  const websocket = this[kWebSocket];

  websocket.emit('message', data);
}
722
/**
 * The listener of the `Receiver` `'ping'` event. Answers with a pong that
 * carries the same data, then emits the `'ping'` event on the `WebSocket`.
 *
 * @param {Buffer} data The data included in the ping frame
 * @private
 */
function receiverOnPing (data) {
  const ws = this[kWebSocket];
  const mask = !ws._isServer;

  ws.pong(data, mask, constants.NOOP);
  ws.emit('ping', data);
}
735
/**
 * The listener of the `Receiver` `'pong'` event. Re-emits the event on the
 * `WebSocket`.
 *
 * @param {Buffer} data The data included in the pong frame
 * @private
 */
function receiverOnPong (data) {
  const websocket = this[kWebSocket];

  websocket.emit('pong', data);
}
745
/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * Flushes any data still buffered in the socket to the receiver, ends the
 * receiver, detaches the socket from the `WebSocket` and emits the `'close'`
 * event, either right away or once the receiver has finished or errored.
 *
 * @private
 */
function socketOnClose () {
  const websocket = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('end', socketOnEnd);

  websocket.readyState = WebSocket.CLOSING;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk and emitted synchronously in a single
  // `'data'` event.
  //
  websocket._socket.read();
  websocket._receiver.end();

  //
  // The back-reference must stay in place until the `'data'` listener is
  // removed: the `read()` call above can run `socketOnData()` synchronously
  // and that listener looks the `WebSocket` up through `this[kWebSocket]`.
  //
  this.removeListener('data', socketOnData);
  this[kWebSocket] = undefined;

  clearTimeout(websocket._closeTimer);

  if (
    websocket._receiver._writableState.finished ||
    websocket._receiver._writableState.errorEmitted
  ) {
    websocket.emitClose();
  } else {
    //
    // Wait for the receiver to process the remaining data.
    //
    websocket._receiver.on('error', receiverOnFinish);
    websocket._receiver.on('finish', receiverOnFinish);
  }
}
786
/**
 * The listener of the `net.Socket` `'data'` event. Writes the chunk to the
 * receiver and pauses the socket when the write returns `false`.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function socketOnData (chunk) {
  const receiver = this[kWebSocket]._receiver;
  const accepted = receiver.write(chunk);

  if (!accepted) this.pause();
}
798
/**
 * The listener of the `net.Socket` `'end'` event. Marks the `WebSocket` as
 * closing, ends the receiver and then ends the socket.
 *
 * @private
 */
function socketOnEnd () {
  const ws = this[kWebSocket];
  const receiver = ws._receiver;

  ws.readyState = WebSocket.CLOSING;
  receiver.end();
  this.end();
}
811
/**
 * The listener of the `net.Socket` `'error'` event.
 *
 * Replaces itself with a no-op `'error'` listener and, if the socket is still
 * attached to a `WebSocket`, marks it as closing and destroys the socket.
 *
 * @private
 */
function socketOnError () {
  const ws = this[kWebSocket];

  this.removeListener('error', socketOnError);
  this.on('error', constants.NOOP);

  if (!ws) return;

  ws.readyState = WebSocket.CLOSING;
  this.destroy();
}
827}