// Retrieved from UNPKG (9.48 kB, JavaScript, raw view).
'use strict';

// Node.js core: source of the random 4-byte masking key.
const { randomBytes } = require('crypto');

// Project-local helpers used by the Sender class.
const PerMessageDeflate = require('./permessage-deflate');
const { EMPTY_BUFFER } = require('./constants');
const { isValidStatusCode } = require('./validation');
const { mask: applyMask, toBuffer } = require('./buffer-util');

/**
 * HyBi Sender implementation.
 *
 * Frames outgoing WebSocket messages and writes them to the socket. While a
 * permessage-deflate compression is in progress, every new operation is
 * queued and replayed once the compression callback fires, so frames are
 * written in the order in which they were requested.
 */
class Sender {
  /**
   * Creates a Sender instance.
   *
   * @param {net.Socket} socket The connection socket
   * @param {Object} extensions An object containing the negotiated extensions
   */
  constructor(socket, extensions) {
    this._extensions = extensions || {};
    this._socket = socket;

    // Fragmentation state: whether the next `send()` starts a new message and
    // whether the message currently being sent is compressed.
    this._firstFragment = true;
    this._compress = false;

    // Operations deferred while a compression is in flight, together with the
    // summed length of their payloads.
    this._bufferedBytes = 0;
    this._deflating = false;
    this._queue = [];
  }

  /**
   * Frames a piece of data according to the HyBi WebSocket protocol.
   *
   * When `options.mask` is set and `options.readOnly` is not, `data` is masked
   * in place. When both are set, the masked payload is written into the same
   * newly allocated buffer as the header and `data` is left untouched.
   *
   * @param {Buffer} data The data to frame
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} options.readOnly Specifies whether `data` can be modified
   * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
   * @return {Buffer[]} The framed data as a list of `Buffer` instances
   * @public
   */
  static frame(data, options) {
    const merge = options.mask && options.readOnly;

    // Header size: 2 fixed bytes, plus 4 bytes for the masking key if any,
    // plus the extended payload length added below.
    let offset = options.mask ? 6 : 2;
    let payloadLength = data.length;

    if (data.length >= 65536) {
      offset += 8;
      payloadLength = 127;
    } else if (data.length > 125) {
      offset += 2;
      payloadLength = 126;
    }

    const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);

    target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
    if (options.rsv1) target[0] |= 0x40;

    target[1] = payloadLength;

    if (payloadLength === 126) {
      target.writeUInt16BE(data.length, 2);
    } else if (payloadLength === 127) {
      // 64-bit big-endian length. The two most significant bytes are always
      // zero; the remaining 48 bits are written in one call so that lengths
      // of 2^32 bytes or more are encoded instead of raising a RangeError.
      target[2] = target[3] = 0;
      target.writeUIntBE(data.length, 4, 6);
    }

    if (!options.mask) return [target, data];

    const mask = randomBytes(4);

    // Set the MASK bit and store the masking key in the last 4 header bytes.
    target[1] |= 0x80;
    target[offset - 4] = mask[0];
    target[offset - 3] = mask[1];
    target[offset - 2] = mask[2];
    target[offset - 1] = mask[3];

    if (merge) {
      applyMask(data, mask, target, offset, data.length);
      return [target];
    }

