// Retrieved from UNPKG (raw view, 14.1 kB, JavaScript).

'use strict';

const zlib = require('zlib');

const bufferUtil = require('./buffer-util');
const Limiter = require('./limiter');
const { kStatusCode } = require('./constants');

// Buffer constructor used by `_compress()` to create a shorter view over
// existing memory.
const FastBuffer = Buffer[Symbol.species];

// Four bytes written to the inflate stream after the last fragment of a
// message; `_compress()` drops the same number of bytes from the end of the
// last fragment it produces.
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);

// Symbol keys for the per-stream state that is attached directly to the zlib
// stream objects.
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
// Created lazily by the first `PerMessageDeflate` instance.
//
let zlibLimiter;
25
/**
 * permessage-deflate implementation.
 *
 * Handles parameter negotiation (`offer()`/`accept()`) and owns one raw
 * deflate and one raw inflate zlib stream, both created lazily on first use.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * The zlib concurrency limiter is shared by the whole module and is created
   * by the first instance only, so `options.concurrencyLimit` has no effect on
   * instances created afterwards.
   *
   * @param {Object} [options] Configuration options
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed if context takeover is disabled
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length
   */
  constructor(options, isServer, maxPayload) {
    // `| 0` turns a missing value into 0; `inflateOnData` treats values below
    // 1 as "no limit".
    this._maxPayload = maxPayload | 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    // Negotiated parameters; set by `accept()`.
    this.params = null;

    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      // Option not set (`null`/`undefined`): offer the parameter without a
      // value. An explicit `false` omits it.
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response.
   *
   * The accepted configuration is also stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If a parameter is invalid or nothing can be accepted
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * Both zlib streams are closed. If a compression callback is still pending
   * it is invoked with an error; a pending decompression callback is not
   * invoked from here.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * Picks the first offer compatible with the local options and then
   * overwrites its parameters with the locally configured values. The chosen
   * offer object is modified in place.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers can be accepted
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;
    const accepted = offers.find((params) => {
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      // No concrete value to answer with, so leave the parameter out.
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * Only the first element of `response` is considered; it is modified in
   * place and returned.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response contradicts the local options
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      // The response did not set a value: fall back to the local one, if any.
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters.
   *
   * Every parameter arrives as an array of values. Each one is validated and
   * replaced, in place, by its single value (`true` or an integer between 8
   * and 15).
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter is unknown or has more than one value
   * @throws {TypeError} If a parameter value is invalid
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // A valueless `client_max_window_bits` is only accepted when
            // running as a server.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Signal the limiter before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Signal the limiter before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * Requires `accept()` to have been called, as `this.params` is read.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the other side, so the peer's
    // parameters apply.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    // Read by `inflateOnError` to report stream errors.
    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      // Set by `inflateOnData` when the maximum payload size is exceeded.
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        // The stream has ended and cannot be reused; a new one is created on
        // the next call.
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * Requires `accept()` to have been called, as `this.params` is read.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed with this side's own parameters.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      this._deflate.on('data', deflateOnData);
    }

    // Read by `cleanup()` to report a premature close.
    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      if (fin) {
        // Drop the last four bytes by creating a shorter view over the same
        // memory. NOTE(review): presumably the 00 00 ff ff tail that
        // `_decompress()` appends again as `TRAILER` - confirm against
        // RFC 7692.
        data = new FastBuffer(data.buffer, data.byteOffset, data.length - 4);
      }

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, data);
    });
  }
}
462
module.exports = PerMessageDeflate;
464
/**
 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
 *
 * Collects the chunk and keeps a running byte count on the stream itself
 * (`this` is the stream), so that `_compress()` can concatenate everything
 * once the flush completes.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  this[kBuffers].push(chunk);
  this[kTotalLength] += chunk.length;
}
475
/**
 * The listener of the `zlib.InflateRaw` stream `'data'` event.
 *
 * Collects decompressed chunks on the stream itself (`this` is the stream)
 * while the running total stays within the owner's `_maxPayload`. A
 * `_maxPayload` below 1 disables the check.
 *
 * Once the limit is exceeded, a `RangeError` is stored under `kError`, this
 * listener removes itself so that no further output is collected, and the
 * stream is reset. `_decompress()` reads the stored error in its flush
 * callback and reports it.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  if (
    this[kPerMessageDeflate]._maxPayload < 1 ||
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  ) {
    this[kBuffers].push(chunk);
    return;
  }

  this[kError] = new RangeError('Max payload size exceeded');
  this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
  this[kError][kStatusCode] = 1009;
  this.removeListener('data', inflateOnData);
  this.reset();
}
499
/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * Detaches the stream from its owner and reports the failure through the
 * callback of the decompression in progress (`this` is the stream).
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  //
  // If `inflateOnData` has already recorded that the maximum payload size was
  // exceeded, an error emitted afterwards must not replace it. Report the
  // stored error so that the caller sees the real cause (status code 1009)
  // instead of a generic zlib error tagged with 1007.
  //
  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}