1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as zlib from 'zlib';
|
19 |
|
20 | import { WriteObject, WriteFlags } from './call-interface';
|
21 | import { Channel } from './channel';
|
22 | import { ChannelOptions } from './channel-options';
|
23 | import { CompressionAlgorithms } from './compression-algorithms';
|
24 | import { LogVerbosity } from './constants';
|
25 | import { BaseFilter, Filter, FilterFactory } from './filter';
|
26 | import * as logging from './logging';
|
27 | import { Metadata, MetadataValue } from './metadata';
|
28 |
|
/**
 * Type guard for the `grpc.default_compression_algorithm` channel option.
 *
 * @param key Candidate value; checked at runtime even though it is typed as
 *     a number, because channel options come from user code.
 * @returns `true` only when `key` is a number and indexing
 *     `CompressionAlgorithms` with it yields a string name.
 */
const isCompressionAlgorithmKey = (
  key: number
): key is CompressionAlgorithms => {
  if (typeof key !== 'number') {
    return false;
  }
  const algorithmName: unknown = CompressionAlgorithms[key];
  return typeof algorithmName === 'string';
};
|
36 |
|
/** Name of a compression algorithm: any key of `CompressionAlgorithms`. */
type CompressionAlgorithm = keyof typeof CompressionAlgorithms;

/**
 * Mutable state handed to every `CompressionFilter` that is constructed with
 * the same config object.
 */
interface SharedCompressionFilterConfig {
  /**
   * Raw `grpc-accept-encoding` header value most recently stored by a filter
   * in `receiveMetadata`; left unset until such a header has been seen.
   */
  serverSupportedEncodingHeader?: string;
}
|
42 |
|
/**
 * Base class for the per-encoding message codecs.
 *
 * Subclasses supply the actual compression and decompression; this class
 * adds and strips the 5-byte message frame header: one flag byte
 * (1 = compressed, 0 = not compressed) followed by the payload length as a
 * big-endian unsigned 32-bit integer.
 */
abstract class CompressionHandler {
  /** Size in bytes of the frame header (1 flag byte + 4 length bytes). */
  private static readonly FRAME_HEADER_LENGTH = 5;

  /** Compresses a bare message payload. */
  protected abstract compressMessage(message: Buffer): Promise<Buffer>;

  /** Decompresses a payload whose frame header had the compressed flag set. */
  protected abstract decompressMessage(data: Buffer): Promise<Buffer>;

  /**
   * Frames a message, compressing it first when requested.
   *
   * @param message Raw uncompressed message bytes.
   * @param compress Whether the payload should be compressed.
   * @returns The framed message: header followed by the (possibly
   *     compressed) payload.
   */
  async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
    const headerLength = CompressionHandler.FRAME_HEADER_LENGTH;
    let messageBuffer = message;
    if (compress) {
      messageBuffer = await this.compressMessage(messageBuffer);
    }
    // allocUnsafe is fine here: every byte is overwritten below.
    const output = Buffer.allocUnsafe(messageBuffer.length + headerLength);
    output.writeUInt8(compress ? 1 : 0, 0);
    output.writeUInt32BE(messageBuffer.length, 1);
    messageBuffer.copy(output, headerLength);
    return output;
  }

  /**
   * Strips the frame header and decompresses the payload when the header
   * marks it as compressed.
   *
   * @param data Framed message, possibly compressed.
   * @returns The uncompressed message payload. When the payload was not
   *     compressed this is a view onto `data`, not a copy.
   * @throws RangeError (as a rejection) when `data` is shorter than the
   *     frame header, instead of silently yielding an empty message.
   */
  async readMessage(data: Buffer): Promise<Buffer> {
    const headerLength = CompressionHandler.FRAME_HEADER_LENGTH;
    if (data.length < headerLength) {
      throw new RangeError(
        `Received a message frame of ${data.length} bytes, which is shorter than the ${headerLength}-byte frame header`
      );
    }
    const compressed = data.readUInt8(0) === 1;
    // subarray replaces the deprecated Buffer#slice; both return a view.
    let messageBuffer = data.subarray(headerLength);
    if (compressed) {
      messageBuffer = await this.decompressMessage(messageBuffer);
    }
    return messageBuffer;
  }
}
|
75 |
|
/**
 * Handler for the "identity" encoding: payloads are framed as-is and are
 * never expected to arrive compressed.
 */
class IdentityHandler extends CompressionHandler {
  /** Returns the message unchanged. */
  compressMessage(message: Buffer): Promise<Buffer> {
    return Promise.resolve(message);
  }

