// UNPKG file viewer header: 12.7 kB, JavaScript, "View Raw".
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */

'use strict';

const net = require('net');
const tls = require('tls');
const { randomFillSync } = require('crypto');

const PerMessageDeflate = require('./permessage-deflate');
const { EMPTY_BUFFER } = require('./constants');
const { isValidStatusCode } = require('./validation');
const { mask: applyMask, toBuffer } = require('./buffer-util');

// Symbol key under which a precomputed payload byte length is stored on the
// frame options object.
const kByteLength = Symbol('kByteLength');
// Module-level 4-byte buffer that receives the masking key when the frame
// options do not carry their own `maskBuffer`.
const maskBuffer = Buffer.alloc(4);

/**
 * HyBi Sender implementation.
 *
 * Builds WebSocket frames and writes them to the socket. While a message is
 * being compressed, every later send operation is queued and replayed once the
 * compression callback fires, so frames are written in submission order.
 */
class Sender {
  /**
   * Creates a Sender instance.
   *
   * @param {(net.Socket|tls.Socket)} socket The connection socket
   * @param {Object} [extensions] An object containing the negotiated extensions
   * @param {Function} [generateMask] The function used to generate the masking
   *     key
   */
  constructor(socket, extensions, generateMask) {
    this._extensions = extensions || {};

    // A per-sender mask buffer is only allocated when a custom generator is
    // supplied; otherwise `frame()` falls back to the module-level buffer.
    if (generateMask) {
      this._generateMask = generateMask;
      this._maskBuffer = Buffer.alloc(4);
    }

    this._socket = socket;

    // Fragmentation state: whether the next `send()` starts a new message and
    // whether the message in progress is being compressed.
    this._firstFragment = true;
    this._compress = false;

    // Bytes held by queued or in-flight (compressing) operations.
    this._bufferedBytes = 0;
    this._deflating = false;
    this._queue = [];
  }

  /**
   * Frames a piece of data according to the HyBi WebSocket protocol.
   *
   * @param {(Buffer|String)} data The data to frame
   * @param {Object} options Options object
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Function} [options.generateMask] The function used to generate the
   *     masking key
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
   *     key
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @return {(Buffer|String)[]} The framed data: `[header, payload]`, or a
   *     single buffer when header and masked payload were merged
   * @public
   */
  static frame(data, options) {
    let mask;
    let merge = false;
    // Two fixed header bytes; grows below for the mask key and extended length.
    let offset = 2;
    let skipMasking = false;

    if (options.mask) {
      mask = options.maskBuffer || maskBuffer;

      if (options.generateMask) {
        options.generateMask(mask);
      } else {
        randomFillSync(mask, 0, 4);
      }

      // An all-zero key leaves the payload unchanged under WebSocket masking
      // (XOR, RFC 6455), so the masking pass can be skipped entirely.
      skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
      offset = 6;
    }

    let dataLength;

    if (typeof data === 'string') {
      if (
        (!options.mask || skipMasking) &&
        options[kByteLength] !== undefined
      ) {
        // The string is passed through untouched, so the byte length supplied
        // by the caller is used instead of converting to a buffer.
        dataLength = options[kByteLength];
      } else {
        data = Buffer.from(data);
        dataLength = data.length;
      }
    } else {
      dataLength = data.length;
      // Read-only data cannot be masked in place; the masked payload is
      // written after the header in `target` instead.
      merge = options.mask && options.readOnly && !skipMasking;
    }

    let payloadLength = dataLength;

    if (dataLength >= 65536) {
      offset += 8;
      payloadLength = 127;
    } else if (dataLength > 125) {
      offset += 2;
      payloadLength = 126;
    }

    const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);

    target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
    if (options.rsv1) target[0] |= 0x40;

    target[1] = payloadLength;

    if (payloadLength === 126) {
      target.writeUInt16BE(dataLength, 2);
    } else if (payloadLength === 127) {
      // 64-bit length field: the two high bytes are zeroed and the remaining
      // six bytes carry the length.
      target[2] = target[3] = 0;
      target.writeUIntBE(dataLength, 4, 6);
    }

    if (!options.mask) return [target, data];

    target[1] |= 0x80;
    target[offset - 4] = mask[0];
    target[offset - 3] = mask[1];
    target[offset - 2] = mask[2];
    target[offset - 1] = mask[3];

    if (skipMasking) return [target, data];

    if (merge) {
      applyMask(data, mask, target, offset, dataLength);
      return [target];
    }

