UNPKG

10.3 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.CompressionFilterFactory = exports.CompressionFilter = void 0;
20const zlib = require("zlib");
21const compression_algorithms_1 = require("./compression-algorithms");
22const constants_1 = require("./constants");
23const filter_1 = require("./filter");
24const logging = require("./logging");
/**
 * Reports whether `key` is a numeric key of the `CompressionAlgorithms`
 * mapping, i.e. a number whose entry in that mapping is a string name.
 *
 * @param key Candidate value, typically taken from channel options
 * @return `true` only for numbers that map to a string algorithm name
 */
const isCompressionAlgorithmKey = (key) => {
    if (typeof key !== 'number') {
        return false;
    }
    const algorithmName = compression_algorithms_1.CompressionAlgorithms[key];
    return typeof algorithmName === 'string';
};
/**
 * Base class that frames and deframes messages using a 5-byte prefix:
 * one flag byte (1 when the payload is compressed, 0 otherwise) followed by
 * the payload length as a big-endian unsigned 32-bit integer.
 *
 * Subclasses are expected to provide `compressMessage(message)` and
 * `decompressMessage(message)`, each returning a promise for a Buffer; this
 * class calls them but does not define them.
 */
class CompressionHandler {
    /**
     * @param message Raw uncompressed message bytes
     * @param compress Indicates whether the message should be compressed
     * @return Framed message, compressed if applicable
     */
    async writeMessage(message, compress) {
        const headerLength = 5;
        const payload = compress ? await this.compressMessage(message) : message;
        /* allocUnsafe is fine here: every byte of the buffer is overwritten by
         * the three writes below before it is returned */
        const output = Buffer.allocUnsafe(payload.length + headerLength);
        output.writeUInt8(compress ? 1 : 0, 0);
        output.writeUInt32BE(payload.length, 1);
        payload.copy(output, headerLength);
        return output;
    }
    /**
     * @param data Framed message, possibly compressed
     * @return Uncompressed message
     */
    async readMessage(data) {
        const headerLength = 5;
        const compressed = data.readUInt8(0) === 1;
        /* subarray is the non-deprecated equivalent of Buffer#slice: it
         * returns a view over the same memory, without copying */
        const payload = data.subarray(headerLength);
        if (compressed) {
            return this.decompressMessage(payload);
        }
        return payload;
    }
}
/**
 * Handler for the "identity" encoding: payloads are passed through unchanged
 * and frames are always written with the compressed flag cleared.
 */
class IdentityHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return The same bytes, untouched
     */
    async compressMessage(message) {
        return message;
    }
    /**
     * @param message Raw uncompressed message bytes
     * @param compress Ignored by this handler
     * @return Framed, uncompressed message
     */
    async writeMessage(message, compress) {
        const payloadLength = message.length;
        const framed = Buffer.allocUnsafe(payloadLength + 5);
        /* With "identity" compression, messages should always be marked as
         * uncompressed */
        framed.writeUInt8(0, 0);
        framed.writeUInt32BE(payloadLength, 1);
        message.copy(framed, 5);
        return framed;
    }
    /**
     * Always rejects: a frame flagged as compressed is not valid while the
     * receive encoding is "identity".
     */
    decompressMessage(message) {
        const failure = new Error('Received compressed message but "grpc-encoding" header was identity');
        return Promise.reject(failure);
    }
}
/**
 * Handler for the "deflate" encoding, backed by `zlib.deflate` and
 * `zlib.inflate`.
 */
class DeflateHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return Promise for the deflated bytes; rejects with the zlib error
     */
    compressMessage(message) {
        return new Promise((fulfil, fail) => {
            const onDeflated = (error, deflated) => {
                if (error) {
                    fail(error);
                    return;
                }
                fulfil(deflated);
            };
            zlib.deflate(message, onDeflated);
        });
    }
    /**
     * @param message Deflated message bytes
     * @return Promise for the inflated bytes; rejects with the zlib error
     */
    decompressMessage(message) {
        return new Promise((fulfil, fail) => {
            const onInflated = (error, inflated) => {
                if (error) {
                    fail(error);
                    return;
                }
                fulfil(inflated);
            };
            zlib.inflate(message, onInflated);
        });
    }
}
/**
 * Handler for the "gzip" encoding: compresses with `zlib.gzip` and
 * decompresses with `zlib.unzip`.
 */
