UNPKG

16.3 kBJavaScriptView Raw
1/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex$", "caughtErrors": "none" }] */
2
3'use strict';
4
5const EventEmitter = require('events');
6const http = require('http');
7const { Duplex } = require('stream');
8const { createHash } = require('crypto');
9
10const extension = require('./extension');
11const PerMessageDeflate = require('./permessage-deflate');
12const subprotocol = require('./subprotocol');
13const WebSocket = require('./websocket');
14const { GUID, kWebSocket } = require('./constants');
15
// Shape of a valid `Sec-WebSocket-Key` header value: 22 base64 characters
// followed by the `==` padding.
const keyRegex = /^[+/0-9A-Za-z]{22}==$/;

// Server lifecycle states, stored in `WebSocketServer#_state`. They are
// ordered, so `state > RUNNING` means "closing or closed".
const RUNNING = 0;
const CLOSING = 1;
const CLOSED = 2;
21
/**
 * Class representing a WebSocket server.
 *
 * The server runs in exactly one of three modes, selected by the options:
 * it creates and owns an HTTP server (`port`), it attaches to a caller
 * supplied HTTP/S server (`server`), or it is detached and relies on the
 * caller invoking `handleUpgrade()` (`noServer`).
 *
 * @extends EventEmitter
 */
class WebSocketServer extends EventEmitter {
  /**
   * Create a `WebSocketServer` instance.
   *
   * @param {Object} options Configuration options
   * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
   *     any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
   *     multiple times in the same tick
   * @param {Boolean} [options.autoPong=true] Specifies whether or not to
   *     automatically send a pong in response to a ping
   * @param {Number} [options.backlog=511] The maximum length of the queue of
   *     pending connections
   * @param {Boolean} [options.clientTracking=true] Specifies whether or not to
   *     track clients
   * @param {Function} [options.handleProtocols] A hook to handle protocols
   * @param {String} [options.host] The hostname where to bind the server
   * @param {Number} [options.maxPayload=104857600] The maximum allowed message
   *     size
   * @param {Boolean} [options.noServer=false] Enable no server mode
   * @param {String} [options.path] Accept only connections matching this path
   * @param {(Boolean|Object)} [options.perMessageDeflate=false] Enable/disable
   *     permessage-deflate
   * @param {Number} [options.port] The port where to bind the server
   * @param {(http.Server|https.Server)} [options.server] A pre-created HTTP/S
   *     server to use
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   * @param {Function} [options.verifyClient] A hook to reject connections
   * @param {Function} [options.WebSocket=WebSocket] Specifies the `WebSocket`
   *     class to use. It must be the `WebSocket` class or class that extends it
   * @param {Function} [callback] A listener for the `listening` event. It is
   *     only passed to `listen()` when the HTTP server is created internally
   *     (`port` option)
   * @throws {TypeError} If not exactly one of `port`, `server`, or `noServer`
   *     is specified
   */
  constructor(options, callback) {
    super();

    // Work on a copy so that the caller's object is never mutated.
    options = {
      allowSynchronousEvents: true,
      autoPong: true,
      maxPayload: 100 * 1024 * 1024,
      skipUTF8Validation: false,
      perMessageDeflate: false,
      handleProtocols: null,
      clientTracking: true,
      verifyClient: null,
      noServer: false,
      backlog: null, // use default (511 as implemented in net.js)
      server: null,
      host: null,
      path: null,
      port: null,
      WebSocket,
      ...options
    };

    if (
      (options.port == null && !options.server && !options.noServer) ||
      (options.port != null && (options.server || options.noServer)) ||
      (options.server && options.noServer)
    ) {
      throw new TypeError(
        'One and only one of the "port", "server", or "noServer" options ' +
          'must be specified'
      );
    }

    if (options.port != null) {
      //
      // Requests that are not upgrades are answered with
      // "426 Upgrade Required".
      //
      this._server = http.createServer((req, res) => {
        const body = http.STATUS_CODES[426];

        res.writeHead(426, {
          'Content-Length': body.length,
          'Content-Type': 'text/plain'
        });
        res.end(body);
      });
      this._server.listen(
        options.port,
        options.host,
        options.backlog,
        callback
      );
    } else if (options.server) {
      this._server = options.server;
    }

    if (this._server) {
      const emitConnection = this.emit.bind(this, 'connection');

      //
      // Keep the remover so that `close()` can detach from the HTTP server.
      //
      this._removeListeners = addListeners(this._server, {
        listening: this.emit.bind(this, 'listening'),
        error: this.emit.bind(this, 'error'),
        upgrade: (req, socket, head) => {
          this.handleUpgrade(req, socket, head, emitConnection);
        }
      });
    }

    if (options.perMessageDeflate === true) options.perMessageDeflate = {};
    if (options.clientTracking) {
      this.clients = new Set();
      this._shouldEmitClose = false;
    }

    this.options = options;
    this._state = RUNNING;
  }

