UNPKG

10.2 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as zlib from 'zlib';
19
20import { Call, WriteObject, WriteFlags } from './call-stream';
21import { Channel } from './channel';
22import { ChannelOptions } from './channel-options';
23import { CompressionAlgorithms } from './compression-algorithms';
24import { LogVerbosity } from './constants';
25import { BaseFilter, Filter, FilterFactory } from './filter';
26import * as logging from './logging';
27import { Metadata, MetadataValue } from './metadata';
28
/**
 * Type guard used to validate a numeric compression-algorithm option value.
 *
 * @param key Candidate value to check.
 * @return `true` only when `key` is a number and indexing
 *   `CompressionAlgorithms` with it yields a string.
 */
const isCompressionAlgorithmKey = (
  key: number
): key is CompressionAlgorithms => {
  // The runtime typeof check guards against untyped callers passing a
  // non-number despite the declared parameter type.
  if (typeof key !== 'number') {
    return false;
  }
  return typeof CompressionAlgorithms[key] === 'string';
};
32
/**
 * Union of the key names of `CompressionAlgorithms` (for example
 * `'identity'`), i.e. the textual form of a compression algorithm.
 */
type CompressionAlgorithm = keyof typeof CompressionAlgorithms;
34
/**
 * Mutable state handed to every CompressionFilter so that filters can share
 * what has been learned about the server.
 */
interface SharedCompressionFilterConfig {
  /**
   * Last `grpc-accept-encoding` header value seen in received metadata;
   * left undefined until such a header has been received.
   */
  serverSupportedEncodingHeader?: string;
}
38
/**
 * Base class for per-encoding message handlers. Subclasses provide the
 * actual (de)compression; this class adds and strips the 5-byte message
 * prefix: a 1-byte compressed flag followed by a 4-byte big-endian payload
 * length.
 */
abstract class CompressionHandler {
  protected abstract compressMessage(message: Buffer): Promise<Buffer>;
  protected abstract decompressMessage(data: Buffer): Promise<Buffer>;

  /**
   * Frames a message, compressing the payload first when requested.
   *
   * @param message Raw uncompressed message bytes
   * @param compress Indicates whether the message should be compressed
   * @return Framed message, compressed if applicable
   */
  async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
    const payload = compress ? await this.compressMessage(message) : message;
    const framed = Buffer.allocUnsafe(payload.length + 5);
    // Byte 0: compressed flag. Bytes 1-4: payload length, big-endian.
    framed.writeUInt8(compress ? 1 : 0, 0);
    framed.writeUInt32BE(payload.length, 1);
    payload.copy(framed, 5);
    return framed;
  }

  /**
   * Strips the 5-byte prefix and decompresses the payload when the
   * compressed flag (byte 0) equals 1.
   *
   * @param data Framed message, possibly compressed
   * @return Uncompressed message
   */
  async readMessage(data: Buffer): Promise<Buffer> {
    const isCompressed = data.readUInt8(0) === 1;
    const payload = data.slice(5);
    if (!isCompressed) {
      return payload;
    }
    return await this.decompressMessage(payload);
  }
}
71
/**
 * Handler for the "identity" encoding: payload bytes pass through unchanged
 * and every outgoing frame is flagged as uncompressed.
 */
class IdentityHandler extends CompressionHandler {
  /** Resolves with the very same buffer that was passed in. */
  async compressMessage(message: Buffer) {
    return message;
  }

  /**
   * Frames `message` with a 5-byte prefix whose flag byte is always 0.
   *
   * @param message Raw message bytes
   * @param compress Ignored by this handler
   * @return Framed, uncompressed message
   */
  async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
    const prefixLength = 5;
    const framed = Buffer.allocUnsafe(prefixLength + message.length);
    /* Under "identity" compression a message must never be marked as
     * compressed, so the flag byte is written as 0 unconditionally */
    framed.writeUInt8(0, 0);
    framed.writeUInt32BE(message.length, 1);
    message.copy(framed, prefixLength);
    return framed;
  }

  /**
   * Always rejects: a frame flagged as compressed cannot be decoded by the
   * identity handler.
   */
  decompressMessage(message: Buffer): Promise<Buffer> {
    const failure = new Error(
      'Received compressed message but "grpc-encoding" header was identity'
    );
    return Promise.reject<Buffer>(failure);
  }
}
95
/**
 * Handler for the "deflate" encoding, built on the callback-style
 * `zlib.deflate` / `zlib.inflate` functions wrapped in promises.
 */
class DeflateHandler extends CompressionHandler {
  /**
   * @param message Uncompressed payload
   * @return Promise for the deflated payload; rejects with the zlib error
   */
  compressMessage(message: Buffer) {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.deflate(message, (failure, deflated) => {
        if (failure) {
          reject(failure);
          return;
        }
        resolve(deflated);
      });
    });
  }

  /**
   * @param message Deflated payload
   * @return Promise for the inflated payload; rejects with the zlib error
   */
  decompressMessage(message: Buffer) {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.inflate(message, (failure, inflated) => {
        if (failure) {
          reject(failure);
          return;
        }
        resolve(inflated);
      });
    });
  }
}
121
/**
 * Handler for the "gzip" encoding: compresses with `zlib.gzip` and
 * decompresses with `zlib.unzip`, each wrapped in a promise.
 */