class GzipHandler extends CompressionHandler {
    /**
     * @param message Raw message bytes
     * @return Promise for the gzipped bytes; rejects with the zlib error
     */
    compressMessage(message) {
        return new Promise((fulfil, fail) => {
            const onGzipped = (error, gzipped) => {
                if (error) {
                    fail(error);
                    return;
                }
                fulfil(gzipped);
            };
            zlib.gzip(message, onGzipped);
        });
    }
    /**
     * @param message Compressed message bytes
     * @return Promise for the decompressed bytes; rejects with the zlib error
     */
    decompressMessage(message) {
        return new Promise((fulfil, fail) => {
            const onUnzipped = (error, unzipped) => {
                if (error) {
                    fail(error);
                    return;
                }
                fulfil(unzipped);
            };
            zlib.unzip(message, onUnzipped);
        });
    }
}
/**
 * Handler used for an encoding name that has no dedicated implementation.
 * Both operations always reject; the handler only remembers the offending
 * name so it can be reported in the error message.
 */
class UnknownHandler extends CompressionHandler {
    /**
     * @param compressionName Encoding name that has no dedicated handler
     */
    constructor(compressionName) {
        super();
        this.compressionName = compressionName;
    }
    /**
     * Send path. Always rejects, because there is no way to compress with an
     * unknown method.
     */
    compressMessage(message) {
        /* This should be unreachable, provided every name that can be selected
         * for sending has a dedicated handler */
        return Promise.reject(new Error(`Compression method not supported: ${this.compressionName}`));
    }
    /**
     * Receive path. Always rejects; this is reached when a frame flagged as
     * compressed arrives while this handler is the receive handler.
     */
    decompressMessage(message) {
        return Promise.reject(new Error(`Received message compressed with unsupported compression method ${this.compressionName}`));
    }
}
/**
 * Builds a fresh handler for the given encoding name.
 *
 * @param compressionName Encoding name, e.g. the value of a "grpc-encoding" header
 * @return A new IdentityHandler, DeflateHandler or GzipHandler for the
 *     matching name, otherwise an UnknownHandler carrying the name
 */
function getCompressionHandler(compressionName) {
    if (compressionName === 'identity') {
        return new IdentityHandler();
    }
    if (compressionName === 'deflate') {
        return new DeflateHandler();
    }
    if (compressionName === 'gzip') {
        return new GzipHandler();
    }
    return new UnknownHandler(compressionName);
}
/**
 * Splits a comma-separated encoding list (the value of a
 * "grpc-accept-encoding" header) into individual names. Whitespace around
 * each name is dropped so that "identity, deflate, gzip" and
 * "identity,deflate,gzip" yield the same list.
 *
 * @param headerValue Raw header string
 * @return Array of trimmed encoding names
 */
function splitEncodingList(headerValue) {
    return headerValue.split(',').map((name) => name.trim());
}
/**
 * Per-call filter that negotiates message compression through metadata and
 * converts between bare message bytes and framed, possibly compressed
 * messages.
 */
