UNPKG

12.6 kBJavaScriptView Raw
1'use strict';
2
3const stream = require('stream');
4
5const PerMessageDeflate = require('./permessage-deflate');
6const bufferUtil = require('./buffer-util');
7const validation = require('./validation');
8const constants = require('./constants');
9
10const GET_INFO = 0;
11const GET_PAYLOAD_LENGTH_16 = 1;
12const GET_PAYLOAD_LENGTH_64 = 2;
13const GET_MASK = 3;
14const GET_DATA = 4;
15const INFLATING = 5;
16
17/**
18 * HyBi Receiver implementation.
19 *
20 * @extends stream.Writable
21 */
22class Receiver extends stream.Writable {
23 /**
24 * Creates a Receiver instance.
25 *
26 * @param {String} binaryType The type for binary data
27 * @param {Object} extensions An object containing the negotiated extensions
28 * @param {Number} maxPayload The maximum allowed message length
29 */
30 constructor (binaryType, extensions, maxPayload) {
31 super();
32
33 this._binaryType = binaryType || constants.BINARY_TYPES[0];
34 this[constants.kWebSocket] = undefined;
35 this._extensions = extensions || {};
36 this._maxPayload = maxPayload | 0;
37
38 this._bufferedBytes = 0;
39 this._buffers = [];
40
41 this._compressed = false;
42 this._payloadLength = 0;
43 this._mask = undefined;
44 this._fragmented = 0;
45 this._masked = false;
46 this._fin = false;
47 this._opcode = 0;
48
49 this._totalPayloadLength = 0;
50 this._messageLength = 0;
51 this._fragments = [];
52
53 this._state = GET_INFO;
54 this._loop = false;
55 }
56
57 /**
58 * Implements `Writable.prototype._write()`.
59 *
60 * @param {Buffer} chunk The chunk of data to write
61 * @param {String} encoding The character encoding of `chunk`
62 * @param {Function} cb Callback
63 */
64 _write (chunk, encoding, cb) {
65 if (this._opcode === 0x08) return cb();
66
67 this._bufferedBytes += chunk.length;
68 this._buffers.push(chunk);
69 this.startLoop(cb);
70 }
71
72 /**
73 * Consumes `n` bytes from the buffered data.
74 *
75 * @param {Number} n The number of bytes to consume
76 * @return {Buffer} The consumed bytes
77 * @private
78 */
79 consume (n) {
80 this._bufferedBytes -= n;
81
82 if (n === this._buffers[0].length) return this._buffers.shift();
83
84 if (n < this._buffers[0].length) {
85 const buf = this._buffers[0];
86 this._buffers[0] = buf.slice(n);
87 return buf.slice(0, n);
88 }
89
90 const dst = Buffer.allocUnsafe(n);
91
92 do {
93 const buf = this._buffers[0];
94
95 if (n >= buf.length) {
96 this._buffers.shift().copy(dst, dst.length - n);
97 } else {
98 buf.copy(dst, dst.length - n, 0, n);
99 this._buffers[0] = buf.slice(n);
100 }
101
102 n -= buf.length;
103 } while (n > 0);
104
105 return dst;
106 }
107
108 /**
109 * Starts the parsing loop.
110 *
111 * @param {Function} cb Callback
112 * @private
113 */
114 startLoop (cb) {
115 var err;
116 this._loop = true;
117
118 do {
119 switch (this._state) {
120 case GET_INFO:
121 err = this.getInfo();
122 break;
123 case GET_PAYLOAD_LENGTH_16:
124 err = this.getPayloadLength16();
125 break;
126 case GET_PAYLOAD_LENGTH_64:
127 err = this.getPayloadLength64();
128 break;
129 case GET_MASK:
130 this.getMask();
131 break;
132 case GET_DATA:
133 err = this.getData(cb);
134 break;
135 default: // `INFLATING`
136 this._loop = false;
137 return;
138 }
139 } while (this._loop);
140
141 cb(err);
142 }
143
144 /**
145 * Reads the first two bytes of a frame.
146 *
147 * @return {(RangeError|undefined)} A possible error
148 * @private
149 */
150 getInfo () {
151 if (this._bufferedBytes < 2) {
152 this._loop = false;
153 return;
154 }
155
156 const buf = this.consume(2);
157
158 if ((buf[0] & 0x30) !== 0x00) {
159 this._loop = false;
160 return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
161 }
162
163 const compressed = (buf[0] & 0x40) === 0x40;
164
165 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
166 this._loop = false;
167 return error(RangeError, 'RSV1 must be clear', true, 1002);
168 }
169
170 this._fin = (buf[0] & 0x80) === 0x80;
171 this._opcode = buf[0] & 0x0f;
172 this._payloadLength = buf[1] & 0x7f;
173
174 if (this._opcode === 0x00) {
175 if (compressed) {
176 this._loop = false;
177 return error(RangeError, 'RSV1 must be clear', true, 1002);
178 }
179
180 if (!this._fragmented) {
181 this._loop = false;
182 return error(RangeError, 'invalid opcode 0', true, 1002);
183 }
184
185 this._opcode = this._fragmented;
186 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
187 if (this._fragmented) {
188 this._loop = false;
189 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
190 }
191
192 this._compressed = compressed;
193 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
194 if (!this._fin) {
195 this._loop = false;
196 return error(RangeError, 'FIN must be set', true, 1002);
197 }
198
199 if (compressed) {
200 this._loop = false;
201 return error(RangeError, 'RSV1 must be clear', true, 1002);
202 }
203
204 if (this._payloadLength > 0x7d) {
205 this._loop = false;
206 return error(
207 RangeError,
208 `invalid payload length ${this._payloadLength}`,
209 true,
210 1002
211 );
212 }
213 } else {
214 this._loop = false;
215 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
216 }
217
218 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
219 this._masked = (buf[1] & 0x80) === 0x80;
220
221 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
222 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
223 else return this.haveLength();
224 }
225
226 /**
227 * Gets extended payload length (7+16).
228 *
229 * @return {(RangeError|undefined)} A possible error
230 * @private
231 */
232 getPayloadLength16 () {
233 if (this._bufferedBytes < 2) {
234 this._loop = false;
235 return;
236 }
237
238 this._payloadLength = this.consume(2).readUInt16BE(0);
239 return this.haveLength();
240 }
241
242 /**
243 * Gets extended payload length (7+64).
244 *
245 * @return {(RangeError|undefined)} A possible error
246 * @private
247 */
248 getPayloadLength64 () {
249 if (this._bufferedBytes < 8) {
250 this._loop = false;
251 return;
252 }
253
254 const buf = this.consume(8);
255 const num = buf.readUInt32BE(0);
256
257 //
258 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
259 // if payload length is greater than this number.
260 //
261 if (num > Math.pow(2, 53 - 32) - 1) {
262 this._loop = false;
263 return error(
264 RangeError,
265 'Unsupported WebSocket frame: payload length > 2^53 - 1',
266 false,
267 1009
268 );
269 }
270
271 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
272 return this.haveLength();
273 }
274
275 /**
276 * Payload length has been read.
277 *
278 * @return {(RangeError|undefined)} A possible error
279 * @private
280 */
281 haveLength () {
282 if (this._payloadLength && this._opcode < 0x08) {
283 this._totalPayloadLength += this._payloadLength;
284 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
285 this._loop = false;
286 return error(RangeError, 'Max payload size exceeded', false, 1009);
287 }
288 }
289
290 if (this._masked) this._state = GET_MASK;
291 else this._state = GET_DATA;
292 }
293
294 /**
295 * Reads mask bytes.
296 *
297 * @private
298 */
299 getMask () {
300 if (this._bufferedBytes < 4) {
301 this._loop = false;
302 return;
303 }
304
305 this._mask = this.consume(4);
306 this._state = GET_DATA;
307 }
308
309 /**
310 * Reads data bytes.
311 *
312 * @param {Function} cb Callback
313 * @return {(Error|RangeError|undefined)} A possible error
314 * @private
315 */
316 getData (cb) {
317 var data = constants.EMPTY_BUFFER;
318
319 if (this._payloadLength) {
320 if (this._bufferedBytes < this._payloadLength) {
321 this._loop = false;
322 return;
323 }
324
325 data = this.consume(this._payloadLength);
326 if (this._masked) bufferUtil.unmask(data, this._mask);
327 }
328
329 if (this._opcode > 0x07) return this.controlMessage(data);
330
331 if (this._compressed) {
332 this._state = INFLATING;
333 this.decompress(data, cb);
334 return;
335 }
336
337 if (data.length) {
338 //
339 // This message is not compressed so its lenght is the sum of the payload
340 // length of all fragments.
341 //
342 this._messageLength = this._totalPayloadLength;
343 this._fragments.push(data);
344 }
345
346 return this.dataMessage();
347 }
348
349 /**
350 * Decompresses data.
351 *
352 * @param {Buffer} data Compressed data
353 * @param {Function} cb Callback
354 * @private
355 */
356 decompress (data, cb) {
357 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
358
359 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
360 if (err) return cb(err);
361
362 if (buf.length) {
363 this._messageLength += buf.length;
364 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
365 return cb(error(RangeError, 'Max payload size exceeded', false, 1009));
366 }
367
368 this._fragments.push(buf);
369 }
370
371 const er = this.dataMessage();
372 if (er) return cb(er);
373
374 this.startLoop(cb);
375 });
376 }
377
378 /**
379 * Handles a data message.
380 *
381 * @return {(Error|undefined)} A possible error
382 * @private
383 */
384 dataMessage () {
385 if (this._fin) {
386 const messageLength = this._messageLength;
387 const fragments = this._fragments;
388
389 this._totalPayloadLength = 0;
390 this._messageLength = 0;
391 this._fragmented = 0;
392 this._fragments = [];
393
394 if (this._opcode === 2) {
395 var data;
396
397 if (this._binaryType === 'nodebuffer') {
398 data = toBuffer(fragments, messageLength);
399 } else if (this._binaryType === 'arraybuffer') {
400 data = toArrayBuffer(toBuffer(fragments, messageLength));
401 } else {
402 data = fragments;
403 }
404
405 this.emit('message', data);
406 } else {
407 const buf = toBuffer(fragments, messageLength);
408
409 if (!validation.isValidUTF8(buf)) {
410 this._loop = false;
411 return error(Error, 'invalid UTF-8 sequence', true, 1007);
412 }
413
414 this.emit('message', buf.toString());
415 }
416 }
417
418 this._state = GET_INFO;
419 }
420
421 /**
422 * Handles a control message.
423 *
424 * @param {Buffer} data Data to handle
425 * @return {(Error|RangeError|undefined)} A possible error
426 * @private
427 */
428 controlMessage (data) {
429 if (this._opcode === 0x08) {
430 this._loop = false;
431
432 if (data.length === 0) {
433 this.emit('conclude', 1005, '');
434 this.end();
435 } else if (data.length === 1) {
436 return error(RangeError, 'invalid payload length 1', true, 1002);
437 } else {
438 const code = data.readUInt16BE(0);
439
440 if (!validation.isValidStatusCode(code)) {
441 return error(RangeError, `invalid status code ${code}`, true, 1002);
442 }
443
444 const buf = data.slice(2);
445
446 if (!validation.isValidUTF8(buf)) {
447 return error(Error, 'invalid UTF-8 sequence', true, 1007);
448 }
449
450 this.emit('conclude', code, buf.toString());
451 this.end();
452 }
453
454 return;
455 }
456
457 if (this._opcode === 0x09) this.emit('ping', data);
458 else this.emit('pong', data);
459
460 this._state = GET_INFO;
461 }
462}
463
464module.exports = Receiver;
465
466/**
467 * Builds an error object.
468 *
469 * @param {(Error|RangeError)} ErrorCtor The error constructor
470 * @param {String} message The error message
471 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
472 * `message`
473 * @param {Number} statusCode The status code
474 * @return {(Error|RangeError)} The error
475 * @private
476 */
477function error (ErrorCtor, message, prefix, statusCode) {
478 const err = new ErrorCtor(
479 prefix ? `Invalid WebSocket frame: ${message}` : message
480 );
481
482 Error.captureStackTrace(err, error);
483 err[constants.kStatusCode] = statusCode;
484 return err;
485}
486
487/**
488 * Makes a buffer from a list of fragments.
489 *
490 * @param {Buffer[]} fragments The list of fragments composing the message
491 * @param {Number} messageLength The length of the message
492 * @return {Buffer}
493 * @private
494 */
495function toBuffer (fragments, messageLength) {
496 if (fragments.length === 1) return fragments[0];
497 if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
498 return constants.EMPTY_BUFFER;
499}
500
501/**
502 * Converts a buffer to an `ArrayBuffer`.
503 *
504 * @param {Buffer} The buffer to convert
505 * @return {ArrayBuffer} Converted buffer
506 */
507function toArrayBuffer (buf) {
508 if (buf.byteOffset === 0 && buf.byteLength === buf.buffer.byteLength) {
509 return buf.buffer;
510 }
511
512 return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
513}