/* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ import * as zlib from 'zlib'; import { Call, WriteObject, WriteFlags } from './call-stream'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; import { LogVerbosity } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; const isCompressionAlgorithmKey = (key: number): key is CompressionAlgorithms => { return typeof key === 'number' && typeof CompressionAlgorithms[key] === 'string'; } type CompressionAlgorithm = keyof typeof CompressionAlgorithms; type SharedCompressionFilterConfig = { serverSupportedEncodingHeader?: string; }; abstract class CompressionHandler { protected abstract compressMessage(message: Buffer): Promise; protected abstract decompressMessage(data: Buffer): Promise; /** * @param message Raw uncompressed message bytes * @param compress Indicates whether the message should be compressed * @return Framed message, compressed if applicable */ async writeMessage(message: Buffer, compress: boolean): Promise { let messageBuffer = message; if (compress) { messageBuffer = await this.compressMessage(messageBuffer); } const output = Buffer.allocUnsafe(messageBuffer.length + 5); output.writeUInt8(compress ? 1 : 0, 0); output.writeUInt32BE(messageBuffer.length, 1); messageBuffer.copy(output, 5); return output; } /** * @param data Framed message, possibly compressed * @return Uncompressed message */ async readMessage(data: Buffer): Promise { const compressed = data.readUInt8(0) === 1; let messageBuffer = data.slice(5); if (compressed) { messageBuffer = await this.decompressMessage(messageBuffer); } return messageBuffer; } } class IdentityHandler extends CompressionHandler { async compressMessage(message: Buffer) { return message; } async writeMessage(message: Buffer, compress: boolean): Promise { const output = Buffer.allocUnsafe(message.length + 5); /* With "identity" compression, messages should always be marked as * uncompressed */ output.writeUInt8(0, 0); output.writeUInt32BE(message.length, 1); message.copy(output, 5); return output; } decompressMessage(message: Buffer): Promise { return Promise.reject( new Error( 'Received compressed message but "grpc-encoding" header was identity' ) ); } } class DeflateHandler extends CompressionHandler { compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.deflate(message, (err, output) => { if (err) { reject(err); } else { resolve(output); } }); }); } decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.inflate(message, (err, output) => { if (err) { reject(err); } else { resolve(output); } }); }); } } class GzipHandler extends CompressionHandler { compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.gzip(message, (err, output) => { if (err) { reject(err); } else { resolve(output); } }); }); } decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.unzip(message, (err, output) => { if (err) { reject(err); } else { resolve(output); } }); }); } } class UnknownHandler extends CompressionHandler { constructor(private readonly compressionName: string) { super(); } compressMessage(message: Buffer): Promise { return Promise.reject( new Error( `Received message compressed with unsupported compression method ${this.compressionName}` ) ); } decompressMessage(message: Buffer): Promise { // This should be unreachable return Promise.reject( new Error(`Compression method not supported: ${this.compressionName}`) ); } } function getCompressionHandler(compressionName: string): CompressionHandler { switch (compressionName) { case 'identity': return new IdentityHandler(); case 'deflate': return new DeflateHandler(); case 'gzip': return new GzipHandler(); default: return new UnknownHandler(compressionName); } } export class CompressionFilter extends BaseFilter implements Filter { private sendCompression: CompressionHandler = new IdentityHandler(); private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) { super(); const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm; const serverSupportedEncodings = sharedFilterConfig.serverSupportedEncodingHeader?.split(','); /** * There are two possible situations here: * 1) We don't have any info yet from the server about what compression it supports * In that case we should just use what the client tells us to use * 2) We've previously received a response from the server including a grpc-accept-encoding header * In that case we only want to use the encoding chosen by the client if the server supports it */ if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) { this.currentCompressionAlgorithm = clientSelectedEncoding; this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm); } } else { logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`); } } } async sendMetadata(metadata: Promise): Promise { const headers: Metadata = await metadata; headers.set('grpc-accept-encoding', 'identity,deflate,gzip'); headers.set('accept-encoding', 'identity'); // No need to send the header if it's "identity" - behavior is identical; save the bandwidth if (this.currentCompressionAlgorithm === 'identity') { headers.remove('grpc-encoding'); } else { headers.set('grpc-encoding', this.currentCompressionAlgorithm); } return headers; } receiveMetadata(metadata: Metadata): Metadata { const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding'); if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { this.receiveCompression = getCompressionHandler(encoding); } } metadata.remove('grpc-encoding'); /* Check to see if the compression we're using to send messages is supported by the server * If not, reset the sendCompression filter and have it use the default IdentityHandler */ const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0] as string | undefined; if (serverSupportedEncodingsHeader) { this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader; const serverSupportedEncodings = serverSupportedEncodingsHeader.split(','); if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) { this.sendCompression = new IdentityHandler(); this.currentCompressionAlgorithm = 'identity'; } } metadata.remove('grpc-accept-encoding'); return metadata; } async sendMessage(message: Promise): Promise { /* This filter is special. The input message is the bare message bytes, * and the output is a framed and possibly compressed message. For this * reason, this filter should be at the bottom of the filter stack */ const resolvedMessage: WriteObject = await message; let compress: boolean; if (this.sendCompression instanceof IdentityHandler) { compress = false; } else { compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0; } return { message: await this.sendCompression.writeMessage( resolvedMessage.message, compress ), flags: resolvedMessage.flags, }; } async receiveMessage(message: Promise) { /* This filter is also special. The input message is framed and possibly * compressed, and the output message is deframed and uncompressed. So * this is another reason that this filter should be at the bottom of the * filter stack. */ return this.receiveCompression.readMessage(await message); } } export class CompressionFilterFactory implements FilterFactory { private sharedFilterConfig: SharedCompressionFilterConfig = {}; constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {} createFilter(callStream: Call): CompressionFilter { return new CompressionFilter(this.options, this.sharedFilterConfig); } }