class GzipHandler extends CompressionHandler {
  /**
   * @param message Uncompressed payload
   * @return Promise for the gzipped payload; rejects with the zlib error
   */
  compressMessage(message: Buffer) {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.gzip(message, (failure, gzipped) => {
        if (failure) {
          reject(failure);
          return;
        }
        resolve(gzipped);
      });
    });
  }

  /**
   * @param message Compressed payload
   * @return Promise for the decompressed payload; rejects with the zlib error
   */
  decompressMessage(message: Buffer) {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.unzip(message, (failure, unzipped) => {
        if (failure) {
          reject(failure);
          return;
        }
        resolve(unzipped);
      });
    });
  }
}
147
/**
 * Handler used for an encoding name this file has no implementation for.
 * Both operations reject. Uncompressed frames can still be read, because the
 * inherited `readMessage` only calls `decompressMessage` when the frame's
 * compressed flag is set.
 */
class UnknownHandler extends CompressionHandler {
  /**
   * @param compressionName The unsupported encoding name, echoed in errors
   */
  constructor(private readonly compressionName: string) {
    super();
  }

  /**
   * Always rejects: there is no way to compress with an unknown method.
   *
   * NOTE(review): presumably unreachable. In this file the send-side handler
   * is only built from names taken from `CompressionAlgorithms`; confirm all
   * of those are handled by `getCompressionHandler`.
   */
  compressMessage(message: Buffer): Promise<Buffer> {
    return Promise.reject<Buffer>(
      new Error(`Compression method not supported: ${this.compressionName}`)
    );
  }

  /**
   * Always rejects. This is the reachable path: it runs when a frame flagged
   * as compressed arrives while the receive encoding is unsupported.
   */
  decompressMessage(message: Buffer): Promise<Buffer> {
    return Promise.reject<Buffer>(
      new Error(
        `Received message compressed with unsupported compression method ${this.compressionName}`
      )
    );
  }
}
167
/**
 * Maps an encoding name to a freshly constructed handler.
 *
 * @param compressionName Encoding name; matched exactly (case-sensitive)
 * @return An `IdentityHandler`, `DeflateHandler` or `GzipHandler` for the
 *   names 'identity', 'deflate' and 'gzip' respectively; any other name
 *   yields an `UnknownHandler` carrying that name.
 */
function getCompressionHandler(compressionName: string): CompressionHandler {
  if (compressionName === 'identity') {
    return new IdentityHandler();
  }
  if (compressionName === 'deflate') {
    return new DeflateHandler();
  }
  if (compressionName === 'gzip') {
    return new GzipHandler();
  }
  return new UnknownHandler(compressionName);
}
180
/**
 * Filter that negotiates message compression through the `grpc-encoding` /
 * `grpc-accept-encoding` headers and converts between bare message bytes and
 * framed (possibly compressed) messages.
 */
export class CompressionFilter extends BaseFilter implements Filter {
  private sendCompression: CompressionHandler = new IdentityHandler();
  private receiveCompression: CompressionHandler = new IdentityHandler();
  private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';