  /**
   * Frames the message without compressing it.
   *
   * @param message Raw message bytes.
   * @param compress Ignored; the compressed flag is always written as 0.
   * @returns The 5-byte frame header followed by `message`.
   */
  async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
    const payloadLength = message.length;
    const framed = Buffer.allocUnsafe(payloadLength + 5);
    // Flag byte: identity-encoded messages are never marked compressed.
    framed[0] = 0;
    framed.writeUInt32BE(payloadLength, 1);
    framed.set(message, 5);
    return framed;
  }

  /** Always rejects: a compressed frame contradicts the identity encoding. */
  async decompressMessage(message: Buffer): Promise<Buffer> {
    throw new Error(
      'Received compressed message but "grpc-encoding" header was identity'
    );
  }
}
|
99 |
|
/** Handler for the "deflate" encoding, backed by `zlib.deflate` / `zlib.inflate`. */
class DeflateHandler extends CompressionHandler {
  /** Resolves with the deflated message, or rejects with the zlib error. */
  compressMessage(message: Buffer): Promise<Buffer> {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.deflate(message, (error, deflated) => {
        if (!error) {
          resolve(deflated);
          return;
        }
        reject(error);
      });
    });
  }

  /** Resolves with the inflated message, or rejects with the zlib error. */
  decompressMessage(message: Buffer): Promise<Buffer> {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.inflate(message, (error, inflated) => {
        if (!error) {
          resolve(inflated);
          return;
        }
        reject(error);
      });
    });
  }
}
|
125 |
|
/** Handler for the "gzip" encoding, backed by `zlib.gzip` / `zlib.unzip`. */
class GzipHandler extends CompressionHandler {
  /** Resolves with the gzipped message, or rejects with the zlib error. */
  compressMessage(message: Buffer): Promise<Buffer> {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.gzip(message, (error, gzipped) => {
        if (!error) {
          resolve(gzipped);
          return;
        }
        reject(error);
      });
    });
  }

  /** Resolves with the unzipped message, or rejects with the zlib error. */
  decompressMessage(message: Buffer): Promise<Buffer> {
    return new Promise<Buffer>((resolve, reject) => {
      zlib.unzip(message, (error, unzipped) => {
        if (!error) {
          resolve(unzipped);
          return;
        }
        reject(error);
      });
    });
  }
}
|
151 |
|
/**
 * Handler used for an encoding name that has no dedicated handler. Both
 * directions reject; frames that are not marked compressed still pass
 * through the inherited framing logic untouched.
 */
class UnknownHandler extends CompressionHandler {
  /**
   * @param compressionName The unrecognised encoding name, echoed in the
   *     error messages.
   */
  constructor(private readonly compressionName: string) {
    super();
  }

  /**
   * Always rejects: there is no way to compress with an unknown method.
   *
   * NOTE(review): within this file a send handler is only ever built from a
   * `CompressionAlgorithms` name, so this should be unreachable as long as
   * every such name has a dedicated handler.
   */
  compressMessage(message: Buffer): Promise<Buffer> {
    return Promise.reject<Buffer>(
      new Error(`Compression method not supported: ${this.compressionName}`)
    );
  }

  /**
   * Always rejects. This is the reachable path: it runs when a peer sends a
   * compressed frame after announcing an encoding this client does not
   * implement.
   */
  decompressMessage(message: Buffer): Promise<Buffer> {
    return Promise.reject<Buffer>(
      new Error(
        `Received message compressed with unsupported compression method ${this.compressionName}`
      )
    );
  }
}
|
171 |
|
/**
 * Picks the handler that implements the named encoding.
 *
 * @param compressionName Encoding name; matched exactly (case-sensitive).
 * @returns A fresh handler for 'identity', 'deflate' or 'gzip'; any other
 *     name yields an `UnknownHandler` that remembers the name.
 */
function getCompressionHandler(compressionName: string): CompressionHandler {
  if (compressionName === 'identity') {
    return new IdentityHandler();
  }
  if (compressionName === 'deflate') {
    return new DeflateHandler();
  }
  if (compressionName === 'gzip') {
    return new GzipHandler();
  }
  return new UnknownHandler(compressionName);
}
|
184 |
|
/**
 * Per-call filter that negotiates message compression through the
 * `grpc-encoding` / `grpc-accept-encoding` headers and converts between bare
 * message bytes and framed, possibly compressed, messages.
 */
export class CompressionFilter extends BaseFilter implements Filter {
  private sendCompression: CompressionHandler = new IdentityHandler();
  private receiveCompression: CompressionHandler = new IdentityHandler();
  private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';

  /**
   * Splits a comma-separated encoding list into its entries. Whitespace
   * around each entry is dropped so that a header written as
   * "identity, deflate, gzip" is matched the same way as
   * "identity,deflate,gzip".
   */
  private static parseEncodingList(header: string): string[] {
    return header.split(',').map((encoding) => encoding.trim());
  }

