1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as zlib from 'zlib';
|
19 |
|
20 | import { Call, WriteObject, WriteFlags } from './call-stream';
|
21 | import { Channel } from './channel';
|
22 | import { ChannelOptions } from './channel-options';
|
23 | import { CompressionAlgorithms } from './compression-algorithms';
|
24 | import { LogVerbosity } from './constants';
|
25 | import { BaseFilter, Filter, FilterFactory } from './filter';
|
26 | import * as logging from './logging';
|
27 | import { Metadata, MetadataValue } from './metadata';
|
28 |
|
29 | const isCompressionAlgorithmKey = (key: number): key is CompressionAlgorithms => {
|
30 | return typeof key === 'number' && typeof CompressionAlgorithms[key] === 'string';
|
31 | }
|
32 |
|
33 | type CompressionAlgorithm = keyof typeof CompressionAlgorithms;
|
34 |
|
35 | type SharedCompressionFilterConfig = {
|
36 | serverSupportedEncodingHeader?: string;
|
37 | };
|
38 |
|
39 | abstract class CompressionHandler {
|
40 | protected abstract compressMessage(message: Buffer): Promise<Buffer>;
|
41 | protected abstract decompressMessage(data: Buffer): Promise<Buffer>;
|
42 | |
43 |
|
44 |
|
45 |
|
46 |
|
47 | async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
|
48 | let messageBuffer = message;
|
49 | if (compress) {
|
50 | messageBuffer = await this.compressMessage(messageBuffer);
|
51 | }
|
52 | const output = Buffer.allocUnsafe(messageBuffer.length + 5);
|
53 | output.writeUInt8(compress ? 1 : 0, 0);
|
54 | output.writeUInt32BE(messageBuffer.length, 1);
|
55 | messageBuffer.copy(output, 5);
|
56 | return output;
|
57 | }
|
58 | |
59 |
|
60 |
|
61 |
|
62 | async readMessage(data: Buffer): Promise<Buffer> {
|
63 | const compressed = data.readUInt8(0) === 1;
|
64 | let messageBuffer = data.slice(5);
|
65 | if (compressed) {
|
66 | messageBuffer = await this.decompressMessage(messageBuffer);
|
67 | }
|
68 | return messageBuffer;
|
69 | }
|
70 | }
|
71 |
|
72 | class IdentityHandler extends CompressionHandler {
|
73 | async compressMessage(message: Buffer) {
|
74 | return message;
|
75 | }
|
76 |
|
77 | async writeMessage(message: Buffer, compress: boolean): Promise<Buffer> {
|
78 | const output = Buffer.allocUnsafe(message.length + 5);
|
79 | |
80 |
|
81 | output.writeUInt8(0, 0);
|
82 | output.writeUInt32BE(message.length, 1);
|
83 | message.copy(output, 5);
|
84 | return output;
|
85 | }
|
86 |
|
87 | decompressMessage(message: Buffer): Promise<Buffer> {
|
88 | return Promise.reject<Buffer>(
|
89 | new Error(
|
90 | 'Received compressed message but "grpc-encoding" header was identity'
|
91 | )
|
92 | );
|
93 | }
|
94 | }
|
95 |
|
96 | class DeflateHandler extends CompressionHandler {
|
97 | compressMessage(message: Buffer) {
|
98 | return new Promise<Buffer>((resolve, reject) => {
|
99 | zlib.deflate(message, (err, output) => {
|
100 | if (err) {
|
101 | reject(err);
|
102 | } else {
|
103 | resolve(output);
|
104 | }
|
105 | });
|
106 | });
|
107 | }
|
108 |
|
109 | decompressMessage(message: Buffer) {
|
110 | return new Promise<Buffer>((resolve, reject) => {
|
111 | zlib.inflate(message, (err, output) => {
|
112 | if (err) {
|
113 | reject(err);
|
114 | } else {
|
115 | resolve(output);
|
116 | }
|
117 | });
|
118 | });
|
119 | }
|
120 | }
|
121 |
|
122 | class GzipHandler extends CompressionHandler {
|
123 | compressMessage(message: Buffer) {
|
124 | return new Promise<Buffer>((resolve, reject) => {
|
125 | zlib.gzip(message, (err, output) => {
|
126 | if (err) {
|
127 | reject(err);
|
128 | } else {
|
129 | resolve(output);
|
130 | }
|
131 | });
|
132 | });
|
133 | }
|
134 |
|
135 | decompressMessage(message: Buffer) {
|
136 | return new Promise<Buffer>((resolve, reject) => {
|
137 | zlib.unzip(message, (err, output) => {
|
138 | if (err) {
|
139 | reject(err);
|
140 | } else {
|
141 | resolve(output);
|
142 | }
|
143 | });
|
144 | });
|
145 | }
|
146 | }
|
147 |
|
148 | class UnknownHandler extends CompressionHandler {
|
149 | constructor(private readonly compressionName: string) {
|
150 | super();
|
151 | }
|
152 | compressMessage(message: Buffer): Promise<Buffer> {
|
153 | return Promise.reject<Buffer>(
|
154 | new Error(
|
155 | `Received message compressed with unsupported compression method ${this.compressionName}`
|
156 | )
|
157 | );
|
158 | }
|
159 |
|
160 | decompressMessage(message: Buffer): Promise<Buffer> {
|
161 | // This should be unreachable
|
162 | return Promise.reject<Buffer>(
|
163 | new Error(`Compression method not supported: ${this.compressionName}`)
|
164 | );
|
165 | }
|
166 | }
|
167 |
|
168 | function getCompressionHandler(compressionName: string): CompressionHandler {
|
169 | switch (compressionName) {
|
170 | case 'identity':
|
171 | return new IdentityHandler();
|
172 | case 'deflate':
|
173 | return new DeflateHandler();
|
174 | case 'gzip':
|
175 | return new GzipHandler();
|
176 | default:
|
177 | return new UnknownHandler(compressionName);
|
178 | }
|
179 | }
|
180 |
|
181 | export class CompressionFilter extends BaseFilter implements Filter {
|
182 | private sendCompression: CompressionHandler = new IdentityHandler();
|
183 | private receiveCompression: CompressionHandler = new IdentityHandler();
|
184 | private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
|
185 |
|
186 | constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) {
|
187 | super();
|
188 |
|
189 | const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
|
190 | if (compressionAlgorithmKey !== undefined) {
|
191 | if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
|
192 | const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm;
|
193 | const serverSupportedEncodings = sharedFilterConfig.serverSupportedEncodingHeader?.split(',');
|
194 | /**
|
195 | * There are two possible situations here:
|
196 | * 1) We don't have any info yet from the server about what compression it supports
|
197 | * In that case we should just use what the client tells us to use
|
198 | * 2) We've previously received a response from the server including a grpc-accept-encoding header
|
199 | * In that case we only want to use the encoding chosen by the client if the server supports it
|
200 | */
|
201 | if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
|
202 | this.currentCompressionAlgorithm = clientSelectedEncoding;
|
203 | this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
|
204 | }
|
205 | } else {
|
206 | logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
|
207 | }
|
208 | }
|
209 | }
|
210 |
|
211 | async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
|
212 | const headers: Metadata = await metadata;
|
213 | headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
|
214 | headers.set('accept-encoding', 'identity');
|
215 |
|
216 | // No need to send the header if it's "identity" - behavior is identical; save the bandwidth
|
217 | if (this.currentCompressionAlgorithm === 'identity') {
|
218 | headers.remove('grpc-encoding');
|
219 | } else {
|
220 | headers.set('grpc-encoding', this.currentCompressionAlgorithm);
|
221 | }
|
222 |
|
223 | return headers;
|
224 | }
|
225 |
|
226 | receiveMetadata(metadata: Metadata): Metadata {
|
227 | const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding');
|
228 | if (receiveEncoding.length > 0) {
|
229 | const encoding: MetadataValue = receiveEncoding[0];
|
230 | if (typeof encoding === 'string') {
|
231 | this.receiveCompression = getCompressionHandler(encoding);
|
232 | }
|
233 | }
|
234 | metadata.remove('grpc-encoding');
|
235 |
|
236 | /* Check to see if the compression we're using to send messages is supported by the server
|
237 | * If not, reset the sendCompression filter and have it use the default IdentityHandler */
|
238 | const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0] as string | undefined;
|
239 | if (serverSupportedEncodingsHeader) {
|
240 | this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader;
|
241 | const serverSupportedEncodings = serverSupportedEncodingsHeader.split(',');
|
242 |
|
243 | if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
|
244 | this.sendCompression = new IdentityHandler();
|
245 | this.currentCompressionAlgorithm = 'identity';
|
246 | }
|
247 | }
|
248 | metadata.remove('grpc-accept-encoding');
|
249 | return metadata;
|
250 | }
|
251 |
|
252 | async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
|
253 | /* This filter is special. The input message is the bare message bytes,
|
254 | * and the output is a framed and possibly compressed message. For this
|
255 | * reason, this filter should be at the bottom of the filter stack */
|
256 | const resolvedMessage: WriteObject = await message;
|
257 | let compress: boolean;
|
258 | if (this.sendCompression instanceof IdentityHandler) {
|
259 | compress = false;
|
260 | } else {
|
261 | compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
|
262 | }
|
263 |
|
264 | return {
|
265 | message: await this.sendCompression.writeMessage(
|
266 | resolvedMessage.message,
|
267 | compress
|
268 | ),
|
269 | flags: resolvedMessage.flags,
|
270 | };
|
271 | }
|
272 |
|
273 | async receiveMessage(message: Promise<Buffer>) {
|
274 | /* This filter is also special. The input message is framed and possibly
|
275 | * compressed, and the output message is deframed and uncompressed. So
|
276 | * this is another reason that this filter should be at the bottom of the
|
277 | * filter stack. */
|
278 | return this.receiveCompression.readMessage(await message);
|
279 | }
|
280 | }
|
281 |
|
282 | export class CompressionFilterFactory
|
283 | implements FilterFactory<CompressionFilter> {
|
284 | private sharedFilterConfig: SharedCompressionFilterConfig = {};
|
285 | constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {}
|
286 | createFilter(callStream: Call): CompressionFilter {
|
287 | return new CompressionFilter(this.options, this.sharedFilterConfig);
|
288 | }
|
289 | }
|
290 |
|
\ | No newline at end of file |