class CompressionFilter extends filter_1.BaseFilter {
    /**
     * @param channelOptions Channel options; only
     *     'grpc.default_compression_algorithm' is read here
     * @param sharedFilterConfig Object passed in by the caller; this filter
     *     reads and writes its `serverSupportedEncodingHeader` property
     */
    constructor(channelOptions, sharedFilterConfig) {
        super();
        this.sharedFilterConfig = sharedFilterConfig;
        this.sendCompression = new IdentityHandler();
        this.receiveCompression = new IdentityHandler();
        this.currentCompressionAlgorithm = 'identity';
        const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
        if (compressionAlgorithmKey === undefined) {
            return;
        }
        if (!isCompressionAlgorithmKey(compressionAlgorithmKey)) {
            logging.log(constants_1.LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
            return;
        }
        const clientSelectedEncoding = compression_algorithms_1.CompressionAlgorithms[compressionAlgorithmKey];
        const knownServerHeader = sharedFilterConfig.serverSupportedEncodingHeader;
        const serverSupportedEncodings = knownServerHeader === null || knownServerHeader === undefined
            ? undefined
            : splitEncodingList(knownServerHeader);
        /**
         * There are two possible situations here:
         * 1) We don't have any info yet from the server about what compression it supports
         *   In that case we should just use what the client tells us to use
         * 2) We've previously received a response from the server including a grpc-accept-encoding header
         *   In that case we only want to use the encoding chosen by the client if the server supports it
         */
        if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
            this.currentCompressionAlgorithm = clientSelectedEncoding;
            this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
        }
    }
    /**
     * Adds the compression-related headers to outgoing metadata.
     *
     * @param metadata Promise (or value) resolving to the outgoing metadata
     * @return The same metadata object with the headers applied
     */
    async sendMetadata(metadata) {
        const headers = await metadata;
        headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
        headers.set('accept-encoding', 'identity');
        // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
        if (this.currentCompressionAlgorithm === 'identity') {
            headers.remove('grpc-encoding');
        }
        else {
            headers.set('grpc-encoding', this.currentCompressionAlgorithm);
        }
        return headers;
    }
    /**
     * Reads the compression headers of incoming metadata, updates the receive
     * and send handlers accordingly, and removes those headers from the
     * metadata.
     *
     * @param metadata Incoming metadata
     * @return The same metadata object without the compression headers
     */
    receiveMetadata(metadata) {
        const receiveEncoding = metadata.get('grpc-encoding');
        if (receiveEncoding.length > 0) {
            const encoding = receiveEncoding[0];
            if (typeof encoding === 'string') {
                this.receiveCompression = getCompressionHandler(encoding);
            }
        }
        metadata.remove('grpc-encoding');
        /* Check to see if the compression we're using to send messages is supported by the server
         * If not, reset the sendCompression filter and have it use the default IdentityHandler */
        const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0];
        /* Only a non-empty string can be parsed as an encoding list; any other
         * value is ignored, mirroring the type check done for grpc-encoding */
        if (typeof serverSupportedEncodingsHeader === 'string' && serverSupportedEncodingsHeader !== '') {
            this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader;
            const serverSupportedEncodings = splitEncodingList(serverSupportedEncodingsHeader);
            if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
                this.sendCompression = new IdentityHandler();
                this.currentCompressionAlgorithm = 'identity';
            }
        }
        metadata.remove('grpc-accept-encoding');
        return metadata;
    }
    /**
     * Frames, and compresses if applicable, an outgoing message.
     *
     * @param message Promise (or value) resolving to an object with the bare
     *     `message` bytes and optional numeric `flags`
     * @return Object with the framed `message` and the original `flags`
     */
    async sendMessage(message) {
        /* This filter is special. The input message is the bare message bytes,
         * and the output is a framed and possibly compressed message. For this
         * reason, this filter should be at the bottom of the filter stack */
        const resolvedMessage = await message;
        /* Bit 2 of the write flags is the NoCompress flag */
        const noCompressFlag = 2;
        let compress = false;
        if (!(this.sendCompression instanceof IdentityHandler)) {
            const rawFlags = resolvedMessage.flags;
            const flags = rawFlags !== null && rawFlags !== undefined ? rawFlags : 0;
            compress = (flags & noCompressFlag) === 0;
        }
        return {
            message: await this.sendCompression.writeMessage(resolvedMessage.message, compress),
            flags: resolvedMessage.flags,
        };
    }
    /**
     * Deframes, and decompresses if applicable, an incoming message.
     *
     * @param message Promise (or value) resolving to the framed message bytes
     * @return Uncompressed message bytes
     */
    async receiveMessage(message) {
        /* This filter is also special. The input message is framed and possibly
         * compressed, and the output message is deframed and uncompressed. So
         * this is another reason that this filter should be at the bottom of the
         * filter stack. */
        return this.receiveCompression.readMessage(await message);
    }
}
244exports.CompressionFilter = CompressionFilter;
/**
 * Factory for CompressionFilter instances. Every filter it creates receives
 * the same options object and the same `sharedFilterConfig` object.
 */
class CompressionFilterFactory {
    /**
     * @param channel Stored on the instance; not otherwise used in this class
     * @param options Options object forwarded to every created filter
     */
    constructor(channel, options) {
        Object.assign(this, { channel, options, sharedFilterConfig: {} });
    }
    /**
     * @param callStream Unused by this factory
     * @return A new CompressionFilter
     */
    createFilter(callStream) {
        const { options, sharedFilterConfig } = this;
        return new CompressionFilter(options, sharedFilterConfig);
    }
}
255exports.CompressionFilterFactory = CompressionFilterFactory;
256//# sourceMappingURL=compression-filter.js.map
\No newline at end of file