UNPKG

14.5 kBJavaScriptView Raw
1'use strict';
2
3const { Writable } = require('stream');
4
5const PerMessageDeflate = require('./permessage-deflate');
6const {
7 BINARY_TYPES,
8 EMPTY_BUFFER,
9 kStatusCode,
10 kWebSocket
11} = require('./constants');
12const { concat, toArrayBuffer, unmask } = require('./buffer-util');
13const { isValidStatusCode, isValidUTF8 } = require('./validation');
14
15const GET_INFO = 0;
16const GET_PAYLOAD_LENGTH_16 = 1;
17const GET_PAYLOAD_LENGTH_64 = 2;
18const GET_MASK = 3;
19const GET_DATA = 4;
20const INFLATING = 5;
21
22/**
23 * HyBi Receiver implementation.
24 *
25 * @extends Writable
26 */
27class Receiver extends Writable {
28 /**
29 * Creates a Receiver instance.
30 *
31 * @param {Object} [options] Options object
32 * @param {String} [options.binaryType=nodebuffer] The type for binary data
33 * @param {Object} [options.extensions] An object containing the negotiated
34 * extensions
35 * @param {Boolean} [options.isServer=false] Specifies whether to operate in
36 * client or server mode
37 * @param {Number} [options.maxPayload=0] The maximum allowed message length
38 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
39 * not to skip UTF-8 validation for text and close messages
40 */
41 constructor(options = {}) {
42 super();
43
44 this._binaryType = options.binaryType || BINARY_TYPES[0];
45 this._extensions = options.extensions || {};
46 this._isServer = !!options.isServer;
47 this._maxPayload = options.maxPayload | 0;
48 this._skipUTF8Validation = !!options.skipUTF8Validation;
49 this[kWebSocket] = undefined;
50
51 this._bufferedBytes = 0;
52 this._buffers = [];
53
54 this._compressed = false;
55 this._payloadLength = 0;
56 this._mask = undefined;
57 this._fragmented = 0;
58 this._masked = false;
59 this._fin = false;
60 this._opcode = 0;
61
62 this._totalPayloadLength = 0;
63 this._messageLength = 0;
64 this._fragments = [];
65
66 this._state = GET_INFO;
67 this._loop = false;
68 }
69
70 /**
71 * Implements `Writable.prototype._write()`.
72 *
73 * @param {Buffer} chunk The chunk of data to write
74 * @param {String} encoding The character encoding of `chunk`
75 * @param {Function} cb Callback
76 * @private
77 */
78 _write(chunk, encoding, cb) {
79 if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
80
81 this._bufferedBytes += chunk.length;
82 this._buffers.push(chunk);
83 this.startLoop(cb);
84 }
85
86 /**
87 * Consumes `n` bytes from the buffered data.
88 *
89 * @param {Number} n The number of bytes to consume
90 * @return {Buffer} The consumed bytes
91 * @private
92 */
93 consume(n) {
94 this._bufferedBytes -= n;
95
96 if (n === this._buffers[0].length) return this._buffers.shift();
97
98 if (n < this._buffers[0].length) {
99 const buf = this._buffers[0];
100 this._buffers[0] = buf.slice(n);
101 return buf.slice(0, n);
102 }
103
104 const dst = Buffer.allocUnsafe(n);
105
106 do {
107 const buf = this._buffers[0];
108 const offset = dst.length - n;
109
110 if (n >= buf.length) {
111 dst.set(this._buffers.shift(), offset);
112 } else {
113 dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
114 this._buffers[0] = buf.slice(n);
115 }
116
117 n -= buf.length;
118 } while (n > 0);
119
120 return dst;
121 }
122
123 /**
124 * Starts the parsing loop.
125 *
126 * @param {Function} cb Callback
127 * @private
128 */
129 startLoop(cb) {
130 let err;
131 this._loop = true;
132
133 do {
134 switch (this._state) {
135 case GET_INFO:
136 err = this.getInfo();
137 break;
138 case GET_PAYLOAD_LENGTH_16:
139 err = this.getPayloadLength16();
140 break;
141 case GET_PAYLOAD_LENGTH_64:
142 err = this.getPayloadLength64();
143 break;
144 case GET_MASK:
145 this.getMask();
146 break;
147 case GET_DATA:
148 err = this.getData(cb);
149 break;
150 default:
151 // `INFLATING`
152 this._loop = false;
153 return;
154 }
155 } while (this._loop);
156
157 cb(err);
158 }
159
160 /**
161 * Reads the first two bytes of a frame.
162 *
163 * @return {(RangeError|undefined)} A possible error
164 * @private
165 */
166 getInfo() {
167 if (this._bufferedBytes < 2) {
168 this._loop = false;
169 return;
170 }
171
172 const buf = this.consume(2);
173
174 if ((buf[0] & 0x30) !== 0x00) {
175 this._loop = false;
176 return error(
177 RangeError,
178 'RSV2 and RSV3 must be clear',
179 true,
180 1002,
181 'WS_ERR_UNEXPECTED_RSV_2_3'
182 );
183 }
184
185 const compressed = (buf[0] & 0x40) === 0x40;
186
187 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
188 this._loop = false;
189 return error(
190 RangeError,
191 'RSV1 must be clear',
192 true,
193 1002,
194 'WS_ERR_UNEXPECTED_RSV_1'
195 );
196 }
197
198 this._fin = (buf[0] & 0x80) === 0x80;
199 this._opcode = buf[0] & 0x0f;
200 this._payloadLength = buf[1] & 0x7f;
201
202 if (this._opcode === 0x00) {
203 if (compressed) {
204 this._loop = false;
205 return error(
206 RangeError,
207 'RSV1 must be clear',
208 true,
209 1002,
210 'WS_ERR_UNEXPECTED_RSV_1'
211 );
212 }
213
214 if (!this._fragmented) {
215 this._loop = false;
216 return error(
217 RangeError,
218 'invalid opcode 0',
219 true,
220 1002,
221 'WS_ERR_INVALID_OPCODE'
222 );
223 }
224
225 this._opcode = this._fragmented;
226 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
227 if (this._fragmented) {
228 this._loop = false;
229 return error(
230 RangeError,
231 `invalid opcode ${this._opcode}`,
232 true,
233 1002,
234 'WS_ERR_INVALID_OPCODE'
235 );
236 }
237
238 this._compressed = compressed;
239 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
240 if (!this._fin) {
241 this._loop = false;
242 return error(
243 RangeError,
244 'FIN must be set',
245 true,
246 1002,
247 'WS_ERR_EXPECTED_FIN'
248 );
249 }
250
251 if (compressed) {
252 this._loop = false;
253 return error(
254 RangeError,
255 'RSV1 must be clear',
256 true,
257 1002,
258 'WS_ERR_UNEXPECTED_RSV_1'
259 );
260 }
261
262 if (this._payloadLength > 0x7d) {
263 this._loop = false;
264 return error(
265 RangeError,
266 `invalid payload length ${this._payloadLength}`,
267 true,
268 1002,
269 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
270 );
271 }
272 } else {
273 this._loop = false;
274 return error(
275 RangeError,
276 `invalid opcode ${this._opcode}`,
277 true,
278 1002,
279 'WS_ERR_INVALID_OPCODE'
280 );
281 }
282
283 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
284 this._masked = (buf[1] & 0x80) === 0x80;
285
286 if (this._isServer) {
287 if (!this._masked) {
288 this._loop = false;
289 return error(
290 RangeError,
291 'MASK must be set',
292 true,
293 1002,
294 'WS_ERR_EXPECTED_MASK'
295 );
296 }
297 } else if (this._masked) {
298 this._loop = false;
299 return error(
300 RangeError,
301 'MASK must be clear',
302 true,
303 1002,
304 'WS_ERR_UNEXPECTED_MASK'
305 );
306 }
307
308 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
309 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
310 else return this.haveLength();
311 }
312
313 /**
314 * Gets extended payload length (7+16).
315 *
316 * @return {(RangeError|undefined)} A possible error
317 * @private
318 */
319 getPayloadLength16() {
320 if (this._bufferedBytes < 2) {
321 this._loop = false;
322 return;
323 }
324
325 this._payloadLength = this.consume(2).readUInt16BE(0);
326 return this.haveLength();
327 }
328
329 /**
330 * Gets extended payload length (7+64).
331 *
332 * @return {(RangeError|undefined)} A possible error
333 * @private
334 */
335 getPayloadLength64() {
336 if (this._bufferedBytes < 8) {
337 this._loop = false;
338 return;
339 }
340
341 const buf = this.consume(8);
342 const num = buf.readUInt32BE(0);
343
344 //
345 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
346 // if payload length is greater than this number.
347 //
348 if (num > Math.pow(2, 53 - 32) - 1) {
349 this._loop = false;
350 return error(
351 RangeError,
352 'Unsupported WebSocket frame: payload length > 2^53 - 1',
353 false,
354 1009,
355 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
356 );
357 }
358
359 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
360 return this.haveLength();
361 }
362
363 /**
364 * Payload length has been read.
365 *
366 * @return {(RangeError|undefined)} A possible error
367 * @private
368 */
369 haveLength() {
370 if (this._payloadLength && this._opcode < 0x08) {
371 this._totalPayloadLength += this._payloadLength;
372 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
373 this._loop = false;
374 return error(
375 RangeError,
376 'Max payload size exceeded',
377 false,
378 1009,
379 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
380 );
381 }
382 }
383
384 if (this._masked) this._state = GET_MASK;
385 else this._state = GET_DATA;
386 }
387
388 /**
389 * Reads mask bytes.
390 *
391 * @private
392 */
393 getMask() {
394 if (this._bufferedBytes < 4) {
395 this._loop = false;
396 return;
397 }
398
399 this._mask = this.consume(4);
400 this._state = GET_DATA;
401 }
402
403 /**
404 * Reads data bytes.
405 *
406 * @param {Function} cb Callback
407 * @return {(Error|RangeError|undefined)} A possible error
408 * @private
409 */
410 getData(cb) {
411 let data = EMPTY_BUFFER;
412
413 if (this._payloadLength) {
414 if (this._bufferedBytes < this._payloadLength) {
415 this._loop = false;
416 return;
417 }
418
419 data = this.consume(this._payloadLength);
420
421 if (
422 this._masked &&
423 (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
424 ) {
425 unmask(data, this._mask);
426 }
427 }
428
429 if (this._opcode > 0x07) return this.controlMessage(data);
430
431 if (this._compressed) {
432 this._state = INFLATING;
433 this.decompress(data, cb);
434 return;
435 }
436
437 if (data.length) {
438 //
439 // This message is not compressed so its length is the sum of the payload
440 // length of all fragments.
441 //
442 this._messageLength = this._totalPayloadLength;
443 this._fragments.push(data);
444 }
445
446 return this.dataMessage();
447 }
448
449 /**
450 * Decompresses data.
451 *
452 * @param {Buffer} data Compressed data
453 * @param {Function} cb Callback
454 * @private
455 */
456 decompress(data, cb) {
457 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
458
459 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
460 if (err) return cb(err);
461
462 if (buf.length) {
463 this._messageLength += buf.length;
464 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
465 return cb(
466 error(
467 RangeError,
468 'Max payload size exceeded',
469 false,
470 1009,
471 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
472 )
473 );
474 }
475
476 this._fragments.push(buf);
477 }
478
479 const er = this.dataMessage();
480 if (er) return cb(er);
481
482 this.startLoop(cb);
483 });
484 }
485
486 /**
487 * Handles a data message.
488 *
489 * @return {(Error|undefined)} A possible error
490 * @private
491 */
492 dataMessage() {
493 if (this._fin) {
494 const messageLength = this._messageLength;
495 const fragments = this._fragments;
496
497 this._totalPayloadLength = 0;
498 this._messageLength = 0;
499 this._fragmented = 0;
500 this._fragments = [];
501
502 if (this._opcode === 2) {
503 let data;
504
505 if (this._binaryType === 'nodebuffer') {
506 data = concat(fragments, messageLength);
507 } else if (this._binaryType === 'arraybuffer') {
508 data = toArrayBuffer(concat(fragments, messageLength));
509 } else {
510 data = fragments;
511 }
512
513 this.emit('message', data, true);
514 } else {
515 const buf = concat(fragments, messageLength);
516
517 if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
518 this._loop = false;
519 return error(
520 Error,
521 'invalid UTF-8 sequence',
522 true,
523 1007,
524 'WS_ERR_INVALID_UTF8'
525 );
526 }
527
528 this.emit('message', buf, false);
529 }
530 }
531
532 this._state = GET_INFO;
533 }
534
535 /**
536 * Handles a control message.
537 *
538 * @param {Buffer} data Data to handle
539 * @return {(Error|RangeError|undefined)} A possible error
540 * @private
541 */
542 controlMessage(data) {
543 if (this._opcode === 0x08) {
544 this._loop = false;
545
546 if (data.length === 0) {
547 this.emit('conclude', 1005, EMPTY_BUFFER);
548 this.end();
549 } else if (data.length === 1) {
550 return error(
551 RangeError,
552 'invalid payload length 1',
553 true,
554 1002,
555 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
556 );
557 } else {
558 const code = data.readUInt16BE(0);
559
560 if (!isValidStatusCode(code)) {
561 return error(
562 RangeError,
563 `invalid status code ${code}`,
564 true,
565 1002,
566 'WS_ERR_INVALID_CLOSE_CODE'
567 );
568 }
569
570 const buf = data.slice(2);
571
572 if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
573 return error(
574 Error,
575 'invalid UTF-8 sequence',
576 true,
577 1007,
578 'WS_ERR_INVALID_UTF8'
579 );
580 }
581
582 this.emit('conclude', code, buf);
583 this.end();
584 }
585 } else if (this._opcode === 0x09) {
586 this.emit('ping', data);
587 } else {
588 this.emit('pong', data);
589 }
590
591 this._state = GET_INFO;
592 }
593}
594
595module.exports = Receiver;
596
597/**
598 * Builds an error object.
599 *
600 * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
601 * @param {String} message The error message
602 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
603 * `message`
604 * @param {Number} statusCode The status code
605 * @param {String} errorCode The exposed error code
606 * @return {(Error|RangeError)} The error
607 * @private
608 */
609function error(ErrorCtor, message, prefix, statusCode, errorCode) {
610 const err = new ErrorCtor(
611 prefix ? `Invalid WebSocket frame: ${message}` : message
612 );
613
614 Error.captureStackTrace(err, error);
615 err.code = errorCode;
616 err[kStatusCode] = statusCode;
617 return err;
618}