UNPKG

10.3 kBJavaScriptView Raw
1"use strict";
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.CompressionFilterFactory = exports.CompressionFilter = void 0;
20const zlib = require("zlib");
21const compression_algorithms_1 = require("./compression-algorithms");
22const constants_1 = require("./constants");
23const filter_1 = require("./filter");
24const logging = require("./logging");
/**
 * Tells whether `key` is a usable key of the CompressionAlgorithms table:
 * it must be a number, and the table entry stored under it must be a string.
 *
 * @param key Candidate value to check
 * @return `true` when `key` is a number that maps to a string entry,
 *     `false` for every other value
 */
const isCompressionAlgorithmKey = (key) => {
    // Non-numbers are rejected before the table is consulted at all.
    if (typeof key !== 'number') {
        return false;
    }
    const algorithmName = compression_algorithms_1.CompressionAlgorithms[key];
    return typeof algorithmName === 'string';
};
/**
 * Base class that frames and de-frames messages.
 *
 * A framed message is a 5-byte prefix followed by the payload:
 *   byte 0      compressed flag (1 = payload is compressed, 0 = it is not)
 *   bytes 1..4  payload length as a big-endian unsigned 32-bit integer
 *
 * Subclasses must provide `compressMessage(message)` and
 * `decompressMessage(message)`; both are awaited here, so they may return
 * either a Buffer or a promise of one.
 */
class CompressionHandler {
    /**
     * @param message Raw uncompressed message bytes
     * @param compress Indicates whether the message should be compressed
     * @return Framed message, compressed if applicable
     */
    async writeMessage(message, compress) {
        let messageBuffer = message;
        if (compress) {
            messageBuffer = await this.compressMessage(messageBuffer);
        }
        // allocUnsafe is fine: every byte is overwritten below (1 flag byte,
        // 4 length bytes, then the whole payload).
        const output = Buffer.allocUnsafe(messageBuffer.length + 5);
        output.writeUInt8(compress ? 1 : 0, 0);
        output.writeUInt32BE(messageBuffer.length, 1);
        messageBuffer.copy(output, 5);
        return output;
    }
    /**
     * @param data Framed message, possibly compressed
     * @return Uncompressed message. When the frame is not flagged as
     *     compressed this is a view onto `data` (no copy is made).
     */
    async readMessage(data) {
        const compressed = data.readUInt8(0) === 1;
        // `subarray` replaces the deprecated `Buffer#slice`; both return a
        // view that shares memory with `data`. The length field in bytes 1..4
        // is not consulted here: everything after the prefix is the payload.
        let messageBuffer = data.subarray(5);
        if (compressed) {
            messageBuffer = await this.decompressMessage(messageBuffer);
        }
        return messageBuffer;
    }
}
/**
 * Handler for the "identity" encoding: payloads are passed through untouched
 * and frames are never flagged as compressed.
 */
class IdentityHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return The very same buffer, unchanged
     */
    async compressMessage(message) {
        return message;
    }
    /**
     * @param message Raw uncompressed message bytes
     * @param compress Ignored by this handler
     * @return Framed message: a zero flag byte, the big-endian 32-bit payload
     *     length, then the payload
     */
    async writeMessage(message, compress) {
        const payloadLength = message.length;
        const framed = Buffer.allocUnsafe(5 + payloadLength);
        /* With "identity" compression, messages should always be marked as
         * uncompressed */
        framed.writeUInt8(0, 0);
        framed.writeUInt32BE(payloadLength, 1);
        message.copy(framed, 5);
        return framed;
    }
    /**
     * Always fails: a frame flagged as compressed cannot be decoded with the
     * identity encoding.
     *
     * @param message Payload of a frame flagged as compressed
     * @return A rejected promise
     */
    decompressMessage(message) {
        const failure = new Error('Received compressed message but "grpc-encoding" header was identity');
        return Promise.reject(failure);
    }
}
/**
 * Handler for the "deflate" encoding, built on Node's callback-style
 * `zlib.deflate` / `zlib.inflate` and exposed as promises.
 */
class DeflateHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return Promise of the output of `zlib.deflate`; rejects with the zlib
     *     error if compression fails
     */
    compressMessage(message) {
        return new Promise((resolve, reject) => {
            zlib.deflate(message, (error, deflated) => {
                if (error) {
                    reject(error);
                    return;
                }
                resolve(deflated);
            });
        });
    }
    /**
     * @param message Compressed payload bytes
     * @return Promise of the output of `zlib.inflate`; rejects with the zlib
     *     error if the payload cannot be inflated
     */
    decompressMessage(message) {
        return new Promise((resolve, reject) => {
            zlib.inflate(message, (error, inflated) => {
                if (error) {
                    reject(error);
                    return;
                }
                resolve(inflated);
            });
        });
    }
}
/**
 * Handler for the "gzip" encoding. Compression uses `zlib.gzip`;
 * decompression uses `zlib.unzip` (which, per the Node zlib documentation,
 * detects gzip vs. deflate input from the header).
 */
class GzipHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return Promise of the output of `zlib.gzip`; rejects with the zlib
     *     error if compression fails
     */
    compressMessage(message) {
        return new Promise((resolve, reject) => {
            zlib.gzip(message, (error, gzipped) => {
                if (error) {
                    reject(error);
                    return;
                }
                resolve(gzipped);
            });
        });
    }
    /**
     * @param message Compressed payload bytes
     * @return Promise of the output of `zlib.unzip`; rejects with the zlib
     *     error if the payload cannot be decompressed
     */
    decompressMessage(message) {
        return new Promise((resolve, reject) => {
            zlib.unzip(message, (error, unzipped) => {
                if (error) {
                    reject(error);
                    return;
                }
                resolve(unzipped);
            });
        });
    }
}
/**
 * Handler used for an encoding name this file has no implementation for.
 * Both directions fail with an error naming the encoding; framing of
 * uncompressed messages is inherited unchanged from CompressionHandler.
 */
class UnknownHandler extends CompressionHandler {
    /**
     * @param compressionName The unrecognized encoding name, kept only for
     *     error messages
     */
    constructor(compressionName) {
        super();
        this.compressionName = compressionName;
    }
    /**
     * Send side: there is no way to compress with an unknown method.
     *
     * @param message Raw message bytes (unused)
     * @return A rejected promise
     */
    compressMessage(message) {
        // Presumably unreachable from this file: the send-side handler is
        // only ever built from a name looked up in the CompressionAlgorithms
        // table, never from an arbitrary string.
        return Promise.reject(new Error(`Compression method not supported: ${this.compressionName}`));
    }
    /**
     * Receive side: reached when a frame is flagged as compressed but the
     * peer's "grpc-encoding" named a method with no handler here.
     *
     * @param message Compressed payload bytes (unused)
     * @return A rejected promise
     */
    decompressMessage(message) {
        return Promise.reject(new Error(`Received message compressed with unsupported compression method ${this.compressionName}`));
    }
}
/**
 * Builds a fresh handler for the given encoding name.
 *
 * @param compressionName Encoding name; matched exactly (case-sensitive)
 *     against 'identity', 'deflate' and 'gzip'
 * @return A new IdentityHandler, DeflateHandler or GzipHandler for those
 *     names, otherwise a new UnknownHandler carrying `compressionName`
 */
function getCompressionHandler(compressionName) {
    if (compressionName === 'identity') {
        return new IdentityHandler();
    }
    if (compressionName === 'deflate') {
        return new DeflateHandler();
    }
    if (compressionName === 'gzip') {
        return new GzipHandler();
    }
    // Anything else gets a handler that rejects on (de)compression.
    return new UnknownHandler(compressionName);
}
/**
 * Splits a comma-separated encoding list (the value of a
 * "grpc-accept-encoding" header) into individual encoding names.
 *
 * Whitespace around each name is removed, so both `identity,deflate,gzip`
 * and `identity, deflate, gzip` yield `['identity', 'deflate', 'gzip']`.
 *
 * @param header Header value; must be a string
 * @return Array of trimmed encoding names
 */
function parseEncodingList(header) {
    return header.split(',').map((name) => name.trim());
}
/**
 * Filter that frames outgoing messages (compressing them when an algorithm
 * is selected) and de-frames incoming ones (decompressing them according to
 * the peer's "grpc-encoding" header).
 */