  /**
   * @param channelOptions Channel options; only
   *   `grpc.default_compression_algorithm` is read here
   * @param sharedFilterConfig Shared state holding the last
   *   `grpc-accept-encoding` header received from the server, if any
   */
  constructor(
    channelOptions: ChannelOptions,
    private sharedFilterConfig: SharedCompressionFilterConfig
  ) {
    super();

    const compressionAlgorithmKey =
      channelOptions['grpc.default_compression_algorithm'];
    if (compressionAlgorithmKey !== undefined) {
      if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
        const clientSelectedEncoding = CompressionAlgorithms[
          compressionAlgorithmKey
        ] as CompressionAlgorithm;
        const knownServerHeader =
          sharedFilterConfig.serverSupportedEncodingHeader;
        const serverSupportedEncodings =
          knownServerHeader === undefined
            ? undefined
            : CompressionFilter.parseEncodingList(knownServerHeader);
        /**
         * There are two possible situations here:
         * 1) We don't have any info yet from the server about what compression it supports
         *    In that case we should just use what the client tells us to use
         * 2) We've previously received a response from the server including a grpc-accept-encoding header
         *    In that case we only want to use the encoding chosen by the client if the server supports it
         */
        if (
          !serverSupportedEncodings ||
          serverSupportedEncodings.includes(clientSelectedEncoding)
        ) {
          this.currentCompressionAlgorithm = clientSelectedEncoding;
          this.sendCompression = getCompressionHandler(
            this.currentCompressionAlgorithm
          );
        }
      } else {
        logging.log(
          LogVerbosity.ERROR,
          `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`
        );
      }
    }
  }

  /**
   * Splits a comma-separated encoding list into individual names.
   *
   * Whitespace around each name is removed so that a header written as
   * `identity, deflate, gzip` is recognised the same as
   * `identity,deflate,gzip`; empty entries are dropped.
   *
   * @param header Raw `grpc-accept-encoding` header value
   * @return The encoding names, in header order
   */
  private static parseEncodingList(header: string): string[] {
    return header
      .split(',')
      .map((encoding) => encoding.trim())
      .filter((encoding) => encoding.length > 0);
  }

  /**
   * Adds the compression-related request headers.
   *
   * @param metadata Promise for the outgoing metadata
   * @return The same metadata object, with `grpc-accept-encoding` and
   *   `accept-encoding` set and `grpc-encoding` set or removed
   */
  async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
    const headers: Metadata = await metadata;
    headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
    headers.set('accept-encoding', 'identity');

    // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
    if (this.currentCompressionAlgorithm === 'identity') {
      headers.remove('grpc-encoding');
    } else {
      headers.set('grpc-encoding', this.currentCompressionAlgorithm);
    }

    return headers;
  }

  /**
   * Consumes the compression headers of received metadata: selects the
   * handler used to decode incoming messages, records the server's accepted
   * encodings in the shared config, and falls back to identity for sending
   * when the server does not list the encoding currently in use.
   *
   * @param metadata Received metadata
   * @return The same metadata object with `grpc-encoding` and
   *   `grpc-accept-encoding` removed
   */
  receiveMetadata(metadata: Metadata): Metadata {
    const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding');
    if (receiveEncoding.length > 0) {
      const encoding: MetadataValue = receiveEncoding[0];
      if (typeof encoding === 'string') {
        this.receiveCompression = getCompressionHandler(encoding);
      }
    }
    metadata.remove('grpc-encoding');

    /* Check to see if the compression we're using to send messages is supported by the server
     * If not, reset the sendCompression filter and have it use the default IdentityHandler */
    const serverSupportedEncodingsHeader = metadata.get(
      'grpc-accept-encoding'
    )[0];
    // A runtime typeof check (rather than a cast) keeps a non-string value
    // from reaching the string parsing below.
    if (
      typeof serverSupportedEncodingsHeader === 'string' &&
      serverSupportedEncodingsHeader.length > 0
    ) {
      this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader;
      const serverSupportedEncodings = CompressionFilter.parseEncodingList(
        serverSupportedEncodingsHeader
      );

      if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
        this.sendCompression = new IdentityHandler();
        this.currentCompressionAlgorithm = 'identity';
      }
    }
    metadata.remove('grpc-accept-encoding');
    return metadata;
  }

  /**
   * Frames an outgoing message, compressing it unless the identity handler
   * is active or the message carries the `WriteFlags.NoCompress` flag.
   *
   * @param message Promise for the bare message bytes and write flags
   * @return The framed message together with the original flags
   */
  async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
    /* This filter is special. The input message is the bare message bytes,
     * and the output is a framed and possibly compressed message. For this
     * reason, this filter should be at the bottom of the filter stack */
    const resolvedMessage: WriteObject = await message;
    let compress: boolean;
    if (this.sendCompression instanceof IdentityHandler) {
      compress = false;
    } else {
      compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
    }

    return {
      message: await this.sendCompression.writeMessage(
        resolvedMessage.message,
        compress
      ),
      flags: resolvedMessage.flags,
    };
  }

  /**
   * Deframes and, when flagged, decompresses an incoming message.
   *
   * @param message Promise for the framed message bytes
   * @return Promise for the deframed, uncompressed message bytes
   */
  async receiveMessage(message: Promise<Buffer>) {
    /* This filter is also special. The input message is framed and possibly
     * compressed, and the output message is deframed and uncompressed. So
     * this is another reason that this filter should be at the bottom of the
     * filter stack. */
    return this.receiveCompression.readMessage(await message);
  }
}
281
/**
 * Factory producing one CompressionFilter per call. Every filter it creates
 * receives the same options object and the same shared config object.
 */
export class CompressionFilterFactory
  implements FilterFactory<CompressionFilter> {
  private sharedFilterConfig: SharedCompressionFilterConfig = {};

  /**
   * @param channel Stored on the instance; not read within this class
   * @param options Channel options forwarded to each created filter
   */
  constructor(
    private readonly channel: Channel,
    private readonly options: ChannelOptions
  ) {}

  /**
   * @param callStream The call the filter is created for (not used here)
   * @return A new CompressionFilter sharing this factory's config object
   */
  createFilter(callStream: Call): CompressionFilter {
    const { options, sharedFilterConfig } = this;
    return new CompressionFilter(options, sharedFilterConfig);
  }
}
290
\No newline at end of file