UNPKG

35.3 kBJavaScriptView Raw
1/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex|Readable$", "caughtErrors": "none" }] */
2
3'use strict';
4
5const EventEmitter = require('events');
6const https = require('https');
7const http = require('http');
8const net = require('net');
9const tls = require('tls');
10const { randomBytes, createHash } = require('crypto');
11const { Duplex, Readable } = require('stream');
12const { URL } = require('url');
13
14const PerMessageDeflate = require('./permessage-deflate');
15const Receiver = require('./receiver');
16const Sender = require('./sender');
17const {
18 BINARY_TYPES,
19 EMPTY_BUFFER,
20 GUID,
21 kForOnEventAttribute,
22 kListener,
23 kStatusCode,
24 kWebSocket,
25 NOOP
26} = require('./constants');
27const {
28 EventTarget: { addEventListener, removeEventListener }
29} = require('./event-target');
30const { format, parse } = require('./extension');
31const { toBuffer } = require('./buffer-util');
32
33const closeTimeout = 30 * 1000;
34const kAborted = Symbol('kAborted');
35const protocolVersions = [8, 13];
36const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
37const subprotocolRegex = /^[!#$%&'*+\-.0-9A-Z^_`|a-z~]+$/;
38
39/**
40 * Class representing a WebSocket.
41 *
42 * @extends EventEmitter
43 */
44class WebSocket extends EventEmitter {
45 /**
46 * Create a new `WebSocket`.
47 *
48 * @param {(String|URL)} address The URL to which to connect
49 * @param {(String|String[])} [protocols] The subprotocols
50 * @param {Object} [options] Connection options
51 */
52 constructor(address, protocols, options) {
53 super();
54
55 this._binaryType = BINARY_TYPES[0];
56 this._closeCode = 1006;
57 this._closeFrameReceived = false;
58 this._closeFrameSent = false;
59 this._closeMessage = EMPTY_BUFFER;
60 this._closeTimer = null;
61 this._extensions = {};
62 this._paused = false;
63 this._protocol = '';
64 this._readyState = WebSocket.CONNECTING;
65 this._receiver = null;
66 this._sender = null;
67 this._socket = null;
68
69 if (address !== null) {
70 this._bufferedAmount = 0;
71 this._isServer = false;
72 this._redirects = 0;
73
74 if (protocols === undefined) {
75 protocols = [];
76 } else if (!Array.isArray(protocols)) {
77 if (typeof protocols === 'object' && protocols !== null) {
78 options = protocols;
79 protocols = [];
80 } else {
81 protocols = [protocols];
82 }
83 }
84
85 initAsClient(this, address, protocols, options);
86 } else {
87 this._autoPong = options.autoPong;
88 this._isServer = true;
89 }
90 }
91
92 /**
93 * This deviates from the WHATWG interface since ws doesn't support the
94 * required default "blob" type (instead we define a custom "nodebuffer"
95 * type).
96 *
97 * @type {String}
98 */
99 get binaryType() {
100 return this._binaryType;
101 }
102
103 set binaryType(type) {
104 if (!BINARY_TYPES.includes(type)) return;
105
106 this._binaryType = type;
107
108 //
109 // Allow to change `binaryType` on the fly.
110 //
111 if (this._receiver) this._receiver._binaryType = type;
112 }
113
114 /**
115 * @type {Number}
116 */
117 get bufferedAmount() {
118 if (!this._socket) return this._bufferedAmount;
119
120 return this._socket._writableState.length + this._sender._bufferedBytes;
121 }
122
123 /**
124 * @type {String}
125 */
126 get extensions() {
127 return Object.keys(this._extensions).join();
128 }
129
130 /**
131 * @type {Boolean}
132 */
133 get isPaused() {
134 return this._paused;
135 }
136
137 /**
138 * @type {Function}
139 */
140 /* istanbul ignore next */
141 get onclose() {
142 return null;
143 }
144
145 /**
146 * @type {Function}
147 */
148 /* istanbul ignore next */
149 get onerror() {
150 return null;
151 }
152
153 /**
154 * @type {Function}
155 */
156 /* istanbul ignore next */
157 get onopen() {
158 return null;
159 }
160
161 /**
162 * @type {Function}
163 */
164 /* istanbul ignore next */
165 get onmessage() {
166 return null;
167 }
168
169 /**
170 * @type {String}
171 */
172 get protocol() {
173 return this._protocol;
174 }
175
176 /**
177 * @type {Number}
178 */
179 get readyState() {
180 return this._readyState;
181 }
182
183 /**
184 * @type {String}
185 */
186 get url() {
187 return this._url;
188 }
189
190 /**
191 * Set up the socket and the internal resources.
192 *
193 * @param {Duplex} socket The network socket between the server and client
194 * @param {Buffer} head The first packet of the upgraded stream
195 * @param {Object} options Options object
196 * @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
197 * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
198 * multiple times in the same tick
199 * @param {Function} [options.generateMask] The function used to generate the
200 * masking key
201 * @param {Number} [options.maxPayload=0] The maximum allowed message size
202 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
203 * not to skip UTF-8 validation for text and close messages
204 * @private
205 */
206 setSocket(socket, head, options) {
207 const receiver = new Receiver({
208 allowSynchronousEvents: options.allowSynchronousEvents,
209 binaryType: this.binaryType,
210 extensions: this._extensions,
211 isServer: this._isServer,
212 maxPayload: options.maxPayload,
213 skipUTF8Validation: options.skipUTF8Validation
214 });
215
216 this._sender = new Sender(socket, this._extensions, options.generateMask);
217 this._receiver = receiver;
218 this._socket = socket;
219
220 receiver[kWebSocket] = this;
221 socket[kWebSocket] = this;
222
223 receiver.on('conclude', receiverOnConclude);
224 receiver.on('drain', receiverOnDrain);
225 receiver.on('error', receiverOnError);
226 receiver.on('message', receiverOnMessage);
227 receiver.on('ping', receiverOnPing);
228 receiver.on('pong', receiverOnPong);
229
230 //
231 // These methods may not be available if `socket` is just a `Duplex`.
232 //
233 if (socket.setTimeout) socket.setTimeout(0);
234 if (socket.setNoDelay) socket.setNoDelay();
235
236 if (head.length > 0) socket.unshift(head);
237
238 socket.on('close', socketOnClose);
239 socket.on('data', socketOnData);
240 socket.on('end', socketOnEnd);
241 socket.on('error', socketOnError);
242
243 this._readyState = WebSocket.OPEN;
244 this.emit('open');
245 }
246
247 /**
248 * Emit the `'close'` event.
249 *
250 * @private
251 */
252 emitClose() {
253 if (!this._socket) {
254 this._readyState = WebSocket.CLOSED;
255 this.emit('close', this._closeCode, this._closeMessage);
256 return;
257 }
258
259 if (this._extensions[PerMessageDeflate.extensionName]) {
260 this._extensions[PerMessageDeflate.extensionName].cleanup();
261 }
262
263 this._receiver.removeAllListeners();
264 this._readyState = WebSocket.CLOSED;
265 this.emit('close', this._closeCode, this._closeMessage);
266 }
267
268 /**
269 * Start a closing handshake.
270 *
271 * +----------+ +-----------+ +----------+
272 * - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
273 * | +----------+ +-----------+ +----------+ |
274 * +----------+ +-----------+ |
275 * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING
276 * +----------+ +-----------+ |
277 * | | | +---+ |
278 * +------------------------+-->|fin| - - - -
279 * | +---+ | +---+
280 * - - - - -|fin|<---------------------+
281 * +---+
282 *
283 * @param {Number} [code] Status code explaining why the connection is closing
284 * @param {(String|Buffer)} [data] The reason why the connection is
285 * closing
286 * @public
287 */
288 close(code, data) {
289 if (this.readyState === WebSocket.CLOSED) return;
290 if (this.readyState === WebSocket.CONNECTING) {
291 const msg = 'WebSocket was closed before the connection was established';
292 abortHandshake(this, this._req, msg);
293 return;
294 }
295
296 if (this.readyState === WebSocket.CLOSING) {
297 if (
298 this._closeFrameSent &&
299 (this._closeFrameReceived || this._receiver._writableState.errorEmitted)
300 ) {
301 this._socket.end();
302 }
303
304 return;
305 }
306
307 this._readyState = WebSocket.CLOSING;
308 this._sender.close(code, data, !this._isServer, (err) => {
309 //
310 // This error is handled by the `'error'` listener on the socket. We only
311 // want to know if the close frame has been sent here.
312 //
313 if (err) return;
314
315 this._closeFrameSent = true;
316
317 if (
318 this._closeFrameReceived ||
319 this._receiver._writableState.errorEmitted
320 ) {
321 this._socket.end();
322 }
323 });
324
325 //
326 // Specify a timeout for the closing handshake to complete.
327 //
328 this._closeTimer = setTimeout(
329 this._socket.destroy.bind(this._socket),
330 closeTimeout
331 );
332 }
333
334 /**
335 * Pause the socket.
336 *
337 * @public
338 */
339 pause() {
340 if (
341 this.readyState === WebSocket.CONNECTING ||
342 this.readyState === WebSocket.CLOSED
343 ) {
344 return;
345 }
346
347 this._paused = true;
348 this._socket.pause();
349 }
350
351 /**
352 * Send a ping.
353 *
354 * @param {*} [data] The data to send
355 * @param {Boolean} [mask] Indicates whether or not to mask `data`
356 * @param {Function} [cb] Callback which is executed when the ping is sent
357 * @public
358 */
359 ping(data, mask, cb) {
360 if (this.readyState === WebSocket.CONNECTING) {
361 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
362 }
363
364 if (typeof data === 'function') {
365 cb = data;
366 data = mask = undefined;
367 } else if (typeof mask === 'function') {
368 cb = mask;
369 mask = undefined;
370 }
371
372 if (typeof data === 'number') data = data.toString();
373
374 if (this.readyState !== WebSocket.OPEN) {
375 sendAfterClose(this, data, cb);
376 return;
377 }
378
379 if (mask === undefined) mask = !this._isServer;
380 this._sender.ping(data || EMPTY_BUFFER, mask, cb);
381 }
382
383 /**
384 * Send a pong.
385 *
386 * @param {*} [data] The data to send
387 * @param {Boolean} [mask] Indicates whether or not to mask `data`
388 * @param {Function} [cb] Callback which is executed when the pong is sent
389 * @public
390 */
391 pong(data, mask, cb) {
392 if (this.readyState === WebSocket.CONNECTING) {
393 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
394 }
395
396 if (typeof data === 'function') {
397 cb = data;
398 data = mask = undefined;
399 } else if (typeof mask === 'function') {
400 cb = mask;
401 mask = undefined;
402 }
403
404 if (typeof data === 'number') data = data.toString();
405
406 if (this.readyState !== WebSocket.OPEN) {
407 sendAfterClose(this, data, cb);
408 return;
409 }
410
411 if (mask === undefined) mask = !this._isServer;
412 this._sender.pong(data || EMPTY_BUFFER, mask, cb);
413 }
414
415 /**
416 * Resume the socket.
417 *
418 * @public
419 */
420 resume() {
421 if (
422 this.readyState === WebSocket.CONNECTING ||
423 this.readyState === WebSocket.CLOSED
424 ) {
425 return;
426 }
427
428 this._paused = false;
429 if (!this._receiver._writableState.needDrain) this._socket.resume();
430 }
431
432 /**
433 * Send a data message.
434 *
435 * @param {*} data The message to send
436 * @param {Object} [options] Options object
437 * @param {Boolean} [options.binary] Specifies whether `data` is binary or
438 * text
439 * @param {Boolean} [options.compress] Specifies whether or not to compress
440 * `data`
441 * @param {Boolean} [options.fin=true] Specifies whether the fragment is the
442 * last one
443 * @param {Boolean} [options.mask] Specifies whether or not to mask `data`
444 * @param {Function} [cb] Callback which is executed when data is written out
445 * @public
446 */
447 send(data, options, cb) {
448 if (this.readyState === WebSocket.CONNECTING) {
449 throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
450 }
451
452 if (typeof options === 'function') {
453 cb = options;
454 options = {};
455 }
456
457 if (typeof data === 'number') data = data.toString();
458
459 if (this.readyState !== WebSocket.OPEN) {
460 sendAfterClose(this, data, cb);
461 return;
462 }
463
464 const opts = {
465 binary: typeof data !== 'string',
466 mask: !this._isServer,
467 compress: true,
468 fin: true,
469 ...options
470 };
471
472 if (!this._extensions[PerMessageDeflate.extensionName]) {
473 opts.compress = false;
474 }
475
476 this._sender.send(data || EMPTY_BUFFER, opts, cb);
477 }
478
479 /**
480 * Forcibly close the connection.
481 *
482 * @public
483 */
484 terminate() {
485 if (this.readyState === WebSocket.CLOSED) return;
486 if (this.readyState === WebSocket.CONNECTING) {
487 const msg = 'WebSocket was closed before the connection was established';
488 abortHandshake(this, this._req, msg);
489 return;
490 }
491
492 if (this._socket) {
493 this._readyState = WebSocket.CLOSING;
494 this._socket.destroy();
495 }
496 }
497}
498
499/**
500 * @constant {Number} CONNECTING
501 * @memberof WebSocket
502 */
503Object.defineProperty(WebSocket, 'CONNECTING', {
504 enumerable: true,
505 value: readyStates.indexOf('CONNECTING')
506});
507
508/**
509 * @constant {Number} CONNECTING
510 * @memberof WebSocket.prototype
511 */
512Object.defineProperty(WebSocket.prototype, 'CONNECTING', {
513 enumerable: true,
514 value: readyStates.indexOf('CONNECTING')
515});
516
517/**
518 * @constant {Number} OPEN
519 * @memberof WebSocket
520 */
521Object.defineProperty(WebSocket, 'OPEN', {
522 enumerable: true,
523 value: readyStates.indexOf('OPEN')
524});
525
526/**
527 * @constant {Number} OPEN
528 * @memberof WebSocket.prototype
529 */
530Object.defineProperty(WebSocket.prototype, 'OPEN', {
531 enumerable: true,
532 value: readyStates.indexOf('OPEN')
533});
534
535/**
536 * @constant {Number} CLOSING
537 * @memberof WebSocket
538 */
539Object.defineProperty(WebSocket, 'CLOSING', {
540 enumerable: true,
541 value: readyStates.indexOf('CLOSING')
542});
543
544/**
545 * @constant {Number} CLOSING
546 * @memberof WebSocket.prototype
547 */
548Object.defineProperty(WebSocket.prototype, 'CLOSING', {
549 enumerable: true,
550 value: readyStates.indexOf('CLOSING')
551});
552
553/**
554 * @constant {Number} CLOSED
555 * @memberof WebSocket
556 */
557Object.defineProperty(WebSocket, 'CLOSED', {
558 enumerable: true,
559 value: readyStates.indexOf('CLOSED')
560});
561
562/**
563 * @constant {Number} CLOSED
564 * @memberof WebSocket.prototype
565 */
566Object.defineProperty(WebSocket.prototype, 'CLOSED', {
567 enumerable: true,
568 value: readyStates.indexOf('CLOSED')
569});
570
571[
572 'binaryType',
573 'bufferedAmount',
574 'extensions',
575 'isPaused',
576 'protocol',
577 'readyState',
578 'url'
579].forEach((property) => {
580 Object.defineProperty(WebSocket.prototype, property, { enumerable: true });
581});
582
583//
584// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
585// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
586//
587['open', 'error', 'close', 'message'].forEach((method) => {
588 Object.defineProperty(WebSocket.prototype, `on${method}`, {
589 enumerable: true,
590 get() {
591 for (const listener of this.listeners(method)) {
592 if (listener[kForOnEventAttribute]) return listener[kListener];
593 }
594
595 return null;
596 },
597 set(handler) {
598 for (const listener of this.listeners(method)) {
599 if (listener[kForOnEventAttribute]) {
600 this.removeListener(method, listener);
601 break;
602 }
603 }
604
605 if (typeof handler !== 'function') return;
606
607 this.addEventListener(method, handler, {
608 [kForOnEventAttribute]: true
609 });
610 }
611 });
612});
613
614WebSocket.prototype.addEventListener = addEventListener;
615WebSocket.prototype.removeEventListener = removeEventListener;
616
617module.exports = WebSocket;
618
619/**
620 * Initialize a WebSocket client.
621 *
622 * @param {WebSocket} websocket The client to initialize
623 * @param {(String|URL)} address The URL to which to connect
624 * @param {Array} protocols The subprotocols
625 * @param {Object} [options] Connection options
626 * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether any
627 * of the `'message'`, `'ping'`, and `'pong'` events can be emitted multiple
628 * times in the same tick
629 * @param {Boolean} [options.autoPong=true] Specifies whether or not to
630 * automatically send a pong in response to a ping
631 * @param {Function} [options.finishRequest] A function which can be used to
632 * customize the headers of each http request before it is sent
633 * @param {Boolean} [options.followRedirects=false] Whether or not to follow
634 * redirects
635 * @param {Function} [options.generateMask] The function used to generate the
636 * masking key
637 * @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the
638 * handshake request
639 * @param {Number} [options.maxPayload=104857600] The maximum allowed message
640 * size
641 * @param {Number} [options.maxRedirects=10] The maximum number of redirects
642 * allowed
643 * @param {String} [options.origin] Value of the `Origin` or
644 * `Sec-WebSocket-Origin` header
645 * @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable
646 * permessage-deflate
647 * @param {Number} [options.protocolVersion=13] Value of the
648 * `Sec-WebSocket-Version` header
649 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
650 * not to skip UTF-8 validation for text and close messages
651 * @private
652 */
653function initAsClient(websocket, address, protocols, options) {
654 const opts = {
655 allowSynchronousEvents: true,
656 autoPong: true,
657 protocolVersion: protocolVersions[1],
658 maxPayload: 100 * 1024 * 1024,
659 skipUTF8Validation: false,
660 perMessageDeflate: true,
661 followRedirects: false,
662 maxRedirects: 10,
663 ...options,
664 socketPath: undefined,
665 hostname: undefined,
666 protocol: undefined,
667 timeout: undefined,
668 method: 'GET',
669 host: undefined,
670 path: undefined,
671 port: undefined
672 };
673
674 websocket._autoPong = opts.autoPong;
675
676 if (!protocolVersions.includes(opts.protocolVersion)) {
677 throw new RangeError(
678 `Unsupported protocol version: ${opts.protocolVersion} ` +
679 `(supported versions: ${protocolVersions.join(', ')})`
680 );
681 }
682
683 let parsedUrl;
684
685 if (address instanceof URL) {
686 parsedUrl = address;
687 } else {
688 try {
689 parsedUrl = new URL(address);
690 } catch (e) {
691 throw new SyntaxError(`Invalid URL: ${address}`);
692 }
693 }
694
695 if (parsedUrl.protocol === 'http:') {
696 parsedUrl.protocol = 'ws:';
697 } else if (parsedUrl.protocol === 'https:') {
698 parsedUrl.protocol = 'wss:';
699 }
700
701 websocket._url = parsedUrl.href;
702
703 const isSecure = parsedUrl.protocol === 'wss:';
704 const isIpcUrl = parsedUrl.protocol === 'ws+unix:';
705 let invalidUrlMessage;
706
707 if (parsedUrl.protocol !== 'ws:' && !isSecure && !isIpcUrl) {
708 invalidUrlMessage =
709 'The URL\'s protocol must be one of "ws:", "wss:", ' +
710 '"http:", "https", or "ws+unix:"';
711 } else if (isIpcUrl && !parsedUrl.pathname) {
712 invalidUrlMessage = "The URL's pathname is empty";
713 } else if (parsedUrl.hash) {
714 invalidUrlMessage = 'The URL contains a fragment identifier';
715 }
716
717 if (invalidUrlMessage) {
718 const err = new SyntaxError(invalidUrlMessage);
719
720 if (websocket._redirects === 0) {
721 throw err;
722 } else {
723 emitErrorAndClose(websocket, err);
724 return;
725 }
726 }
727
728 const defaultPort = isSecure ? 443 : 80;
729 const key = randomBytes(16).toString('base64');
730 const request = isSecure ? https.request : http.request;
731 const protocolSet = new Set();
732 let perMessageDeflate;
733
734 opts.createConnection =
735 opts.createConnection || (isSecure ? tlsConnect : netConnect);
736 opts.defaultPort = opts.defaultPort || defaultPort;
737 opts.port = parsedUrl.port || defaultPort;
738 opts.host = parsedUrl.hostname.startsWith('[')
739 ? parsedUrl.hostname.slice(1, -1)
740 : parsedUrl.hostname;
741 opts.headers = {
742 ...opts.headers,
743 'Sec-WebSocket-Version': opts.protocolVersion,
744 'Sec-WebSocket-Key': key,
745 Connection: 'Upgrade',
746 Upgrade: 'websocket'
747 };
748 opts.path = parsedUrl.pathname + parsedUrl.search;
749 opts.timeout = opts.handshakeTimeout;
750
751 if (opts.perMessageDeflate) {
752 perMessageDeflate = new PerMessageDeflate(
753 opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
754 false,
755 opts.maxPayload
756 );
757 opts.headers['Sec-WebSocket-Extensions'] = format({
758 [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
759 });
760 }
761 if (protocols.length) {
762 for (const protocol of protocols) {
763 if (
764 typeof protocol !== 'string' ||
765 !subprotocolRegex.test(protocol) ||
766 protocolSet.has(protocol)
767 ) {
768 throw new SyntaxError(
769 'An invalid or duplicated subprotocol was specified'
770 );
771 }
772
773 protocolSet.add(protocol);
774 }
775
776 opts.headers['Sec-WebSocket-Protocol'] = protocols.join(',');
777 }
778 if (opts.origin) {
779 if (opts.protocolVersion < 13) {
780 opts.headers['Sec-WebSocket-Origin'] = opts.origin;
781 } else {
782 opts.headers.Origin = opts.origin;
783 }
784 }
785 if (parsedUrl.username || parsedUrl.password) {
786 opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
787 }
788
789 if (isIpcUrl) {
790 const parts = opts.path.split(':');
791
792 opts.socketPath = parts[0];
793 opts.path = parts[1];
794 }
795
796 let req;
797
798 if (opts.followRedirects) {
799 if (websocket._redirects === 0) {
800 websocket._originalIpc = isIpcUrl;
801 websocket._originalSecure = isSecure;
802 websocket._originalHostOrSocketPath = isIpcUrl
803 ? opts.socketPath
804 : parsedUrl.host;
805
806 const headers = options && options.headers;
807
808 //
809 // Shallow copy the user provided options so that headers can be changed
810 // without mutating the original object.
811 //
812 options = { ...options, headers: {} };
813
814 if (headers) {
815 for (const [key, value] of Object.entries(headers)) {
816 options.headers[key.toLowerCase()] = value;
817 }
818 }
819 } else if (websocket.listenerCount('redirect') === 0) {
820 const isSameHost = isIpcUrl
821 ? websocket._originalIpc
822 ? opts.socketPath === websocket._originalHostOrSocketPath
823 : false
824 : websocket._originalIpc
825 ? false
826 : parsedUrl.host === websocket._originalHostOrSocketPath;
827
828 if (!isSameHost || (websocket._originalSecure && !isSecure)) {
829 //
830 // Match curl 7.77.0 behavior and drop the following headers. These
831 // headers are also dropped when following a redirect to a subdomain.
832 //
833 delete opts.headers.authorization;
834 delete opts.headers.cookie;
835
836 if (!isSameHost) delete opts.headers.host;
837
838 opts.auth = undefined;
839 }
840 }
841
842 //
843 // Match curl 7.77.0 behavior and make the first `Authorization` header win.
844 // If the `Authorization` header is set, then there is nothing to do as it
845 // will take precedence.
846 //
847 if (opts.auth && !options.headers.authorization) {
848 options.headers.authorization =
849 'Basic ' + Buffer.from(opts.auth).toString('base64');
850 }
851
852 req = websocket._req = request(opts);
853
854 if (websocket._redirects) {
855 //
856 // Unlike what is done for the `'upgrade'` event, no early exit is
857 // triggered here if the user calls `websocket.close()` or
858 // `websocket.terminate()` from a listener of the `'redirect'` event. This
859 // is because the user can also call `request.destroy()` with an error
860 // before calling `websocket.close()` or `websocket.terminate()` and this
861 // would result in an error being emitted on the `request` object with no
862 // `'error'` event listeners attached.
863 //
864 websocket.emit('redirect', websocket.url, req);
865 }
866 } else {
867 req = websocket._req = request(opts);
868 }
869
870 if (opts.timeout) {
871 req.on('timeout', () => {
872 abortHandshake(websocket, req, 'Opening handshake has timed out');
873 });
874 }
875
876 req.on('error', (err) => {
877 if (req === null || req[kAborted]) return;
878
879 req = websocket._req = null;
880 emitErrorAndClose(websocket, err);
881 });
882
883 req.on('response', (res) => {
884 const location = res.headers.location;
885 const statusCode = res.statusCode;
886
887 if (
888 location &&
889 opts.followRedirects &&
890 statusCode >= 300 &&
891 statusCode < 400
892 ) {
893 if (++websocket._redirects > opts.maxRedirects) {
894 abortHandshake(websocket, req, 'Maximum redirects exceeded');
895 return;
896 }
897
898 req.abort();
899
900 let addr;
901
902 try {
903 addr = new URL(location, address);
904 } catch (e) {
905 const err = new SyntaxError(`Invalid URL: ${location}`);
906 emitErrorAndClose(websocket, err);
907 return;
908 }
909
910 initAsClient(websocket, addr, protocols, options);
911 } else if (!websocket.emit('unexpected-response', req, res)) {
912 abortHandshake(
913 websocket,
914 req,
915 `Unexpected server response: ${res.statusCode}`
916 );
917 }
918 });
919
920 req.on('upgrade', (res, socket, head) => {
921 websocket.emit('upgrade', res);
922
923 //
924 // The user may have closed the connection from a listener of the
925 // `'upgrade'` event.
926 //
927 if (websocket.readyState !== WebSocket.CONNECTING) return;
928
929 req = websocket._req = null;
930
931 const upgrade = res.headers.upgrade;
932
933 if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
934 abortHandshake(websocket, socket, 'Invalid Upgrade header');
935 return;
936 }
937
938 const digest = createHash('sha1')
939 .update(key + GUID)
940 .digest('base64');
941
942 if (res.headers['sec-websocket-accept'] !== digest) {
943 abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
944 return;
945 }
946
947 const serverProt = res.headers['sec-websocket-protocol'];
948 let protError;
949
950 if (serverProt !== undefined) {
951 if (!protocolSet.size) {
952 protError = 'Server sent a subprotocol but none was requested';
953 } else if (!protocolSet.has(serverProt)) {
954 protError = 'Server sent an invalid subprotocol';
955 }
956 } else if (protocolSet.size) {
957 protError = 'Server sent no subprotocol';
958 }
959
960 if (protError) {
961 abortHandshake(websocket, socket, protError);
962 return;
963 }
964
965 if (serverProt) websocket._protocol = serverProt;
966
967 const secWebSocketExtensions = res.headers['sec-websocket-extensions'];
968
969 if (secWebSocketExtensions !== undefined) {
970 if (!perMessageDeflate) {
971 const message =
972 'Server sent a Sec-WebSocket-Extensions header but no extension ' +
973 'was requested';
974 abortHandshake(websocket, socket, message);
975 return;
976 }
977
978 let extensions;
979
980 try {
981 extensions = parse(secWebSocketExtensions);
982 } catch (err) {
983 const message = 'Invalid Sec-WebSocket-Extensions header';
984 abortHandshake(websocket, socket, message);
985 return;
986 }
987
988 const extensionNames = Object.keys(extensions);
989
990 if (
991 extensionNames.length !== 1 ||
992 extensionNames[0] !== PerMessageDeflate.extensionName
993 ) {
994 const message = 'Server indicated an extension that was not requested';
995 abortHandshake(websocket, socket, message);
996 return;
997 }
998
999 try {
1000 perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
1001 } catch (err) {
1002 const message = 'Invalid Sec-WebSocket-Extensions header';
1003 abortHandshake(websocket, socket, message);
1004 return;
1005 }
1006
1007 websocket._extensions[PerMessageDeflate.extensionName] =
1008 perMessageDeflate;
1009 }
1010
1011 websocket.setSocket(socket, head, {
1012 allowSynchronousEvents: opts.allowSynchronousEvents,
1013 generateMask: opts.generateMask,
1014 maxPayload: opts.maxPayload,
1015 skipUTF8Validation: opts.skipUTF8Validation
1016 });
1017 });
1018
1019 if (opts.finishRequest) {
1020 opts.finishRequest(req, websocket);
1021 } else {
1022 req.end();
1023 }
1024}
1025
1026/**
1027 * Emit the `'error'` and `'close'` events.
1028 *
1029 * @param {WebSocket} websocket The WebSocket instance
1030 * @param {Error} The error to emit
1031 * @private
1032 */
1033function emitErrorAndClose(websocket, err) {
1034 websocket._readyState = WebSocket.CLOSING;
1035 websocket.emit('error', err);
1036 websocket.emitClose();
1037}
1038
1039/**
1040 * Create a `net.Socket` and initiate a connection.
1041 *
1042 * @param {Object} options Connection options
1043 * @return {net.Socket} The newly created socket used to start the connection
1044 * @private
1045 */
1046function netConnect(options) {
1047 options.path = options.socketPath;
1048 return net.connect(options);
1049}
1050
1051/**
1052 * Create a `tls.TLSSocket` and initiate a connection.
1053 *
1054 * @param {Object} options Connection options
1055 * @return {tls.TLSSocket} The newly created socket used to start the connection
1056 * @private
1057 */
1058function tlsConnect(options) {
1059 options.path = undefined;
1060
1061 if (!options.servername && options.servername !== '') {
1062 options.servername = net.isIP(options.host) ? '' : options.host;
1063 }
1064
1065 return tls.connect(options);
1066}
1067
1068/**
1069 * Abort the handshake and emit an error.
1070 *
1071 * @param {WebSocket} websocket The WebSocket instance
1072 * @param {(http.ClientRequest|net.Socket|tls.Socket)} stream The request to
1073 * abort or the socket to destroy
1074 * @param {String} message The error message
1075 * @private
1076 */
1077function abortHandshake(websocket, stream, message) {
1078 websocket._readyState = WebSocket.CLOSING;
1079
1080 const err = new Error(message);
1081 Error.captureStackTrace(err, abortHandshake);
1082
1083 if (stream.setHeader) {
1084 stream[kAborted] = true;
1085 stream.abort();
1086
1087 if (stream.socket && !stream.socket.destroyed) {
1088 //
1089 // On Node.js >= 14.3.0 `request.abort()` does not destroy the socket if
1090 // called after the request completed. See
1091 // https://github.com/websockets/ws/issues/1869.
1092 //
1093 stream.socket.destroy();
1094 }
1095
1096 process.nextTick(emitErrorAndClose, websocket, err);
1097 } else {
1098 stream.destroy(err);
1099 stream.once('error', websocket.emit.bind(websocket, 'error'));
1100 stream.once('close', websocket.emitClose.bind(websocket));
1101 }
1102}
1103
1104/**
1105 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
1106 * when the `readyState` attribute is `CLOSING` or `CLOSED`.
1107 *
1108 * @param {WebSocket} websocket The WebSocket instance
1109 * @param {*} [data] The data to send
1110 * @param {Function} [cb] Callback
1111 * @private
1112 */
1113function sendAfterClose(websocket, data, cb) {
1114 if (data) {
1115 const length = toBuffer(data).length;
1116
1117 //
1118 // The `_bufferedAmount` property is used only when the peer is a client and
1119 // the opening handshake fails. Under these circumstances, in fact, the
1120 // `setSocket()` method is not called, so the `_socket` and `_sender`
1121 // properties are set to `null`.
1122 //
1123 if (websocket._socket) websocket._sender._bufferedBytes += length;
1124 else websocket._bufferedAmount += length;
1125 }
1126
1127 if (cb) {
1128 const err = new Error(
1129 `WebSocket is not open: readyState ${websocket.readyState} ` +
1130 `(${readyStates[websocket.readyState]})`
1131 );
1132 process.nextTick(cb, err);
1133 }
1134}
1135
1136/**
1137 * The listener of the `Receiver` `'conclude'` event.
1138 *
1139 * @param {Number} code The status code
1140 * @param {Buffer} reason The reason for closing
1141 * @private
1142 */
1143function receiverOnConclude(code, reason) {
1144 const websocket = this[kWebSocket];
1145
1146 websocket._closeFrameReceived = true;
1147 websocket._closeMessage = reason;
1148 websocket._closeCode = code;
1149
1150 if (websocket._socket[kWebSocket] === undefined) return;
1151
1152 websocket._socket.removeListener('data', socketOnData);
1153 process.nextTick(resume, websocket._socket);
1154
1155 if (code === 1005) websocket.close();
1156 else websocket.close(code, reason);
1157}
1158
1159/**
1160 * The listener of the `Receiver` `'drain'` event.
1161 *
1162 * @private
1163 */
1164function receiverOnDrain() {
1165 const websocket = this[kWebSocket];
1166
1167 if (!websocket.isPaused) websocket._socket.resume();
1168}
1169
1170/**
1171 * The listener of the `Receiver` `'error'` event.
1172 *
1173 * @param {(RangeError|Error)} err The emitted error
1174 * @private
1175 */
1176function receiverOnError(err) {
1177 const websocket = this[kWebSocket];
1178
1179 if (websocket._socket[kWebSocket] !== undefined) {
1180 websocket._socket.removeListener('data', socketOnData);
1181
1182 //
1183 // On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See
1184 // https://github.com/websockets/ws/issues/1940.
1185 //
1186 process.nextTick(resume, websocket._socket);
1187
1188 websocket.close(err[kStatusCode]);
1189 }
1190
1191 websocket.emit('error', err);
1192}
1193
1194/**
1195 * The listener of the `Receiver` `'finish'` event.
1196 *
1197 * @private
1198 */
1199function receiverOnFinish() {
1200 this[kWebSocket].emitClose();
1201}
1202
1203/**
1204 * The listener of the `Receiver` `'message'` event.
1205 *
1206 * @param {Buffer|ArrayBuffer|Buffer[])} data The message
1207 * @param {Boolean} isBinary Specifies whether the message is binary or not
1208 * @private
1209 */
1210function receiverOnMessage(data, isBinary) {
1211 this[kWebSocket].emit('message', data, isBinary);
1212}
1213
1214/**
1215 * The listener of the `Receiver` `'ping'` event.
1216 *
1217 * @param {Buffer} data The data included in the ping frame
1218 * @private
1219 */
1220function receiverOnPing(data) {
1221 const websocket = this[kWebSocket];
1222
1223 if (websocket._autoPong) websocket.pong(data, !this._isServer, NOOP);
1224 websocket.emit('ping', data);
1225}
1226
1227/**
1228 * The listener of the `Receiver` `'pong'` event.
1229 *
1230 * @param {Buffer} data The data included in the pong frame
1231 * @private
1232 */
1233function receiverOnPong(data) {
1234 this[kWebSocket].emit('pong', data);
1235}
1236
1237/**
1238 * Resume a readable stream
1239 *
1240 * @param {Readable} stream The readable stream
1241 * @private
1242 */
1243function resume(stream) {
1244 stream.resume();
1245}
1246
1247/**
1248 * The listener of the socket `'close'` event.
1249 *
1250 * @private
1251 */
1252function socketOnClose() {
1253 const websocket = this[kWebSocket];
1254
1255 this.removeListener('close', socketOnClose);
1256 this.removeListener('data', socketOnData);
1257 this.removeListener('end', socketOnEnd);
1258
1259 websocket._readyState = WebSocket.CLOSING;
1260
1261 let chunk;
1262
1263 //
1264 // The close frame might not have been received or the `'end'` event emitted,
1265 // for example, if the socket was destroyed due to an error. Ensure that the
1266 // `receiver` stream is closed after writing any remaining buffered data to
1267 // it. If the readable side of the socket is in flowing mode then there is no
1268 // buffered data as everything has been already written and `readable.read()`
1269 // will return `null`. If instead, the socket is paused, any possible buffered
1270 // data will be read as a single chunk.
1271 //
1272 if (
1273 !this._readableState.endEmitted &&
1274 !websocket._closeFrameReceived &&
1275 !websocket._receiver._writableState.errorEmitted &&
1276 (chunk = websocket._socket.read()) !== null
1277 ) {
1278 websocket._receiver.write(chunk);
1279 }
1280
1281 websocket._receiver.end();
1282
1283 this[kWebSocket] = undefined;
1284
1285 clearTimeout(websocket._closeTimer);
1286
1287 if (
1288 websocket._receiver._writableState.finished ||
1289 websocket._receiver._writableState.errorEmitted
1290 ) {
1291 websocket.emitClose();
1292 } else {
1293 websocket._receiver.on('error', receiverOnFinish);
1294 websocket._receiver.on('finish', receiverOnFinish);
1295 }
1296}
1297
1298/**
1299 * The listener of the socket `'data'` event.
1300 *
1301 * @param {Buffer} chunk A chunk of data
1302 * @private
1303 */
1304function socketOnData(chunk) {
1305 if (!this[kWebSocket]._receiver.write(chunk)) {
1306 this.pause();
1307 }
1308}
1309
1310/**
1311 * The listener of the socket `'end'` event.
1312 *
1313 * @private
1314 */
1315function socketOnEnd() {
1316 const websocket = this[kWebSocket];
1317
1318 websocket._readyState = WebSocket.CLOSING;
1319 websocket._receiver.end();
1320 this.end();
1321}
1322
1323/**
1324 * The listener of the socket `'error'` event.
1325 *
1326 * @private
1327 */
1328function socketOnError() {
1329 const websocket = this[kWebSocket];
1330
1331 this.removeListener('error', socketOnError);
1332 this.on('error', NOOP);
1333
1334 if (websocket) {
1335 websocket._readyState = WebSocket.CLOSING;
1336 this.destroy();
1337 }
1338}