  /**
   * Returns the bound address, the address family name, and port of the server
   * as reported by the operating system if listening on an IP socket.
   * If the server is listening on a pipe or UNIX domain socket, the name is
   * returned as a string.
   *
   * @return {(Object|String|null)} The address of the server, or `null` when
   *     there is no underlying HTTP server (for example after `close()`)
   * @throws {Error} If the server is operating in "noServer" mode
   * @public
   */
  address() {
    if (this.options.noServer) {
      throw new Error('The server is operating in "noServer" mode');
    }

    if (!this._server) return null;
    return this._server.address();
  }

  /**
   * Stop the server from accepting new connections and emit the `'close'` event
   * when all existing connections are closed.
   *
   * @param {Function} [cb] A one-time listener for the `'close'` event. If the
   *     server is already closed it is called with an `Error`
   * @public
   */
  close(cb) {
    if (this._state === CLOSED) {
      if (cb) {
        this.once('close', () => {
          cb(new Error('The server is not running'));
        });
      }

      process.nextTick(emitClose, this);
      return;
    }

    if (cb) this.once('close', cb);

    // A close is already in progress; the listener above is all that is needed.
    if (this._state === CLOSING) return;
    this._state = CLOSING;

    if (this.options.noServer || this.options.server) {
      //
      // The HTTP/S server, if any, is owned by the caller. Only detach from it.
      //
      if (this._server) {
        this._removeListeners();
        this._removeListeners = this._server = null;
      }

      if (this.clients) {
        if (!this.clients.size) {
          process.nextTick(emitClose, this);
        } else {
          //
          // Defer `'close'` until the last tracked client goes away, see
          // `completeUpgrade()`.
          //
          this._shouldEmitClose = true;
        }
      } else {
        process.nextTick(emitClose, this);
      }
    } else {
      const server = this._server;

      this._removeListeners();
      this._removeListeners = this._server = null;

      //
      // The HTTP/S server was created internally. Close it, and rely on its
      // `'close'` event.
      //
      server.close(() => {
        emitClose(this);
      });
    }
  }

  /**
   * See if a given request should be handled by this server instance.
   *
   * When the `path` option is set, the request URL without its query string
   * must be equal to it.
   *
   * @param {http.IncomingMessage} req Request object to inspect
   * @return {Boolean} `true` if the request is valid, else `false`
   * @public
   */
  shouldHandle(req) {
    if (this.options.path) {
      const index = req.url.indexOf('?');
      const pathname = index !== -1 ? req.url.slice(0, index) : req.url;

      if (pathname !== this.options.path) return false;
    }

    return true;
  }

  /**
   * Handle a HTTP Upgrade request.
   *
   * The request is validated (method, `Upgrade`, `Sec-WebSocket-Key`,
   * `Sec-WebSocket-Version`, path, subprotocols, extensions, and the optional
   * `verifyClient` hook). On success `completeUpgrade()` is called, otherwise
   * the handshake is aborted or a `'wsClientError'` event is emitted.
   *
   * @param {http.IncomingMessage} req The request object
   * @param {Duplex} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @public
   */
  handleUpgrade(req, socket, head, cb) {
    socket.on('error', socketOnError);

    const key = req.headers['sec-websocket-key'];
    const upgrade = req.headers.upgrade;
    const version = +req.headers['sec-websocket-version'];

    if (req.method !== 'GET') {
      const message = 'Invalid HTTP method';
      abortHandshakeOrEmitwsClientError(this, req, socket, 405, message);
      return;
    }

    if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
      const message = 'Invalid Upgrade header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (key === undefined || !keyRegex.test(key)) {
      const message = 'Missing or invalid Sec-WebSocket-Key header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (version !== 8 && version !== 13) {
      const message = 'Missing or invalid Sec-WebSocket-Version header';

      if (this.listenerCount('wsClientError')) {
        // The listener owns the socket and is responsible for the response.
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      } else {
        //
        // RFC 6455 requires the failure response to advertise the versions
        // the server is willing to use.
        //
        abortHandshake(socket, 400, message, {
          'Sec-WebSocket-Version': '13, 8'
        });
      }

      return;
    }