  /**
   * @param channelOptions Channel options; only
   *     `grpc.default_compression_algorithm` is read here.
   * @param sharedFilterConfig Config object that carries the last seen
   *     `grpc-accept-encoding` header between filters sharing it.
   */
  constructor(
    channelOptions: ChannelOptions,
    private sharedFilterConfig: SharedCompressionFilterConfig
  ) {
    super();

    const compressionAlgorithmKey =
      channelOptions['grpc.default_compression_algorithm'];
    if (compressionAlgorithmKey !== undefined) {
      if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
        const clientSelectedEncoding = CompressionAlgorithms[
          compressionAlgorithmKey
        ] as CompressionAlgorithm;
        const serverSupportedEncodingHeader =
          sharedFilterConfig.serverSupportedEncodingHeader;
        const serverSupportedEncodings =
          serverSupportedEncodingHeader === undefined
            ? undefined
            : CompressionFilter.parseEncodingList(
                serverSupportedEncodingHeader
              );
        /**
         * There are two possible situations here:
         * 1) We don't have any info yet from the server about what compression it supports
         *    In that case we should just use what the client tells us to use
         * 2) We've previously received a response from the server including a grpc-accept-encoding header
         *    In that case we only want to use the encoding chosen by the client if the server supports it
         */
        if (
          !serverSupportedEncodings ||
          serverSupportedEncodings.includes(clientSelectedEncoding)
        ) {
          this.currentCompressionAlgorithm = clientSelectedEncoding;
          this.sendCompression = getCompressionHandler(
            this.currentCompressionAlgorithm
          );
        }
      } else {
        logging.log(
          LogVerbosity.ERROR,
          `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`
        );
      }
    }
  }

  /**
   * Advertises the encodings this client accepts and, unless it is
   * "identity", the encoding used for outgoing messages.
   *
   * @param metadata Outgoing headers; the resolved object is mutated and
   *     returned.
   */
  async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
    const headers: Metadata = await metadata;
    headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
    headers.set('accept-encoding', 'identity');

    // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
    if (this.currentCompressionAlgorithm === 'identity') {
      headers.remove('grpc-encoding');
    } else {
      headers.set('grpc-encoding', this.currentCompressionAlgorithm);
    }

    return headers;
  }

  /**
   * Picks the receive handler from `grpc-encoding`, records the peer's
   * `grpc-accept-encoding`, and strips both headers from the metadata.
   *
   * @param metadata Incoming headers; mutated and returned.
   */
  receiveMetadata(metadata: Metadata): Metadata {
    const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding');
    if (receiveEncoding.length > 0) {
      const encoding: MetadataValue = receiveEncoding[0];
      if (typeof encoding === 'string') {
        this.receiveCompression = getCompressionHandler(encoding);
      }
    }
    metadata.remove('grpc-encoding');

    /* Check to see if the compression we're using to send messages is supported by the server
     * If not, reset the sendCompression filter and have it use the default IdentityHandler */
    const serverSupportedEncodingsHeader = metadata.get(
      'grpc-accept-encoding'
    )[0];
    // The runtime type check replaces an unchecked cast; empty headers are
    // ignored exactly as before.
    if (
      typeof serverSupportedEncodingsHeader === 'string' &&
      serverSupportedEncodingsHeader
    ) {
      this.sharedFilterConfig.serverSupportedEncodingHeader =
        serverSupportedEncodingsHeader;
      const serverSupportedEncodings = CompressionFilter.parseEncodingList(
        serverSupportedEncodingsHeader
      );

      if (
        !serverSupportedEncodings.includes(this.currentCompressionAlgorithm)
      ) {
        this.sendCompression = new IdentityHandler();
        this.currentCompressionAlgorithm = 'identity';
      }
    }
    metadata.remove('grpc-accept-encoding');
    return metadata;
  }

  /**
   * Frames an outgoing message, compressing it unless the send handler is
   * the identity handler or the message carries `WriteFlags.NoCompress`.
   *
   * @param message Bare message bytes plus optional write flags.
   * @returns The framed message with the original flags.
   */
  async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
    /* This filter is special. The input message is the bare message bytes,
     * and the output is a framed and possibly compressed message. For this
     * reason, this filter should be at the bottom of the filter stack */
    const resolvedMessage: WriteObject = await message;
    let compress: boolean;
    if (this.sendCompression instanceof IdentityHandler) {
      compress = false;
    } else {
      compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
    }

    return {
      message: await this.sendCompression.writeMessage(
        resolvedMessage.message,
        compress
      ),
      flags: resolvedMessage.flags,
    };
  }

  /**
   * Deframes an incoming message and decompresses it when its frame header
   * marks it as compressed.
   *
   * @param message Framed, possibly compressed message bytes.
   */
  async receiveMessage(message: Promise<Buffer>) {
    /* This filter is also special. The input message is framed and possibly
     * compressed, and the output message is deframed and uncompressed. So
     * this is another reason that this filter should be at the bottom of the
     * filter stack. */
    return this.receiveCompression.readMessage(await message);
  }
}
|
306 |
|
/**
 * Creates `CompressionFilter` instances that all share one config object,
 * so whatever a filter stores in it is visible to filters created later.
 */
export class CompressionFilterFactory
  implements FilterFactory<CompressionFilter>
{
  private sharedFilterConfig: SharedCompressionFilterConfig;
  private readonly options: ChannelOptions;

  /**
   * @param channel Not used by this factory.
   * @param options Channel options passed to every filter it creates.
   */
  constructor(channel: Channel, options: ChannelOptions) {
    this.options = options;
    this.sharedFilterConfig = {};
  }

  /** Builds a filter bound to this factory's options and shared config. */
  createFilter(): CompressionFilter {
    const { options, sharedFilterConfig } = this;
    return new CompressionFilter(options, sharedFilterConfig);
  }
}
|
316 |
|
\ | No newline at end of file |