    applyMask(data, mask, data, 0, data.length);
    return [target, data];
  }

  /**
   * Sends a close message to the other peer.
   *
   * @param {(Number|undefined)} code The status code component of the body
   * @param {String} data The message component of the body
   * @param {Boolean} mask Specifies whether or not to mask the message
   * @param {Function} cb Callback
   * @throws {TypeError} If `code` is defined but is not a valid status code
   * @throws {RangeError} If the message is longer than 123 bytes
   * @public
   */
  close(code, data, mask, cb) {
    let buf;

    if (code === undefined) {
      buf = EMPTY_BUFFER;
    } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
      throw new TypeError('First argument must be a valid error code number');
    } else if (data === undefined || data === '') {
      buf = Buffer.allocUnsafe(2);
      buf.writeUInt16BE(code, 0);
    } else {
      const length = Buffer.byteLength(data);

      // A control frame payload is limited to 125 bytes and 2 of them are
      // taken by the status code.
      if (length > 123) {
        throw new RangeError('The message must not be greater than 123 bytes');
      }

      buf = Buffer.allocUnsafe(2 + length);
      buf.writeUInt16BE(code, 0);
      buf.write(data, 2);
    }

    if (this._deflating) {
      this.enqueue([this.doClose, buf, mask, cb]);
    } else {
      this.doClose(buf, mask, cb);
    }
  }

  /**
   * Frames and sends a close message.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback
   * @private
   */
  doClose(data, mask, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x08,
        mask,
        readOnly: false
      }),
      cb
    );
  }

  /**
   * Sends a ping message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback
   * @throws {RangeError} If the payload is longer than 125 bytes
   * @public
   */
  ping(data, mask, cb) {
    const buf = toBuffer(data);

    // Control frames cannot carry more than 125 bytes of payload.
    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    // `toBuffer.readOnly` is read right after the conversion; presumably it
    // reports whether `buf` may be modified - confirm in `./buffer-util`.
    if (this._deflating) {
      this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPing(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a ping message.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Specifies whether or not to mask `data`
   * @param {Boolean} readOnly Specifies whether `data` can be modified
   * @param {Function} cb Callback
   * @private
   */
  doPing(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x09,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a pong message to the other peer.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback
   * @throws {RangeError} If the payload is longer than 125 bytes
   * @public
   */
  pong(data, mask, cb) {
    const buf = toBuffer(data);

    // Control frames cannot carry more than 125 bytes of payload.
    if (buf.length > 125) {
      throw new RangeError('The data size must not be greater than 125 bytes');
    }

    if (this._deflating) {
      this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
    } else {
      this.doPong(buf, mask, toBuffer.readOnly, cb);
    }
  }

  /**
   * Frames and sends a pong message.
   *
   * @param {*} data The message to send
   * @param {Boolean} mask Specifies whether or not to mask `data`
   * @param {Boolean} readOnly Specifies whether `data` can be modified
   * @param {Function} cb Callback
   * @private
   */
  doPong(data, mask, readOnly, cb) {
    this.sendFrame(
      Sender.frame(data, {
        fin: true,
        rsv1: false,
        opcode: 0x0a,
        mask,
        readOnly
      }),
      cb
    );
  }

  /**
   * Sends a data message to the other peer.
   *
   * The first fragment of a message carries the text or binary opcode and
   * decides whether the whole message is compressed; following fragments are
   * sent as continuation frames with RSV1 cleared.
   *
   * @param {*} data The message to send
   * @param {Object} options Options object
   * @param {Boolean} options.compress Specifies whether or not to compress `data`
   * @param {Boolean} options.binary Specifies whether `data` is binary or text
   * @param {Boolean} options.fin Specifies whether the fragment is the last one
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Function} cb Callback
   * @public
   */
  send(data, options, cb) {
    const buf = toBuffer(data);
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
    let opcode = options.binary ? 2 : 1;
    let rsv1 = options.compress;

    if (this._firstFragment) {
      this._firstFragment = false;
      if (rsv1 && perMessageDeflate) {
        // Compress only if the first fragment reaches the extension threshold.
        rsv1 = buf.length >= perMessageDeflate._threshold;
      }
      this._compress = rsv1;
    } else {
      rsv1 = false;
      opcode = 0;
    }

    // The next call starts a new message once the final fragment is sent.
    if (options.fin) this._firstFragment = true;

    if (perMessageDeflate) {
      const opts = {
        fin: options.fin,
        rsv1,
        opcode,
        mask: options.mask,
        readOnly: toBuffer.readOnly
      };

      if (this._deflating) {
        this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
      } else {
        this.dispatch(buf, this._compress, opts, cb);
      }
    } else {
      this.sendFrame(
        Sender.frame(buf, {
          fin: options.fin,
          rsv1: false,
          opcode,
          mask: options.mask,
          readOnly: toBuffer.readOnly
        }),
        cb
      );
    }
  }

  /**
   * Dispatches a data message.
   *
   * Sends the frame right away when `compress` is false. Otherwise compresses
   * `data` first and flags the sender as deflating so that operations issued
   * in the meantime are queued.
   *
   * @param {Buffer} data The message to send
   * @param {Boolean} compress Specifies whether or not to compress `data`
   * @param {Object} options Options object
   * @param {Number} options.opcode The opcode
   * @param {Boolean} options.readOnly Specifies whether `data` can be modified
   * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
   * @param {Boolean} options.mask Specifies whether or not to mask `data`
   * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
   * @param {Function} cb Callback
   * @private
   */
  dispatch(data, compress, options, cb) {
    if (!compress) {
      this.sendFrame(Sender.frame(data, options), cb);
      return;
    }

    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    this._deflating = true;
    // The first callback argument is ignored here.
    perMessageDeflate.compress(data, options.fin, (_, buf) => {
      this._deflating = false;
      // The frame is built from `buf`, the compression result, not from the
      // caller's data, so it is flagged as modifiable.
      options.readOnly = false;
      this.sendFrame(Sender.frame(buf, options), cb);
      this.dequeue();
    });
  }

  /**
   * Executes queued send operations.
   *
   * Stops as soon as one of the replayed operations starts a new compression;
   * the remaining entries are processed when that compression completes.
   *
   * @private
   */
  dequeue() {
    while (!this._deflating && this._queue.length) {
      const params = this._queue.shift();

      // `params` is `[method, data, ...otherArguments]`.
      this._bufferedBytes -= params[1].length;
      params[0].apply(this, params.slice(1));
    }
  }

  /**
   * Enqueues a send operation.
   *
   * @param {Array} params Send operation parameters: the method to call
   *     followed by its arguments, the first of which is the payload.
   * @private
   */
  enqueue(params) {
    this._bufferedBytes += params[1].length;
    this._queue.push(params);
  }

  /**
   * Sends a frame.
   *
   * @param {Buffer[]} list The frame to send
   * @param {Function} cb Callback
   * @private
   */
  sendFrame(list, cb) {
    if (list.length === 2) {
      // Header and payload are separate buffers: cork the socket around the
      // two writes and pass the callback to the last one.
      this._socket.cork();
      this._socket.write(list[0]);
      this._socket.write(list[1], cb);
      this._socket.uncork();
    } else {
      this._socket.write(list[0], cb);
    }
  }
}

// The class is the module's only export.
module.exports = Sender;