UNPKG

14.5 kBJavaScriptView Raw
1'use strict';
2
3const Limiter = require('async-limiter');
4const zlib = require('zlib');
5
6const bufferUtil = require('./buffer-util');
7const constants = require('./constants');
8
9const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
10const EMPTY_BLOCK = Buffer.from([0x00]);
11
12const kPerMessageDeflate = Symbol('permessage-deflate');
13const kWriteInProgress = Symbol('write-in-progress');
14const kPendingClose = Symbol('pending-close');
15const kTotalLength = Symbol('total-length');
16const kCallback = Symbol('callback');
17const kBuffers = Symbol('buffers');
18const kError = Symbol('error');
19
20//
21// We limit zlib concurrency, which prevents severe memory fragmentation
22// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
23// and https://github.com/websockets/ws/issues/1202
24//
25// Intentionally global; it's the global thread pool that's an issue.
26//
27let zlibLimiter;
28
29/**
30 * permessage-deflate implementation.
31 */
32class PerMessageDeflate {
33 /**
34 * Creates a PerMessageDeflate instance.
35 *
36 * @param {Object} options Configuration options
37 * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
38 * of server context takeover
39 * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
40 * disabling of client context takeover
41 * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
42 * use of a custom server window size
43 * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
44 * for, or request, a custom client window size
45 * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
46 * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
47 * @param {Number} options.threshold Size (in bytes) below which messages
48 * should not be compressed
49 * @param {Number} options.concurrencyLimit The number of concurrent calls to
50 * zlib
51 * @param {Boolean} isServer Create the instance in either server or client
52 * mode
53 * @param {Number} maxPayload The maximum allowed message length
54 */
55 constructor (options, isServer, maxPayload) {
56 this._maxPayload = maxPayload | 0;
57 this._options = options || {};
58 this._threshold = this._options.threshold !== undefined
59 ? this._options.threshold
60 : 1024;
61 this._isServer = !!isServer;
62 this._deflate = null;
63 this._inflate = null;
64
65 this.params = null;
66
67 if (!zlibLimiter) {
68 const concurrency = this._options.concurrencyLimit !== undefined
69 ? this._options.concurrencyLimit
70 : 10;
71 zlibLimiter = new Limiter({ concurrency });
72 }
73 }
74
75 /**
76 * @type {String}
77 */
78 static get extensionName () {
79 return 'permessage-deflate';
80 }
81
82 /**
83 * Create an extension negotiation offer.
84 *
85 * @return {Object} Extension parameters
86 * @public
87 */
88 offer () {
89 const params = {};
90
91 if (this._options.serverNoContextTakeover) {
92 params.server_no_context_takeover = true;
93 }
94 if (this._options.clientNoContextTakeover) {
95 params.client_no_context_takeover = true;
96 }
97 if (this._options.serverMaxWindowBits) {
98 params.server_max_window_bits = this._options.serverMaxWindowBits;
99 }
100 if (this._options.clientMaxWindowBits) {
101 params.client_max_window_bits = this._options.clientMaxWindowBits;
102 } else if (this._options.clientMaxWindowBits == null) {
103 params.client_max_window_bits = true;
104 }
105
106 return params;
107 }
108
109 /**
110 * Accept an extension negotiation offer/response.
111 *
112 * @param {Array} configurations The extension negotiation offers/reponse
113 * @return {Object} Accepted configuration
114 * @public
115 */
116 accept (configurations) {
117 configurations = this.normalizeParams(configurations);
118
119 this.params = this._isServer
120 ? this.acceptAsServer(configurations)
121 : this.acceptAsClient(configurations);
122
123 return this.params;
124 }
125
126 /**
127 * Releases all resources used by the extension.
128 *
129 * @public
130 */
131 cleanup () {
132 if (this._inflate) {
133 if (this._inflate[kWriteInProgress]) {
134 this._inflate[kPendingClose] = true;
135 } else {
136 this._inflate.close();
137 this._inflate = null;
138 }
139 }
140 if (this._deflate) {
141 if (this._deflate[kWriteInProgress]) {
142 this._deflate[kPendingClose] = true;
143 } else {
144 this._deflate.close();
145 this._deflate = null;
146 }
147 }
148 }
149
150 /**
151 * Accept an extension negotiation offer.
152 *
153 * @param {Array} offers The extension negotiation offers
154 * @return {Object} Accepted configuration
155 * @private
156 */
157 acceptAsServer (offers) {
158 const opts = this._options;
159 const accepted = offers.find((params) => {
160 if (
161 (opts.serverNoContextTakeover === false &&
162 params.server_no_context_takeover) ||
163 (params.server_max_window_bits &&
164 (opts.serverMaxWindowBits === false ||
165 (typeof opts.serverMaxWindowBits === 'number' &&
166 opts.serverMaxWindowBits > params.server_max_window_bits))) ||
167 (typeof opts.clientMaxWindowBits === 'number' &&
168 !params.client_max_window_bits)
169 ) {
170 return false;
171 }
172
173 return true;
174 });
175
176 if (!accepted) {
177 throw new Error('None of the extension offers can be accepted');
178 }
179
180 if (opts.serverNoContextTakeover) {
181 accepted.server_no_context_takeover = true;
182 }
183 if (opts.clientNoContextTakeover) {
184 accepted.client_no_context_takeover = true;
185 }
186 if (typeof opts.serverMaxWindowBits === 'number') {
187 accepted.server_max_window_bits = opts.serverMaxWindowBits;
188 }
189 if (typeof opts.clientMaxWindowBits === 'number') {
190 accepted.client_max_window_bits = opts.clientMaxWindowBits;
191 } else if (
192 accepted.client_max_window_bits === true ||
193 opts.clientMaxWindowBits === false
194 ) {
195 delete accepted.client_max_window_bits;
196 }
197
198 return accepted;
199 }
200
201 /**
202 * Accept the extension negotiation response.
203 *
204 * @param {Array} response The extension negotiation response
205 * @return {Object} Accepted configuration
206 * @private
207 */
208 acceptAsClient (response) {
209 const params = response[0];
210
211 if (
212 this._options.clientNoContextTakeover === false &&
213 params.client_no_context_takeover
214 ) {
215 throw new Error('Unexpected parameter "client_no_context_takeover"');
216 }
217
218 if (!params.client_max_window_bits) {
219 if (typeof this._options.clientMaxWindowBits === 'number') {
220 params.client_max_window_bits = this._options.clientMaxWindowBits;
221 }
222 } else if (
223 this._options.clientMaxWindowBits === false ||
224 (typeof this._options.clientMaxWindowBits === 'number' &&
225 params.client_max_window_bits > this._options.clientMaxWindowBits)
226 ) {
227 throw new Error(
228 'Unexpected or invalid parameter "client_max_window_bits"'
229 );
230 }
231
232 return params;
233 }
234
235 /**
236 * Normalize parameters.
237 *
238 * @param {Array} configurations The extension negotiation offers/reponse
239 * @return {Array} The offers/response with normalized parameters
240 * @private
241 */
242 normalizeParams (configurations) {
243 configurations.forEach((params) => {
244 Object.keys(params).forEach((key) => {
245 var value = params[key];
246
247 if (value.length > 1) {
248 throw new Error(`Parameter "${key}" must have only a single value`);
249 }
250
251 value = value[0];
252
253 if (key === 'client_max_window_bits') {
254 if (value !== true) {
255 const num = +value;
256 if (!Number.isInteger(num) || num < 8 || num > 15) {
257 throw new TypeError(
258 `Invalid value for parameter "${key}": ${value}`
259 );
260 }
261 value = num;
262 } else if (!this._isServer) {
263 throw new TypeError(
264 `Invalid value for parameter "${key}": ${value}`
265 );
266 }
267 } else if (key === 'server_max_window_bits') {
268 const num = +value;
269 if (!Number.isInteger(num) || num < 8 || num > 15) {
270 throw new TypeError(
271 `Invalid value for parameter "${key}": ${value}`
272 );
273 }
274 value = num;
275 } else if (
276 key === 'client_no_context_takeover' ||
277 key === 'server_no_context_takeover'
278 ) {
279 if (value !== true) {
280 throw new TypeError(
281 `Invalid value for parameter "${key}": ${value}`
282 );
283 }
284 } else {
285 throw new Error(`Unknown parameter "${key}"`);
286 }
287
288 params[key] = value;
289 });
290 });
291
292 return configurations;
293 }
294
295 /**
296 * Decompress data. Concurrency limited by async-limiter.
297 *
298 * @param {Buffer} data Compressed data
299 * @param {Boolean} fin Specifies whether or not this is the last fragment
300 * @param {Function} callback Callback
301 * @public
302 */
303 decompress (data, fin, callback) {
304 zlibLimiter.push((done) => {
305 this._decompress(data, fin, (err, result) => {
306 done();
307 callback(err, result);
308 });
309 });
310 }
311
312 /**
313 * Compress data. Concurrency limited by async-limiter.
314 *
315 * @param {Buffer} data Data to compress
316 * @param {Boolean} fin Specifies whether or not this is the last fragment
317 * @param {Function} callback Callback
318 * @public
319 */
320 compress (data, fin, callback) {
321 zlibLimiter.push((done) => {
322 this._compress(data, fin, (err, result) => {
323 done();
324 callback(err, result);
325 });
326 });
327 }
328
329 /**
330 * Decompress data.
331 *
332 * @param {Buffer} data Compressed data
333 * @param {Boolean} fin Specifies whether or not this is the last fragment
334 * @param {Function} callback Callback
335 * @private
336 */
337 _decompress (data, fin, callback) {
338 const endpoint = this._isServer ? 'client' : 'server';
339
340 if (!this._inflate) {
341 const key = `${endpoint}_max_window_bits`;
342 const windowBits = typeof this.params[key] !== 'number'
343 ? zlib.Z_DEFAULT_WINDOWBITS
344 : this.params[key];
345
346 this._inflate = zlib.createInflateRaw(
347 Object.assign({}, this._options.zlibInflateOptions, { windowBits })
348 );
349 this._inflate[kPerMessageDeflate] = this;
350 this._inflate[kTotalLength] = 0;
351 this._inflate[kBuffers] = [];
352 this._inflate.on('error', inflateOnError);
353 this._inflate.on('data', inflateOnData);
354 }
355
356 this._inflate[kCallback] = callback;
357 this._inflate[kWriteInProgress] = true;
358
359 this._inflate.write(data);
360 if (fin) this._inflate.write(TRAILER);
361
362 this._inflate.flush(() => {
363 const err = this._inflate[kError];
364
365 if (err) {
366 this._inflate.close();
367 this._inflate = null;
368 callback(err);
369 return;
370 }
371
372 const data = bufferUtil.concat(
373 this._inflate[kBuffers],
374 this._inflate[kTotalLength]
375 );
376
377 if (
378 (fin && this.params[`${endpoint}_no_context_takeover`]) ||
379 this._inflate[kPendingClose]
380 ) {
381 this._inflate.close();
382 this._inflate = null;
383 } else {
384 this._inflate[kWriteInProgress] = false;
385 this._inflate[kTotalLength] = 0;
386 this._inflate[kBuffers] = [];
387 }
388
389 callback(null, data);
390 });
391 }
392
393 /**
394 * Compress data.
395 *
396 * @param {Buffer} data Data to compress
397 * @param {Boolean} fin Specifies whether or not this is the last fragment
398 * @param {Function} callback Callback
399 * @private
400 */
401 _compress (data, fin, callback) {
402 if (!data || data.length === 0) {
403 process.nextTick(callback, null, EMPTY_BLOCK);
404 return;
405 }
406
407 const endpoint = this._isServer ? 'server' : 'client';
408
409 if (!this._deflate) {
410 const key = `${endpoint}_max_window_bits`;
411 const windowBits = typeof this.params[key] !== 'number'
412 ? zlib.Z_DEFAULT_WINDOWBITS
413 : this.params[key];
414
415 this._deflate = zlib.createDeflateRaw(
416 Object.assign(
417 // TODO deprecate memLevel/level and recommend zlibDeflateOptions instead
418 {
419 memLevel: this._options.memLevel,
420 level: this._options.level
421 },
422 this._options.zlibDeflateOptions,
423 { windowBits }
424 )
425 );
426
427 this._deflate[kTotalLength] = 0;
428 this._deflate[kBuffers] = [];
429
430 //
431 // `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use
432 // it is made after it has already been closed. This cannot happen here,
433 // so we only add a listener for the `'data'` event.
434 //
435 this._deflate.on('data', deflateOnData);
436 }
437
438 this._deflate[kWriteInProgress] = true;
439
440 this._deflate.write(data);
441 this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
442 var data = bufferUtil.concat(
443 this._deflate[kBuffers],
444 this._deflate[kTotalLength]
445 );
446
447 if (fin) data = data.slice(0, data.length - 4);
448
449 if (
450 (fin && this.params[`${endpoint}_no_context_takeover`]) ||
451 this._deflate[kPendingClose]
452 ) {
453 this._deflate.close();
454 this._deflate = null;
455 } else {
456 this._deflate[kWriteInProgress] = false;
457 this._deflate[kTotalLength] = 0;
458 this._deflate[kBuffers] = [];
459 }
460
461 callback(null, data);
462 });
463 }
464}
465
466module.exports = PerMessageDeflate;
467
468/**
469 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
470 *
471 * @param {Buffer} chunk A chunk of data
472 * @private
473 */
474function deflateOnData (chunk) {
475 this[kBuffers].push(chunk);
476 this[kTotalLength] += chunk.length;
477}
478
479/**
480 * The listener of the `zlib.InflateRaw` stream `'data'` event.
481 *
482 * @param {Buffer} chunk A chunk of data
483 * @private
484 */
485function inflateOnData (chunk) {
486 this[kTotalLength] += chunk.length;
487
488 if (
489 this[kPerMessageDeflate]._maxPayload < 1 ||
490 this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
491 ) {
492 this[kBuffers].push(chunk);
493 return;
494 }
495
496 this[kError] = new RangeError('Max payload size exceeded');
497 this[kError][constants.kStatusCode] = 1009;
498 this.removeListener('data', inflateOnData);
499 this.reset();
500}
501
502/**
503 * The listener of the `zlib.InflateRaw` stream `'error'` event.
504 *
505 * @param {Error} err The emitted error
506 * @private
507 */
508function inflateOnError (err) {
509 //
510 // There is no need to call `Zlib#close()` as the handle is automatically
511 // closed when an error is emitted.
512 //
513 this[kPerMessageDeflate]._inflate = null;
514 err[constants.kStatusCode] = 1007;
515 this[kCallback](err);
516}