class CompressionFilter extends filter_1.BaseFilter {
    /**
     * @param channelOptions Channel options; only
     *     'grpc.default_compression_algorithm' is read here
     * @param sharedFilterConfig Mutable object shared with other filters; its
     *     `serverSupportedEncodingHeader` property caches the last
     *     "grpc-accept-encoding" header value seen by `receiveMetadata`
     */
    constructor(channelOptions, sharedFilterConfig) {
        super();
        this.sharedFilterConfig = sharedFilterConfig;
        this.sendCompression = new IdentityHandler();
        this.receiveCompression = new IdentityHandler();
        this.currentCompressionAlgorithm = 'identity';
        const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
        if (compressionAlgorithmKey !== undefined) {
            if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
                const clientSelectedEncoding = compression_algorithms_1.CompressionAlgorithms[compressionAlgorithmKey];
                const cachedHeader = sharedFilterConfig.serverSupportedEncodingHeader;
                const serverSupportedEncodings = cachedHeader === null || cachedHeader === undefined
                    ? undefined
                    : parseEncodingList(cachedHeader);
                /**
                 * There are two possible situations here:
                 * 1) We don't have any info yet from the server about what compression it supports
                 *   In that case we should just use what the client tells us to use
                 * 2) We've previously received a response from the server including a grpc-accept-encoding header
                 *   In that case we only want to use the encoding chosen by the client if the server supports it
                 */
                if (!serverSupportedEncodings ||
                    serverSupportedEncodings.includes(clientSelectedEncoding)) {
                    this.currentCompressionAlgorithm = clientSelectedEncoding;
                    this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
                }
            }
            else {
                logging.log(constants_1.LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
            }
        }
    }
    /**
     * Adds the compression-related headers to outgoing metadata.
     *
     * @param metadata Promise of (or plain) outgoing metadata
     * @return The same metadata object, with 'grpc-accept-encoding' and
     *     'accept-encoding' set and 'grpc-encoding' set or removed
     */
    async sendMetadata(metadata) {
        const headers = await metadata;
        headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
        headers.set('accept-encoding', 'identity');
        // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
        if (this.currentCompressionAlgorithm === 'identity') {
            headers.remove('grpc-encoding');
        }
        else {
            headers.set('grpc-encoding', this.currentCompressionAlgorithm);
        }
        return headers;
    }
    /**
     * Consumes the compression-related headers from incoming metadata.
     *
     * Selects the receive handler from 'grpc-encoding', records the peer's
     * 'grpc-accept-encoding' value in the shared config, and falls back to
     * identity for sending if the peer does not list the current algorithm.
     * Both headers are removed before the metadata is returned.
     *
     * @param metadata Incoming metadata
     * @return The same metadata object with both headers removed
     */
    receiveMetadata(metadata) {
        const receiveEncoding = metadata.get('grpc-encoding');
        if (receiveEncoding.length > 0) {
            const encoding = receiveEncoding[0];
            if (typeof encoding === 'string') {
                this.receiveCompression = getCompressionHandler(encoding);
            }
        }
        metadata.remove('grpc-encoding');
        /* Check to see if the compression we're using to send messages is supported by the server
         * If not, reset the sendCompression filter and have it use the default IdentityHandler */
        const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0];
        // Same string guard as for 'grpc-encoding' above: a missing, empty or
        // non-string value is ignored instead of failing on `.split`.
        if (typeof serverSupportedEncodingsHeader === 'string' &&
            serverSupportedEncodingsHeader.length > 0) {
            this.sharedFilterConfig.serverSupportedEncodingHeader =
                serverSupportedEncodingsHeader;
            // Names are trimmed so that a list written with a space after
            // each comma (e.g. "identity, deflate, gzip") still matches.
            const serverSupportedEncodings = parseEncodingList(serverSupportedEncodingsHeader);
            if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
                this.sendCompression = new IdentityHandler();
                this.currentCompressionAlgorithm = 'identity';
            }
        }
        metadata.remove('grpc-accept-encoding');
        return metadata;
    }
    /**
     * Frames (and possibly compresses) an outgoing message.
     *
     * @param message Promise of (or plain) `{ message, flags }` where
     *     `message` holds the bare message bytes
     * @return `{ message, flags }` where `message` is the framed bytes and
     *     `flags` is passed through unchanged
     */
    async sendMessage(message) {
        /* This filter is special. The input message is the bare message bytes,
         * and the output is a framed and possibly compressed message. For this
         * reason, this filter should be at the bottom of the filter stack */
        const resolvedMessage = await message;
        let compress;
        if (this.sendCompression instanceof IdentityHandler) {
            compress = false;
        }
        else {
            // Missing flags count as 0; compress unless the NoCompress bit is set.
            const flags = resolvedMessage.flags === null || resolvedMessage.flags === undefined
                ? 0
                : resolvedMessage.flags;
            compress = (flags & 2 /* WriteFlags.NoCompress */) === 0;
        }
        return {
            message: await this.sendCompression.writeMessage(resolvedMessage.message, compress),
            flags: resolvedMessage.flags,
        };
    }
    /**
     * De-frames (and possibly decompresses) an incoming message.
     *
     * @param message Promise of (or plain) framed message bytes
     * @return Promise of the uncompressed message bytes
     */
    async receiveMessage(message) {
        /* This filter is also special. The input message is framed and possibly
         * compressed, and the output message is deframed and uncompressed. So
         * this is another reason that this filter should be at the bottom of the
         * filter stack. */
        return this.receiveCompression.readMessage(await message);
    }
}
246exports.CompressionFilter = CompressionFilter;
/**
 * Factory for CompressionFilter instances. Every filter it creates receives
 * the same options object and the same shared config object.
 */
class CompressionFilterFactory {
    /**
     * @param channel Accepted but not used by this constructor
     * @param options Options handed to every filter this factory creates
     */
    constructor(channel, options) {
        this.options = options;
        // A single mutable object passed to all filters from this factory,
        // so anything one filter stores on it is visible to the others.
        this.sharedFilterConfig = {};
    }
    /**
     * @return A new CompressionFilter built from this factory's options and
     *     shared config object
     */
    createFilter() {
        const { options, sharedFilterConfig } = this;
        return new CompressionFilter(options, sharedFilterConfig);
    }
}
256exports.CompressionFilterFactory = CompressionFilterFactory;
257//# sourceMappingURL=compression-filter.js.map
\No newline at end of file