UNPKG

13.4 kBJavaScriptView Raw
1/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex" }] */
2
3'use strict';
4
5const { Duplex } = require('stream');
6const { randomFillSync } = require('crypto');
7
8const PerMessageDeflate = require('./permessage-deflate');
9const { EMPTY_BUFFER } = require('./constants');
10const { isValidStatusCode } = require('./validation');
11const { mask: applyMask, toBuffer } = require('./buffer-util');
12
13const kByteLength = Symbol('kByteLength');
14const maskBuffer = Buffer.alloc(4);
15const RANDOM_POOL_SIZE = 8 * 1024;
16let randomPool;
17let randomPoolPointer = RANDOM_POOL_SIZE;
18
19/**
20 * HyBi Sender implementation.
21 */
22class Sender {
23 /**
24 * Creates a Sender instance.
25 *
26 * @param {Duplex} socket The connection socket
27 * @param {Object} [extensions] An object containing the negotiated extensions
28 * @param {Function} [generateMask] The function used to generate the masking
29 * key
30 */
31 constructor(socket, extensions, generateMask) {
32 this._extensions = extensions || {};
33
34 if (generateMask) {
35 this._generateMask = generateMask;
36 this._maskBuffer = Buffer.alloc(4);
37 }
38
39 this._socket = socket;
40
41 this._firstFragment = true;
42 this._compress = false;
43
44 this._bufferedBytes = 0;
45 this._deflating = false;
46 this._queue = [];
47 }
48
49 /**
50 * Frames a piece of data according to the HyBi WebSocket protocol.
51 *
52 * @param {(Buffer|String)} data The data to frame
53 * @param {Object} options Options object
54 * @param {Boolean} [options.fin=false] Specifies whether or not to set the
55 * FIN bit
56 * @param {Function} [options.generateMask] The function used to generate the
57 * masking key
58 * @param {Boolean} [options.mask=false] Specifies whether or not to mask
59 * `data`
60 * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
61 * key
62 * @param {Number} options.opcode The opcode
63 * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
64 * modified
65 * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
66 * RSV1 bit
67 * @return {(Buffer|String)[]} The framed data
68 * @public
69 */
70 static frame(data, options) {
71 let mask;
72 let merge = false;
73 let offset = 2;
74 let skipMasking = false;
75
76 if (options.mask) {
77 mask = options.maskBuffer || maskBuffer;
78
79 if (options.generateMask) {
80 options.generateMask(mask);
81 } else {
82 if (randomPoolPointer === RANDOM_POOL_SIZE) {
83 /* istanbul ignore else */
84 if (randomPool === undefined) {
85 //
86 // This is lazily initialized because server-sent frames must not
87 // be masked so it may never be used.
88 //
89 randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
90 }
91
92 randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
93 randomPoolPointer = 0;
94 }
95
96 mask[0] = randomPool[randomPoolPointer++];
97 mask[1] = randomPool[randomPoolPointer++];
98 mask[2] = randomPool[randomPoolPointer++];
99 mask[3] = randomPool[randomPoolPointer++];
100 }
101
102 skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
103 offset = 6;
104 }
105
106 let dataLength;
107
108 if (typeof data === 'string') {
109 if (
110 (!options.mask || skipMasking) &&
111 options[kByteLength] !== undefined
112 ) {
113 dataLength = options[kByteLength];
114 } else {
115 data = Buffer.from(data);
116 dataLength = data.length;
117 }
118 } else {
119 dataLength = data.length;
120 merge = options.mask && options.readOnly && !skipMasking;
121 }
122
123 let payloadLength = dataLength;
124
125 if (dataLength >= 65536) {
126 offset += 8;
127 payloadLength = 127;
128 } else if (dataLength > 125) {
129 offset += 2;
130 payloadLength = 126;
131 }
132
133 const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);
134
135 target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
136 if (options.rsv1) target[0] |= 0x40;
137
138 target[1] = payloadLength;
139
140 if (payloadLength === 126) {
141 target.writeUInt16BE(dataLength, 2);
142 } else if (payloadLength === 127) {
143 target[2] = target[3] = 0;
144 target.writeUIntBE(dataLength, 4, 6);
145 }
146
147 if (!options.mask) return [target, data];
148
149 target[1] |= 0x80;
150 target[offset - 4] = mask[0];
151 target[offset - 3] = mask[1];
152 target[offset - 2] = mask[2];
153 target[offset - 1] = mask[3];
154
155 if (skipMasking) return [target, data];
156
157 if (merge) {
158 applyMask(data, mask, target, offset, dataLength);
159 return [target];
160 }
161
162 applyMask(data, mask, data, 0, dataLength);
163 return [target, data];
164 }
165
166 /**
167 * Sends a close message to the other peer.
168 *
169 * @param {Number} [code] The status code component of the body
170 * @param {(String|Buffer)} [data] The message component of the body
171 * @param {Boolean} [mask=false] Specifies whether or not to mask the message
172 * @param {Function} [cb] Callback
173 * @public
174 */
175 close(code, data, mask, cb) {
176 let buf;
177
178 if (code === undefined) {
179 buf = EMPTY_BUFFER;
180 } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
181 throw new TypeError('First argument must be a valid error code number');
182 } else if (data === undefined || !data.length) {
183 buf = Buffer.allocUnsafe(2);
184 buf.writeUInt16BE(code, 0);
185 } else {
186 const length = Buffer.byteLength(data);
187
188 if (length > 123) {
189 throw new RangeError('The message must not be greater than 123 bytes');
190 }
191
192 buf = Buffer.allocUnsafe(2 + length);
193 buf.writeUInt16BE(code, 0);
194
195 if (typeof data === 'string') {
196 buf.write(data, 2);
197 } else {
198 buf.set(data, 2);
199 }
200 }
201
202 const options = {
203 [kByteLength]: buf.length,
204 fin: true,
205 generateMask: this._generateMask,
206 mask,
207 maskBuffer: this._maskBuffer,
208 opcode: 0x08,
209 readOnly: false,
210 rsv1: false
211 };
212
213 if (this._deflating) {
214 this.enqueue([this.dispatch, buf, false, options, cb]);
215 } else {
216 this.sendFrame(Sender.frame(buf, options), cb);
217 }
218 }
219
220 /**
221 * Sends a ping message to the other peer.
222 *
223 * @param {*} data The message to send
224 * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
225 * @param {Function} [cb] Callback
226 * @public
227 */
228 ping(data, mask, cb) {
229 let byteLength;
230 let readOnly;
231
232 if (typeof data === 'string') {
233 byteLength = Buffer.byteLength(data);
234 readOnly = false;
235 } else {
236 data = toBuffer(data);
237 byteLength = data.length;
238 readOnly = toBuffer.readOnly;
239 }
240
241 if (byteLength > 125) {
242 throw new RangeError('The data size must not be greater than 125 bytes');
243 }
244
245 const options = {
246 [kByteLength]: byteLength,
247 fin: true,
248 generateMask: this._generateMask,
249 mask,
250 maskBuffer: this._maskBuffer,
251 opcode: 0x09,
252 readOnly,
253 rsv1: false
254 };
255
256 if (this._deflating) {
257 this.enqueue([this.dispatch, data, false, options, cb]);
258 } else {
259 this.sendFrame(Sender.frame(data, options), cb);
260 }
261 }
262
263 /**
264 * Sends a pong message to the other peer.
265 *
266 * @param {*} data The message to send
267 * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
268 * @param {Function} [cb] Callback
269 * @public
270 */
271 pong(data, mask, cb) {
272 let byteLength;
273 let readOnly;
274
275 if (typeof data === 'string') {
276 byteLength = Buffer.byteLength(data);
277 readOnly = false;
278 } else {
279 data = toBuffer(data);
280 byteLength = data.length;
281 readOnly = toBuffer.readOnly;
282 }
283
284 if (byteLength > 125) {
285 throw new RangeError('The data size must not be greater than 125 bytes');
286 }
287
288 const options = {
289 [kByteLength]: byteLength,
290 fin: true,
291 generateMask: this._generateMask,
292 mask,
293 maskBuffer: this._maskBuffer,
294 opcode: 0x0a,
295 readOnly,
296 rsv1: false
297 };
298
299 if (this._deflating) {
300 this.enqueue([this.dispatch, data, false, options, cb]);
301 } else {
302 this.sendFrame(Sender.frame(data, options), cb);
303 }
304 }
305
306 /**
307 * Sends a data message to the other peer.
308 *
309 * @param {*} data The message to send
310 * @param {Object} options Options object
311 * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
312 * or text
313 * @param {Boolean} [options.compress=false] Specifies whether or not to
314 * compress `data`
315 * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
316 * last one
317 * @param {Boolean} [options.mask=false] Specifies whether or not to mask
318 * `data`
319 * @param {Function} [cb] Callback
320 * @public
321 */
322 send(data, options, cb) {
323 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
324 let opcode = options.binary ? 2 : 1;
325 let rsv1 = options.compress;
326
327 let byteLength;
328 let readOnly;
329
330 if (typeof data === 'string') {
331 byteLength = Buffer.byteLength(data);
332 readOnly = false;
333 } else {
334 data = toBuffer(data);
335 byteLength = data.length;
336 readOnly = toBuffer.readOnly;
337 }
338
339 if (this._firstFragment) {
340 this._firstFragment = false;
341 if (
342 rsv1 &&
343 perMessageDeflate &&
344 perMessageDeflate.params[
345 perMessageDeflate._isServer
346 ? 'server_no_context_takeover'
347 : 'client_no_context_takeover'
348 ]
349 ) {
350 rsv1 = byteLength >= perMessageDeflate._threshold;
351 }
352 this._compress = rsv1;
353 } else {
354 rsv1 = false;
355 opcode = 0;
356 }
357
358 if (options.fin) this._firstFragment = true;
359
360 if (perMessageDeflate) {
361 const opts = {
362 [kByteLength]: byteLength,
363 fin: options.fin,
364 generateMask: this._generateMask,
365 mask: options.mask,
366 maskBuffer: this._maskBuffer,
367 opcode,
368 readOnly,
369 rsv1
370 };
371
372 if (this._deflating) {
373 this.enqueue([this.dispatch, data, this._compress, opts, cb]);
374 } else {
375 this.dispatch(data, this._compress, opts, cb);
376 }
377 } else {
378 this.sendFrame(
379 Sender.frame(data, {
380 [kByteLength]: byteLength,
381 fin: options.fin,
382 generateMask: this._generateMask,
383 mask: options.mask,
384 maskBuffer: this._maskBuffer,
385 opcode,
386 readOnly,
387 rsv1: false
388 }),
389 cb
390 );
391 }
392 }
393
394 /**
395 * Dispatches a message.
396 *
397 * @param {(Buffer|String)} data The message to send
398 * @param {Boolean} [compress=false] Specifies whether or not to compress
399 * `data`
400 * @param {Object} options Options object
401 * @param {Boolean} [options.fin=false] Specifies whether or not to set the
402 * FIN bit
403 * @param {Function} [options.generateMask] The function used to generate the
404 * masking key
405 * @param {Boolean} [options.mask=false] Specifies whether or not to mask
406 * `data`
407 * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
408 * key
409 * @param {Number} options.opcode The opcode
410 * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
411 * modified
412 * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
413 * RSV1 bit
414 * @param {Function} [cb] Callback
415 * @private
416 */
417 dispatch(data, compress, options, cb) {
418 if (!compress) {
419 this.sendFrame(Sender.frame(data, options), cb);
420 return;
421 }
422
423 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
424
425 this._bufferedBytes += options[kByteLength];
426 this._deflating = true;
427 perMessageDeflate.compress(data, options.fin, (_, buf) => {
428 if (this._socket.destroyed) {
429 const err = new Error(
430 'The socket was closed while data was being compressed'
431 );
432
433 if (typeof cb === 'function') cb(err);
434
435 for (let i = 0; i < this._queue.length; i++) {
436 const params = this._queue[i];
437 const callback = params[params.length - 1];
438
439 if (typeof callback === 'function') callback(err);
440 }
441
442 return;
443 }
444
445 this._bufferedBytes -= options[kByteLength];
446 this._deflating = false;
447 options.readOnly = false;
448 this.sendFrame(Sender.frame(buf, options), cb);
449 this.dequeue();
450 });
451 }
452
453 /**
454 * Executes queued send operations.
455 *
456 * @private
457 */
458 dequeue() {
459 while (!this._deflating && this._queue.length) {
460 const params = this._queue.shift();
461
462 this._bufferedBytes -= params[3][kByteLength];
463 Reflect.apply(params[0], this, params.slice(1));
464 }
465 }
466
467 /**
468 * Enqueues a send operation.
469 *
470 * @param {Array} params Send operation parameters.
471 * @private
472 */
473 enqueue(params) {
474 this._bufferedBytes += params[3][kByteLength];
475 this._queue.push(params);
476 }
477
478 /**
479 * Sends a frame.
480 *
481 * @param {Buffer[]} list The frame to send
482 * @param {Function} [cb] Callback
483 * @private
484 */
485 sendFrame(list, cb) {
486 if (list.length === 2) {
487 this._socket.cork();
488 this._socket.write(list[0]);
489 this._socket.write(list[1], cb);
490 this._socket.uncork();
491 } else {
492 this._socket.write(list[0], cb);
493 }
494 }
495}
496
497module.exports = Sender;