    if (!this.shouldHandle(req)) {
      abortHandshake(socket, 400);
      return;
    }

    const secWebSocketProtocol = req.headers['sec-websocket-protocol'];
    let protocols = new Set();

    if (secWebSocketProtocol !== undefined) {
      try {
        protocols = subprotocol.parse(secWebSocketProtocol);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Protocol header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    const secWebSocketExtensions = req.headers['sec-websocket-extensions'];
    const extensions = {};

    if (
      this.options.perMessageDeflate &&
      secWebSocketExtensions !== undefined
    ) {
      const perMessageDeflate = new PerMessageDeflate(
        this.options.perMessageDeflate,
        true,
        this.options.maxPayload
      );

      try {
        const offers = extension.parse(secWebSocketExtensions);

        if (offers[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]);
          extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
        }
      } catch (err) {
        const message =
          'Invalid or unacceptable Sec-WebSocket-Extensions header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    //
    // Optionally call external client verification handler.
    //
    if (this.options.verifyClient) {
      const info = {
        // Version 8 clients send the origin in `Sec-WebSocket-Origin`.
        origin: req.headers[version === 8 ? 'sec-websocket-origin' : 'origin'],
        secure: !!(req.socket.authorized || req.socket.encrypted),
        req
      };

      //
      // A two-argument hook is asynchronous and reports through a callback.
      //
      if (this.options.verifyClient.length === 2) {
        this.options.verifyClient(info, (verified, code, message, headers) => {
          if (!verified) {
            return abortHandshake(socket, code || 401, message, headers);
          }

          this.completeUpgrade(
            extensions,
            key,
            protocols,
            req,
            socket,
            head,
            cb
          );
        });
        return;
      }

      if (!this.options.verifyClient(info)) return abortHandshake(socket, 401);
    }

    this.completeUpgrade(extensions, key, protocols, req, socket, head, cb);
  }

  /**
   * Upgrade the connection to WebSocket.
   *
   * Writes the `101 Switching Protocols` response, binds the socket to a new
   * `WebSocket` instance, tracks it when client tracking is enabled, and hands
   * it to `cb`.
   *
   * @param {Object} extensions The accepted extensions
   * @param {String} key The value of the `Sec-WebSocket-Key` header
   * @param {Set} protocols The subprotocols
   * @param {http.IncomingMessage} req The request object
   * @param {Duplex} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @throws {Error} If called more than once with the same socket
   * @private
   */
  completeUpgrade(extensions, key, protocols, req, socket, head, cb) {
    //
    // Destroy the socket if the client has already sent a FIN packet.
    //
    if (!socket.readable || !socket.writable) return socket.destroy();

    if (socket[kWebSocket]) {
      throw new Error(
        'server.handleUpgrade() was called more than once with the same ' +
          'socket, possibly due to a misconfiguration'
      );
    }

    // The server is closing or closed; refuse new connections.
    if (this._state > RUNNING) return abortHandshake(socket, 503);

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    const headers = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${digest}`
    ];

    const ws = new this.options.WebSocket(null, undefined, this.options);

    if (protocols.size) {
      //
      // Optionally call external protocol selection handler. Without one the
      // first offered subprotocol is selected.
      //
      const protocol = this.options.handleProtocols
        ? this.options.handleProtocols(protocols, req)
        : protocols.values().next().value;

      if (protocol) {
        headers.push(`Sec-WebSocket-Protocol: ${protocol}`);
        ws._protocol = protocol;
      }
    }

    if (extensions[PerMessageDeflate.extensionName]) {
      const params = extensions[PerMessageDeflate.extensionName].params;
      const value = extension.format({
        [PerMessageDeflate.extensionName]: [params]
      });
      headers.push(`Sec-WebSocket-Extensions: ${value}`);
      ws._extensions = extensions;
    }

    //
    // Allow external modification/inspection of handshake headers.
    //
    this.emit('headers', headers, req);

    socket.write(headers.concat('\r\n').join('\r\n'));
    socket.removeListener('error', socketOnError);

    ws.setSocket(socket, head, {
      allowSynchronousEvents: this.options.allowSynchronousEvents,
      maxPayload: this.options.maxPayload,
      skipUTF8Validation: this.options.skipUTF8Validation
    });

    if (this.clients) {
      this.clients.add(ws);
      ws.on('close', () => {
        this.clients.delete(ws);

        //
        // `close()` was called while clients were still connected and this
        // was the last one.
        //
        if (this._shouldEmitClose && !this.clients.size) {
          process.nextTick(emitClose, this);
        }
      });
    }

    cb(ws, req);
  }
}
439
440module.exports = WebSocketServer;
441
/**
 * Register every listener of an <event, listener> map on an `EventEmitter`
 * and hand back a function that undoes the registration.
 *
 * @param {EventEmitter} server The event emitter
 * @param {Object.<String, Function>} map The listeners to add
 * @return {Function} A function that will remove the added listeners when
 *     called
 * @private
 */
function addListeners(server, map) {
  Object.entries(map).forEach(([name, listener]) => {
    server.on(name, listener);
  });

  return function removeListeners() {
    // The map is read again here, exactly as at registration time.
    Object.entries(map).forEach(([name, listener]) => {
      server.removeListener(name, listener);
    });
  };
}
461
/**
 * Mark a server as closed and notify its `'close'` listeners.
 *
 * @param {EventEmitter} server The event emitter
 * @private
 */
function emitClose(server) {
  // Update the state first so that listeners already observe `CLOSED`.
  server._state = CLOSED;
  server.emit('close');
}
472
/**
 * `'error'` listener for a socket whose handshake is in progress. It is
 * invoked with the socket as `this` and destroys it; the error itself is
 * ignored.
 *
 * @private
 */
function socketOnError() {
  this.destroy();
}
481
/**
 * Close the connection when preconditions are not fulfilled.
 *
 * Writes a minimal HTTP/1.1 error response and ends the socket. The socket is
 * destroyed once the response has been flushed.
 *
 * @param {Duplex} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} [message] The HTTP response body. Defaults to the reason
 *     phrase of `code`
 * @param {Object} [headers] Additional HTTP response headers. They take
 *     precedence over the default ones
 * @private
 */
function abortHandshake(socket, code, message, headers) {
  //
  // The socket is writable unless the user destroyed or ended it before calling
  // `server.handleUpgrade()` or in the `verifyClient` function, which is a user
  // error. Handling this does not make much sense as the worst that can happen
  // is that some of the data written by the user might be discarded due to the
  // call to `socket.end()` below, which triggers an `'error'` event that in
  // turn causes the socket to be destroyed.
  //

  //
  // `code` can be chosen by the user through `verifyClient` and is not
  // necessarily listed in `http.STATUS_CODES`. Fall back to a generic reason
  // phrase instead of writing "undefined" in the status line or passing
  // `undefined` to `Buffer.byteLength()`, which throws.
  //
  const reason = http.STATUS_CODES[code] || 'unknown';

  message = message || reason;
  headers = {
    Connection: 'close',
    'Content-Type': 'text/html',
    'Content-Length': Buffer.byteLength(message),
    ...headers
  };

  socket.once('finish', socket.destroy);

  socket.end(
    `HTTP/1.1 ${code} ${reason}\r\n` +
      Object.keys(headers)
        .map((h) => `${h}: ${headers[h]}`)
        .join('\r\n') +
      '\r\n\r\n' +
      message
  );
}
519
/**
 * Report a failed handshake. When the server has at least one
 * `'wsClientError'` listener the event is emitted and the listener is left
 * in charge of the socket, otherwise `abortHandshake()` is called.
 *
 * @param {WebSocketServer} server The WebSocket server
 * @param {http.IncomingMessage} req The request object
 * @param {Duplex} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} message The HTTP response body
 * @private
 */
function abortHandshakeOrEmitwsClientError(server, req, socket, code, message) {
  if (!server.listenerCount('wsClientError')) {
    abortHandshake(socket, code, message);
    return;
  }

  const error = new Error(message);

  // Hide this helper from the stack trace of the emitted error.
  Error.captureStackTrace(error, abortHandshakeOrEmitwsClientError);

  server.emit('wsClientError', error, socket, req);
}