UNPKG

16.4 kBJavaScriptView Raw
1'use strict';
2
3const { Writable } = require('stream');
4
5const PerMessageDeflate = require('./permessage-deflate');
6const {
7 BINARY_TYPES,
8 EMPTY_BUFFER,
9 kStatusCode,
10 kWebSocket
11} = require('./constants');
12const { concat, toArrayBuffer, unmask } = require('./buffer-util');
13const { isValidStatusCode, isValidUTF8 } = require('./validation');
14
15const FastBuffer = Buffer[Symbol.species];
16
17const GET_INFO = 0;
18const GET_PAYLOAD_LENGTH_16 = 1;
19const GET_PAYLOAD_LENGTH_64 = 2;
20const GET_MASK = 3;
21const GET_DATA = 4;
22const INFLATING = 5;
23const DEFER_EVENT = 6;
24
25/**
26 * HyBi Receiver implementation.
27 *
28 * @extends Writable
29 */
30class Receiver extends Writable {
31 /**
32 * Creates a Receiver instance.
33 *
34 * @param {Object} [options] Options object
35 * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
36 * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
37 * multiple times in the same tick
38 * @param {String} [options.binaryType=nodebuffer] The type for binary data
39 * @param {Object} [options.extensions] An object containing the negotiated
40 * extensions
41 * @param {Boolean} [options.isServer=false] Specifies whether to operate in
42 * client or server mode
43 * @param {Number} [options.maxPayload=0] The maximum allowed message length
44 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
45 * not to skip UTF-8 validation for text and close messages
46 */
47 constructor(options = {}) {
48 super();
49
50 this._allowSynchronousEvents =
51 options.allowSynchronousEvents !== undefined
52 ? options.allowSynchronousEvents
53 : true;
54 this._binaryType = options.binaryType || BINARY_TYPES[0];
55 this._extensions = options.extensions || {};
56 this._isServer = !!options.isServer;
57 this._maxPayload = options.maxPayload | 0;
58 this._skipUTF8Validation = !!options.skipUTF8Validation;
59 this[kWebSocket] = undefined;
60
61 this._bufferedBytes = 0;
62 this._buffers = [];
63
64 this._compressed = false;
65 this._payloadLength = 0;
66 this._mask = undefined;
67 this._fragmented = 0;
68 this._masked = false;
69 this._fin = false;
70 this._opcode = 0;
71
72 this._totalPayloadLength = 0;
73 this._messageLength = 0;
74 this._fragments = [];
75
76 this._errored = false;
77 this._loop = false;
78 this._state = GET_INFO;
79 }
80
81 /**
82 * Implements `Writable.prototype._write()`.
83 *
84 * @param {Buffer} chunk The chunk of data to write
85 * @param {String} encoding The character encoding of `chunk`
86 * @param {Function} cb Callback
87 * @private
88 */
89 _write(chunk, encoding, cb) {
90 if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
91
92 this._bufferedBytes += chunk.length;
93 this._buffers.push(chunk);
94 this.startLoop(cb);
95 }
96
97 /**
98 * Consumes `n` bytes from the buffered data.
99 *
100 * @param {Number} n The number of bytes to consume
101 * @return {Buffer} The consumed bytes
102 * @private
103 */
104 consume(n) {
105 this._bufferedBytes -= n;
106
107 if (n === this._buffers[0].length) return this._buffers.shift();
108
109 if (n < this._buffers[0].length) {
110 const buf = this._buffers[0];
111 this._buffers[0] = new FastBuffer(
112 buf.buffer,
113 buf.byteOffset + n,
114 buf.length - n
115 );
116
117 return new FastBuffer(buf.buffer, buf.byteOffset, n);
118 }
119
120 const dst = Buffer.allocUnsafe(n);
121
122 do {
123 const buf = this._buffers[0];
124 const offset = dst.length - n;
125
126 if (n >= buf.length) {
127 dst.set(this._buffers.shift(), offset);
128 } else {
129 dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
130 this._buffers[0] = new FastBuffer(
131 buf.buffer,
132 buf.byteOffset + n,
133 buf.length - n
134 );
135 }
136
137 n -= buf.length;
138 } while (n > 0);
139
140 return dst;
141 }
142
143 /**
144 * Starts the parsing loop.
145 *
146 * @param {Function} cb Callback
147 * @private
148 */
149 startLoop(cb) {
150 this._loop = true;
151
152 do {
153 switch (this._state) {
154 case GET_INFO:
155 this.getInfo(cb);
156 break;
157 case GET_PAYLOAD_LENGTH_16:
158 this.getPayloadLength16(cb);
159 break;
160 case GET_PAYLOAD_LENGTH_64:
161 this.getPayloadLength64(cb);
162 break;
163 case GET_MASK:
164 this.getMask();
165 break;
166 case GET_DATA:
167 this.getData(cb);
168 break;
169 case INFLATING:
170 case DEFER_EVENT:
171 this._loop = false;
172 return;
173 }
174 } while (this._loop);
175
176 if (!this._errored) cb();
177 }
178
179 /**
180 * Reads the first two bytes of a frame.
181 *
182 * @param {Function} cb Callback
183 * @private
184 */
185 getInfo(cb) {
186 if (this._bufferedBytes < 2) {
187 this._loop = false;
188 return;
189 }
190
191 const buf = this.consume(2);
192
193 if ((buf[0] & 0x30) !== 0x00) {
194 const error = this.createError(
195 RangeError,
196 'RSV2 and RSV3 must be clear',
197 true,
198 1002,
199 'WS_ERR_UNEXPECTED_RSV_2_3'
200 );
201
202 cb(error);
203 return;
204 }
205
206 const compressed = (buf[0] & 0x40) === 0x40;
207
208 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
209 const error = this.createError(
210 RangeError,
211 'RSV1 must be clear',
212 true,
213 1002,
214 'WS_ERR_UNEXPECTED_RSV_1'
215 );
216
217 cb(error);
218 return;
219 }
220
221 this._fin = (buf[0] & 0x80) === 0x80;
222 this._opcode = buf[0] & 0x0f;
223 this._payloadLength = buf[1] & 0x7f;
224
225 if (this._opcode === 0x00) {
226 if (compressed) {
227 const error = this.createError(
228 RangeError,
229 'RSV1 must be clear',
230 true,
231 1002,
232 'WS_ERR_UNEXPECTED_RSV_1'
233 );
234
235 cb(error);
236 return;
237 }
238
239 if (!this._fragmented) {
240 const error = this.createError(
241 RangeError,
242 'invalid opcode 0',
243 true,
244 1002,
245 'WS_ERR_INVALID_OPCODE'
246 );
247
248 cb(error);
249 return;
250 }
251
252 this._opcode = this._fragmented;
253 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
254 if (this._fragmented) {
255 const error = this.createError(
256 RangeError,
257 `invalid opcode ${this._opcode}`,
258 true,
259 1002,
260 'WS_ERR_INVALID_OPCODE'
261 );
262
263 cb(error);
264 return;
265 }
266
267 this._compressed = compressed;
268 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
269 if (!this._fin) {
270 const error = this.createError(
271 RangeError,
272 'FIN must be set',
273 true,
274 1002,
275 'WS_ERR_EXPECTED_FIN'
276 );
277
278 cb(error);
279 return;
280 }
281
282 if (compressed) {
283 const error = this.createError(
284 RangeError,
285 'RSV1 must be clear',
286 true,
287 1002,
288 'WS_ERR_UNEXPECTED_RSV_1'
289 );
290
291 cb(error);
292 return;
293 }
294
295 if (
296 this._payloadLength > 0x7d ||
297 (this._opcode === 0x08 && this._payloadLength === 1)
298 ) {
299 const error = this.createError(
300 RangeError,
301 `invalid payload length ${this._payloadLength}`,
302 true,
303 1002,
304 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
305 );
306
307 cb(error);
308 return;
309 }
310 } else {
311 const error = this.createError(
312 RangeError,
313 `invalid opcode ${this._opcode}`,
314 true,
315 1002,
316 'WS_ERR_INVALID_OPCODE'
317 );
318
319 cb(error);
320 return;
321 }
322
323 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
324 this._masked = (buf[1] & 0x80) === 0x80;
325
326 if (this._isServer) {
327 if (!this._masked) {
328 const error = this.createError(
329 RangeError,
330 'MASK must be set',
331 true,
332 1002,
333 'WS_ERR_EXPECTED_MASK'
334 );
335
336 cb(error);
337 return;
338 }
339 } else if (this._masked) {
340 const error = this.createError(
341 RangeError,
342 'MASK must be clear',
343 true,
344 1002,
345 'WS_ERR_UNEXPECTED_MASK'
346 );
347
348 cb(error);
349 return;
350 }
351
352 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
353 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
354 else this.haveLength(cb);
355 }
356
357 /**
358 * Gets extended payload length (7+16).
359 *
360 * @param {Function} cb Callback
361 * @private
362 */
363 getPayloadLength16(cb) {
364 if (this._bufferedBytes < 2) {
365 this._loop = false;
366 return;
367 }
368
369 this._payloadLength = this.consume(2).readUInt16BE(0);
370 this.haveLength(cb);
371 }
372
373 /**
374 * Gets extended payload length (7+64).
375 *
376 * @param {Function} cb Callback
377 * @private
378 */
379 getPayloadLength64(cb) {
380 if (this._bufferedBytes < 8) {
381 this._loop = false;
382 return;
383 }
384
385 const buf = this.consume(8);
386 const num = buf.readUInt32BE(0);
387
388 //
389 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
390 // if payload length is greater than this number.
391 //
392 if (num > Math.pow(2, 53 - 32) - 1) {
393 const error = this.createError(
394 RangeError,
395 'Unsupported WebSocket frame: payload length > 2^53 - 1',
396 false,
397 1009,
398 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
399 );
400
401 cb(error);
402 return;
403 }
404
405 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
406 this.haveLength(cb);
407 }
408
409 /**
410 * Payload length has been read.
411 *
412 * @param {Function} cb Callback
413 * @private
414 */
415 haveLength(cb) {
416 if (this._payloadLength && this._opcode < 0x08) {
417 this._totalPayloadLength += this._payloadLength;
418 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
419 const error = this.createError(
420 RangeError,
421 'Max payload size exceeded',
422 false,
423 1009,
424 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
425 );
426
427 cb(error);
428 return;
429 }
430 }
431
432 if (this._masked) this._state = GET_MASK;
433 else this._state = GET_DATA;
434 }
435
436 /**
437 * Reads mask bytes.
438 *
439 * @private
440 */
441 getMask() {
442 if (this._bufferedBytes < 4) {
443 this._loop = false;
444 return;
445 }
446
447 this._mask = this.consume(4);
448 this._state = GET_DATA;
449 }
450
451 /**
452 * Reads data bytes.
453 *
454 * @param {Function} cb Callback
455 * @private
456 */
457 getData(cb) {
458 let data = EMPTY_BUFFER;
459
460 if (this._payloadLength) {
461 if (this._bufferedBytes < this._payloadLength) {
462 this._loop = false;
463 return;
464 }
465
466 data = this.consume(this._payloadLength);
467
468 if (
469 this._masked &&
470 (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
471 ) {
472 unmask(data, this._mask);
473 }
474 }
475
476 if (this._opcode > 0x07) {
477 this.controlMessage(data, cb);
478 return;
479 }
480
481 if (this._compressed) {
482 this._state = INFLATING;
483 this.decompress(data, cb);
484 return;
485 }
486
487 if (data.length) {
488 //
489 // This message is not compressed so its length is the sum of the payload
490 // length of all fragments.
491 //
492 this._messageLength = this._totalPayloadLength;
493 this._fragments.push(data);
494 }
495
496 this.dataMessage(cb);
497 }
498
499 /**
500 * Decompresses data.
501 *
502 * @param {Buffer} data Compressed data
503 * @param {Function} cb Callback
504 * @private
505 */
506 decompress(data, cb) {
507 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
508
509 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
510 if (err) return cb(err);
511
512 if (buf.length) {
513 this._messageLength += buf.length;
514 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
515 const error = this.createError(
516 RangeError,
517 'Max payload size exceeded',
518 false,
519 1009,
520 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
521 );
522
523 cb(error);
524 return;
525 }
526
527 this._fragments.push(buf);
528 }
529
530 this.dataMessage(cb);
531 if (this._state === GET_INFO) this.startLoop(cb);
532 });
533 }
534
535 /**
536 * Handles a data message.
537 *
538 * @param {Function} cb Callback
539 * @private
540 */
541 dataMessage(cb) {
542 if (!this._fin) {
543 this._state = GET_INFO;
544 return;
545 }
546
547 const messageLength = this._messageLength;
548 const fragments = this._fragments;
549
550 this._totalPayloadLength = 0;
551 this._messageLength = 0;
552 this._fragmented = 0;
553 this._fragments = [];
554
555 if (this._opcode === 2) {
556 let data;
557
558 if (this._binaryType === 'nodebuffer') {
559 data = concat(fragments, messageLength);
560 } else if (this._binaryType === 'arraybuffer') {
561 data = toArrayBuffer(concat(fragments, messageLength));
562 } else {
563 data = fragments;
564 }
565
566 if (this._allowSynchronousEvents) {
567 this.emit('message', data, true);
568 this._state = GET_INFO;
569 } else {
570 this._state = DEFER_EVENT;
571 setImmediate(() => {
572 this.emit('message', data, true);
573 this._state = GET_INFO;
574 this.startLoop(cb);
575 });
576 }
577 } else {
578 const buf = concat(fragments, messageLength);
579
580 if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
581 const error = this.createError(
582 Error,
583 'invalid UTF-8 sequence',
584 true,
585 1007,
586 'WS_ERR_INVALID_UTF8'
587 );
588
589 cb(error);
590 return;
591 }
592
593 if (this._state === INFLATING || this._allowSynchronousEvents) {
594 this.emit('message', buf, false);
595 this._state = GET_INFO;
596 } else {
597 this._state = DEFER_EVENT;
598 setImmediate(() => {
599 this.emit('message', buf, false);
600 this._state = GET_INFO;
601 this.startLoop(cb);
602 });
603 }
604 }
605 }
606
607 /**
608 * Handles a control message.
609 *
610 * @param {Buffer} data Data to handle
611 * @return {(Error|RangeError|undefined)} A possible error
612 * @private
613 */
614 controlMessage(data, cb) {
615 if (this._opcode === 0x08) {
616 if (data.length === 0) {
617 this._loop = false;
618 this.emit('conclude', 1005, EMPTY_BUFFER);
619 this.end();
620 } else {
621 const code = data.readUInt16BE(0);
622
623 if (!isValidStatusCode(code)) {
624 const error = this.createError(
625 RangeError,
626 `invalid status code ${code}`,
627 true,
628 1002,
629 'WS_ERR_INVALID_CLOSE_CODE'
630 );
631
632 cb(error);
633 return;
634 }
635
636 const buf = new FastBuffer(
637 data.buffer,
638 data.byteOffset + 2,
639 data.length - 2
640 );
641
642 if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
643 const error = this.createError(
644 Error,
645 'invalid UTF-8 sequence',
646 true,
647 1007,
648 'WS_ERR_INVALID_UTF8'
649 );
650
651 cb(error);
652 return;
653 }
654
655 this._loop = false;
656 this.emit('conclude', code, buf);
657 this.end();
658 }
659
660 this._state = GET_INFO;
661 return;
662 }
663
664 if (this._allowSynchronousEvents) {
665 this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
666 this._state = GET_INFO;
667 } else {
668 this._state = DEFER_EVENT;
669 setImmediate(() => {
670 this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
671 this._state = GET_INFO;
672 this.startLoop(cb);
673 });
674 }
675 }
676
677 /**
678 * Builds an error object.
679 *
680 * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
681 * @param {String} message The error message
682 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
683 * `message`
684 * @param {Number} statusCode The status code
685 * @param {String} errorCode The exposed error code
686 * @return {(Error|RangeError)} The error
687 * @private
688 */
689 createError(ErrorCtor, message, prefix, statusCode, errorCode) {
690 this._loop = false;
691 this._errored = true;
692
693 const err = new ErrorCtor(
694 prefix ? `Invalid WebSocket frame: ${message}` : message
695 );
696
697 Error.captureStackTrace(err, this.createError);
698 err.code = errorCode;
699 err[kStatusCode] = statusCode;
700 return err;
701 }
702}
703
704module.exports = Receiver;