    applyMask(data, mask, data, 0, dataLength);
    return [target, data];
  }

  /**
   * Sends a close message to the other peer.
   *
   * @param {Number} [code] The status code component of the body
   * @param {(String|Buffer)} [data] The message component of the body
   * @param {Boolean} [mask=false] Specifies whether or not to mask the message
   * @param {Function} [cb] Callback
   * @throws {TypeError} If `code` is defined but is not a valid status code
   * @throws {RangeError} If `data` is longer than 123 bytes
   * @public
   */
  close(code, data, mask, cb) {
    let buf;

    if (code === undefined) {
      buf = EMPTY_BUFFER;
    } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
      throw new TypeError('First argument must be a valid error code number');
    } else if (data === undefined || !data.length) {
      buf = Buffer.allocUnsafe(2);
      buf.writeUInt16BE(code, 0);
    } else {
      const length = Buffer.byteLength(data);

      // 2 bytes of status code + 123 bytes of reason = 125, the same payload
      // limit enforced for ping and pong frames.
      if (length > 123) {
        throw new RangeError('The message must not be greater than 123 bytes');
      }

      buf = Buffer.allocUnsafe(2 + length);
      buf.writeUInt16BE(code, 0);

      if (typeof data === 'string') {
        buf.write(data, 2);
      } else {
        buf.set(data, 2);
      }
    }

    const options = {
      [kByteLength]: buf.length,
      fin: true,
      generateMask: this._generateMask,
      mask,
      maskBuffer: this._maskBuffer,
      opcode: 0x08,
      readOnly: false,
      rsv1: false
    };

    if (this._deflating) {
      this.enqueue([this.dispatch, buf, false, options, cb]);
    } else {
      this.sendFrame(Sender.frame(buf, options), cb);
    }
  }

  /**
   * Sends a ping message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If `data` is longer than 125 bytes
   * @public
   */
  ping(data, mask, cb) {
    let byteLength;
    let readOnly;

    if (typeof data === 'string') {
      byteLength = Buffer.byteLength(data);
      readOnly = false;
    } else {
      data = toBuffer(data);
      byteLength = data.length;
      // Read right after the call; presumably `toBuffer` records here whether
      // the returned buffer may be modified (confirm in ./buffer-util).
      readOnly = toBuffer.readOnly;
    }

    if (byteLength > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    const options = {
      [kByteLength]: byteLength,
      fin: true,
      generateMask: this._generateMask,
      mask,
      maskBuffer: this._maskBuffer,
      opcode: 0x09,
      readOnly,
      rsv1: false
    };

    if (this._deflating) {
      this.enqueue([this.dispatch, data, false, options, cb]);
    } else {
      this.sendFrame(Sender.frame(data, options), cb);
    }
  }

  /**
   * Sends a pong message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback
   * @throws {RangeError} If `data` is longer than 125 bytes
   * @public
   */
  pong(data, mask, cb) {
    let byteLength;
    let readOnly;

    if (typeof data === 'string') {
      byteLength = Buffer.byteLength(data);
      readOnly = false;
    } else {
      data = toBuffer(data);
      byteLength = data.length;
      // Read right after the call; presumably `toBuffer` records here whether
      // the returned buffer may be modified (confirm in ./buffer-util).
      readOnly = toBuffer.readOnly;
    }

    if (byteLength > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    const options = {
      [kByteLength]: byteLength,
      fin: true,
      generateMask: this._generateMask,
      mask,
      maskBuffer: this._maskBuffer,
      opcode: 0x0a,
      readOnly,
      rsv1: false
    };

    if (this._deflating) {
      this.enqueue([this.dispatch, data, false, options, cb]);
    } else {
      this.sendFrame(Sender.frame(data, options), cb);
    }
  }

  /**
   * Sends a data message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
   *     or text
   * @param {Boolean} [options.compress=false] Specifies whether or not to
   *     compress `data`
   * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Function} [cb] Callback
   * @public
   */
  send(data, options, cb) {
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    let opcode = options.binary ? 2 : 1;
    let rsv1 = options.compress;

    let byteLength;
    let readOnly;

    if (typeof data === 'string') {
      byteLength = Buffer.byteLength(data);
      readOnly = false;
    } else {
      data = toBuffer(data);
      byteLength = data.length;
      readOnly = toBuffer.readOnly;
    }

    if (this._firstFragment) {
      this._firstFragment = false;
      // With no context takeover on this side, compression is only applied
      // when the first fragment reaches the extension's threshold.
      if (
        rsv1 &&
        perMessageDeflate &&
        perMessageDeflate.params[
          perMessageDeflate._isServer
            ? 'server_no_context_takeover'
            : 'client_no_context_takeover'
        ]
      ) {
        rsv1 = byteLength >= perMessageDeflate._threshold;
      }
      // The decision made on the first fragment applies to the whole message.
      this._compress = rsv1;
    } else {
      // Continuation frames use opcode 0 and never set RSV1.
      rsv1 = false;
      opcode = 0;
    }

    if (options.fin) this._firstFragment = true;

    if (perMessageDeflate) {
      const opts = {
        [kByteLength]: byteLength,
        fin: options.fin,
        generateMask: this._generateMask,
        mask: options.mask,
        maskBuffer: this._maskBuffer,
        opcode,
        readOnly,
        rsv1
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, data, this._compress, opts, cb]);
      } else {
        this.dispatch(data, this._compress, opts, cb);
      }
    } else {
      this.sendFrame(
        Sender.frame(data, {
          [kByteLength]: byteLength,
          fin: options.fin,
          generateMask: this._generateMask,
          mask: options.mask,
          maskBuffer: this._maskBuffer,
          opcode,
          readOnly,
          rsv1: false
        }),
        cb
      );
    }
  }

  /**
   * Dispatches a message.
   *
   * @param {(Buffer|String)} data The message to send
   * @param {Boolean} [compress=false] Specifies whether or not to compress
   *     `data`
   * @param {Object} options Options object
   * @param {Boolean} [options.fin=false] Specifies whether or not to set the
   *     FIN bit
   * @param {Function} [options.generateMask] The function used to generate the
   *     masking key
   * @param {Boolean} [options.mask=false] Specifies whether or not to mask
   *     `data`
   * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
   *     key
   * @param {Number} options.opcode The opcode
   * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
   *     modified
   * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
   *     RSV1 bit
   * @param {Function} [cb] Callback
   * @private
   */
  dispatch(data, compress, options, cb) {
    if (!compress) {
      this.sendFrame(Sender.frame(data, options), cb);
      return;
    }

    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    // Count the payload as buffered and block direct sends until the
    // compression callback runs.
    this._bufferedBytes += options[kByteLength];
    this._deflating = true;
    perMessageDeflate.compress(data, options.fin, (_, buf) => {
      if (this._socket.destroyed) {
        const err = new Error(
          'The socket was closed while data was being compressed'
        );

        if (typeof cb === 'function') cb(err);

        // Fail every queued operation too; the queue itself and the
        // deflating/buffered state are left as they are.
        for (let i = 0; i < this._queue.length; i++) {
          const params = this._queue[i];
          const callback = params[params.length - 1];

          if (typeof callback === 'function') callback(err);
        }

        return;
      }

      this._bufferedBytes -= options[kByteLength];
      this._deflating = false;
      // `buf` is the compressor's output rather than the caller's data, so it
      // is framed as modifiable.
      options.readOnly = false;
      this.sendFrame(Sender.frame(buf, options), cb);
      this.dequeue();
    });
  }

  /**
   * Executes queued send operations.
   *
   * Stops as soon as a replayed operation starts compressing again.
   *
   * @private
   */
  dequeue() {
    while (!this._deflating && this._queue.length) {
      // Entries have the shape [fn, data, compress, options, cb].
      const params = this._queue.shift();

      this._bufferedBytes -= params[3][kByteLength];
      Reflect.apply(params[0], this, params.slice(1));
    }
  }

  /**
   * Enqueues a send operation.
   *
   * @param {Array} params Send operation parameters, in the form
   *     `[fn, data, compress, options, cb]`
   * @private
   */
  enqueue(params) {
    this._bufferedBytes += params[3][kByteLength];
    this._queue.push(params);
  }

  /**
   * Sends a frame.
   *
   * @param {Buffer[]} list The frame to send
   * @param {Function} [cb] Callback
   * @private
   */
  sendFrame(list, cb) {
    if (list.length === 2) {
      // Cork so the header and the payload are handed to the socket together.
      this._socket.cork();
      this._socket.write(list[0]);
      this._socket.write(list[1], cb);
      this._socket.uncork();
    } else {
      this._socket.write(list[0], cb);
    }
  }
}

// Export the Sender class as this module's sole export.
module.exports = Sender;