UNPKG

10.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as zlib from 'zlib';
19
20import { WriteObject, WriteFlags } from './call-interface';
21import { Channel } from './channel';
22import { ChannelOptions } from './channel-options';
23import { CompressionAlgorithms } from './compression-algorithms';
24import { LogVerbosity } from './constants';
25import { BaseFilter, Filter, FilterFactory } from './filter';
26import * as logging from './logging';
27import { Metadata, MetadataValue } from './metadata';
28
29const isCompressionAlgorithmKey = (
30 key: number
31): key is CompressionAlgorithms => {
32 return (
33 typeof key === 'number' && typeof CompressionAlgorithms[key] === 'string'
34 );
35};
36
37type CompressionAlgorithm = keyof typeof CompressionAlgorithms;
38
39type SharedCompressionFilterConfig = {
40 serverSupportedEncodingHeader?: string;
41};
42
43abstract class CompressionHandler {
44 protected abstract compressMessage(message: Buffer): Promise<Buffer>;
45 protected abstract decompressMessage(data: Buffer): Promise<Buffer>;
46 /**
47 * @param message Raw uncompressed message bytes
48 * @param compress Indicates whether the message should be compressed
49 * @return Framed message, compressed if applicable
50 */
51 async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
52 let messageBuffer = message;
53 if (compress) {
54 messageBuffer = await this.compressMessage(messageBuffer);
55 }
56 const output = Buffer.allocUnsafe(messageBuffer.length + 5);
57 output.writeUInt8(compress ? 1 : 0, 0);
58 output.writeUInt32BE(messageBuffer.length, 1);
59 messageBuffer.copy(output, 5);
60 return output;
61 }
62 /**
63 * @param data Framed message, possibly compressed
64 * @return Uncompressed message
65 */
66 async readMessage(data: Buffer): Promise<Buffer> {
67 const compressed = data.readUInt8(0) === 1;
68 let messageBuffer = data.slice(5);
69 if (compressed) {
70 messageBuffer = await this.decompressMessage(messageBuffer);
71 }
72 return messageBuffer;
73 }
74}
75
76class IdentityHandler extends CompressionHandler {
77 async compressMessage(message: Buffer) {
78 return message;
79 }
80
81 async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
82 const output = Buffer.allocUnsafe(message.length + 5);
83 /* With "identity" compression, messages should always be marked as
84 * uncompressed */
85 output.writeUInt8(0, 0);
86 output.writeUInt32BE(message.length, 1);
87 message.copy(output, 5);
88 return output;
89 }
90
91 decompressMessage(message: Buffer): Promise<Buffer> {
92 return Promise.reject<Buffer>(
93 new Error(
94 'Received compressed message but "grpc-encoding" header was identity'
95 )
96 );
97 }
98}
99
100class DeflateHandler extends CompressionHandler {
101 compressMessage(message: Buffer) {
102 return new Promise<Buffer>((resolve, reject) => {
103 zlib.deflate(message, (err, output) => {
104 if (err) {
105 reject(err);
106 } else {
107 resolve(output);
108 }
109 });
110 });
111 }
112
113 decompressMessage(message: Buffer) {
114 return new Promise<Buffer>((resolve, reject) => {
115 zlib.inflate(message, (err, output) => {
116 if (err) {
117 reject(err);
118 } else {
119 resolve(output);
120 }
121 });
122 });
123 }
124}
125
126class GzipHandler extends CompressionHandler {
127 compressMessage(message: Buffer) {
128 return new Promise<Buffer>((resolve, reject) => {
129 zlib.gzip(message, (err, output) => {
130 if (err) {
131 reject(err);
132 } else {
133 resolve(output);
134 }
135 });
136 });
137 }
138
139 decompressMessage(message: Buffer) {
140 return new Promise<Buffer>((resolve, reject) => {
141 zlib.unzip(message, (err, output) => {
142 if (err) {
143 reject(err);
144 } else {
145 resolve(output);
146 }
147 });
148 });
149 }
150}
151
152class UnknownHandler extends CompressionHandler {
153 constructor(private readonly compressionName: string) {
154 super();
155 }
156 compressMessage(message: Buffer): Promise<Buffer> {
157 return Promise.reject<Buffer>(
158 new Error(
159 `Received message compressed with unsupported compression method ${this.compressionName}`
160 )
161 );
162 }
163
164 decompressMessage(message: Buffer): Promise<Buffer> {
165 // This should be unreachable
166 return Promise.reject<Buffer>(
167 new Error(`Compression method not supported: ${this.compressionName}`)
168 );
169 }
170}
171
172function getCompressionHandler(compressionName: string): CompressionHandler {
173 switch (compressionName) {
174 case 'identity':
175 return new IdentityHandler();
176 case 'deflate':
177 return new DeflateHandler();
178 case 'gzip':
179 return new GzipHandler();
180 default:
181 return new UnknownHandler(compressionName);
182 }
183}
184
185export class CompressionFilter extends BaseFilter implements Filter {
186 private sendCompression: CompressionHandler = new IdentityHandler();
187 private receiveCompression: CompressionHandler = new IdentityHandler();
188 private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
189
190 constructor(
191 channelOptions: ChannelOptions,
192 private sharedFilterConfig: SharedCompressionFilterConfig
193 ) {
194 super();
195
196 const compressionAlgorithmKey =
197 channelOptions['grpc.default_compression_algorithm'];
198 if (compressionAlgorithmKey !== undefined) {
199 if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
200 const clientSelectedEncoding = CompressionAlgorithms[
201 compressionAlgorithmKey
202 ] as CompressionAlgorithm;
203 const serverSupportedEncodings =
204 sharedFilterConfig.serverSupportedEncodingHeader?.split(',');
205 /**
206 * There are two possible situations here:
207 * 1) We don't have any info yet from the server about what compression it supports
208 * In that case we should just use what the client tells us to use
209 * 2) We've previously received a response from the server including a grpc-accept-encoding header
210 * In that case we only want to use the encoding chosen by the client if the server supports it
211 */
212 if (
213 !serverSupportedEncodings ||
214 serverSupportedEncodings.includes(clientSelectedEncoding)
215 ) {
216 this.currentCompressionAlgorithm = clientSelectedEncoding;
217 this.sendCompression = getCompressionHandler(
218 this.currentCompressionAlgorithm
219 );
220 }
221 } else {
222 logging.log(
223 LogVerbosity.ERROR,
224 `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`
225 );
226 }
227 }
228 }
229
230 async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
231 const headers: Metadata = await metadata;
232 headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
233 headers.set('accept-encoding', 'identity');
234
235 // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
236 if (this.currentCompressionAlgorithm === 'identity') {
237 headers.remove('grpc-encoding');
238 } else {
239 headers.set('grpc-encoding', this.currentCompressionAlgorithm);
240 }
241
242 return headers;
243 }
244
245 receiveMetadata(metadata: Metadata): Metadata {
246 const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding');
247 if (receiveEncoding.length > 0) {
248 const encoding: MetadataValue = receiveEncoding[0];
249 if (typeof encoding === 'string') {
250 this.receiveCompression = getCompressionHandler(encoding);
251 }
252 }
253 metadata.remove('grpc-encoding');
254
255 /* Check to see if the compression we're using to send messages is supported by the server
256 * If not, reset the sendCompression filter and have it use the default IdentityHandler */
257 const serverSupportedEncodingsHeader = metadata.get(
258 'grpc-accept-encoding'
259 )[0] as string | undefined;
260 if (serverSupportedEncodingsHeader) {
261 this.sharedFilterConfig.serverSupportedEncodingHeader =
262 serverSupportedEncodingsHeader;
263 const serverSupportedEncodings =
264 serverSupportedEncodingsHeader.split(',');
265
266 if (
267 !serverSupportedEncodings.includes(this.currentCompressionAlgorithm)
268 ) {
269 this.sendCompression = new IdentityHandler();
270 this.currentCompressionAlgorithm = 'identity';
271 }
272 }
273 metadata.remove('grpc-accept-encoding');
274 return metadata;
275 }
276
277 async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
278 /* This filter is special. The input message is the bare message bytes,
279 * and the output is a framed and possibly compressed message. For this
280 * reason, this filter should be at the bottom of the filter stack */
281 const resolvedMessage: WriteObject = await message;
282 let compress: boolean;
283 if (this.sendCompression instanceof IdentityHandler) {
284 compress = false;
285 } else {
286 compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
287 }
288
289 return {
290 message: await this.sendCompression.writeMessage(
291 resolvedMessage.message,
292 compress
293 ),
294 flags: resolvedMessage.flags,
295 };
296 }
297
298 async receiveMessage(message: Promise<Buffer>) {
299 /* This filter is also special. The input message is framed and possibly
300 * compressed, and the output message is deframed and uncompressed. So
301 * this is another reason that this filter should be at the bottom of the
302 * filter stack. */
303 return this.receiveCompression.readMessage(await message);
304 }
305}
306
307export class CompressionFilterFactory
308 implements FilterFactory<CompressionFilter>
309{
310 private sharedFilterConfig: SharedCompressionFilterConfig = {};
311 constructor(channel: Channel, private readonly options: ChannelOptions) {}
312 createFilter(): CompressionFilter {
313 return new CompressionFilter(this.options, this.sharedFilterConfig);
314 }
315}
316
\No newline at end of file