UNPKG

26.6 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { EventEmitter } from 'events';
19import * as http2 from 'http2';
20import { Duplex, Readable, Writable } from 'stream';
21import * as zlib from 'zlib';
22import { promisify } from 'util';
23
24import { Deadline, StatusObject, PartialStatusObject } from './call-stream';
25import {
26 Status,
27 DEFAULT_MAX_SEND_MESSAGE_LENGTH,
28 DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
29 LogVerbosity,
30} from './constants';
31import { Deserialize, Serialize } from './make-client';
32import { Metadata } from './metadata';
33import { StreamDecoder } from './stream-decoder';
34import { ObjectReadable, ObjectWritable } from './object-stream';
35import { ChannelOptions } from './channel-options';
36import * as logging from './logging';
37
/** Tracer name passed to the logging module for every trace line of this file. */
const TRACER_NAME = 'server_call';

/** Promise-returning form of `zlib.inflate`. */
const inflate = promisify(zlib.inflate);

/** Promise-returning form of `zlib.unzip`. */
const unzip = promisify(zlib.unzip);
41
/**
 * Writes one DEBUG-verbosity trace line tagged with this module's tracer name.
 *
 * @param text Message to hand to the logging module.
 */
function trace(text: string): void {
  const verbosity = LogVerbosity.DEBUG;
  logging.trace(verbosity, TRACER_NAME, text);
}
45
/** Shape of a lookup table that maps arbitrary string keys to numbers. */
type DeadlineUnitIndexSignature = Record<string, number>;
49
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';

/**
 * Matches a complete `grpc-timeout` value: one to eight digits followed by a
 * single unit letter. The pattern is anchored so that over-long values (for
 * example nine digits) or values surrounded by other characters are rejected
 * instead of being partially matched.
 */
const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;

/** Multiplier that converts one unit of each timeout suffix to milliseconds. */
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
  H: 3600000,
  M: 60000,
  S: 1000,
  m: 1,
  u: 0.001,
  n: 0.000001,
};

/** Headers merged into every initial response before any custom metadata. */
const defaultResponseHeaders = {
  // TODO(cjihrig): Remove these encoding headers from the default response
  // once compression is integrated.
  [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
  [GRPC_ENCODING_HEADER]: 'identity',
  [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
  [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};

/** Options passed to `respond()`; trailers carry the gRPC status. */
const defaultResponseOptions: http2.ServerStreamResponseOptions = {
  waitForTrailers: true,
};
75
/** Status information supplied by a handler; every `StatusObject` field is optional. */
export type ServerStatusResponse = Partial<StatusObject>;

/** An `Error` that may additionally carry the optional status fields. */
export type ServerErrorResponse = ServerStatusResponse & Error;

/** Members common to every call object that is handed to a user handler. */
export type ServerSurfaceCall = {
  cancelled: boolean;
  readonly metadata: Metadata;
  getPeer(): string;
  sendMetadata(responseMetadata: Metadata): void;
  getDeadline(): Deadline;
  getPath(): string;
} & EventEmitter;

/** Call object for unary methods: the surface call plus the single request. */
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
  request: RequestType;
};

/** Call object for client-streaming methods: requests are read as objects. */
export type ServerReadableStream<RequestType, ResponseType> =
  ServerSurfaceCall & ObjectReadable<RequestType>;

/**
 * Call object for server-streaming methods: responses are written as objects
 * and `end` may be given trailing metadata.
 */
export type ServerWritableStream<RequestType, ResponseType> =
  ServerSurfaceCall &
    ObjectWritable<ResponseType> & {
      request: RequestType;
      end: (metadata?: Metadata) => void;
    };

/**
 * Call object for bidirectional-streaming methods: readable for requests,
 * writable for responses, and `end` may be given trailing metadata.
 */
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
  ObjectReadable<RequestType> &
  ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
103
/**
 * Call object given to unary handlers. It is an event emitter that holds the
 * request and its metadata and forwards every query to the wrapped
 * `Http2ServerCallStream`.
 */
export class ServerUnaryCallImpl<RequestType, ResponseType>
  extends EventEmitter
  implements ServerUnaryCall<RequestType, ResponseType>
{
  cancelled = false;

  /**
   * @param call Underlying call stream that performs the actual work.
   * @param metadata Metadata received with the request.
   * @param request The request message.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super();
    // Let the call stream forward its 'cancelled' / 'callEnd' events to us.
    call.setupSurfaceCall(this);
  }

  /** Returns the peer description reported by the underlying call stream. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the underlying call stream. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call stream. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** Returns the method path reported by the underlying call stream. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }
}
136
/**
 * Call object given to client-streaming handlers. It is an object-mode
 * `Readable` whose data is supplied by the wrapped `Http2ServerCallStream`.
 */
export class ServerReadableStreamImpl<RequestType, ResponseType>
  extends Readable
  implements ServerReadableStream<RequestType, ResponseType>
{
  cancelled = false;

  /**
   * @param call Underlying call stream.
   * @param metadata Metadata received with the request.
   * @param deserialize Request deserializer, stored on the instance.
   * @param encoding Encoding name handed to the call stream's `setupReadable`.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public deserialize: Deserialize<RequestType>,
    encoding: string
  ) {
    super({ objectMode: true });
    call.setupSurfaceCall(this);
    call.setupReadable(this, encoding);
  }

  /**
   * Readable hook: first hands over any messages the call stream has queued
   * for this readable, and resumes the call stream only when the call stream
   * reports that more can still be pushed.
   */
  _read(size: number) {
    const mayPushMore = this.call.consumeUnpushedMessages(this);
    if (mayPushMore) {
      this.call.resume();
    }
  }

  /** Returns the peer description reported by the underlying call stream. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the underlying call stream. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call stream. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** Returns the method path reported by the underlying call stream. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }
}
179
/**
 * Call object given to server-streaming handlers. It is an object-mode
 * `Writable`; each written response is serialized and written through the
 * wrapped `Http2ServerCallStream`, and finishing the stream sends an OK status
 * together with the trailing metadata.
 */
export class ServerWritableStreamImpl<RequestType, ResponseType>
  extends Writable
  implements ServerWritableStream<RequestType, ResponseType>
{
  cancelled: boolean;
  private trailingMetadata: Metadata;

  /**
   * @param call Underlying call stream.
   * @param metadata Metadata received with the request.
   * @param serialize Response serializer, stored on the instance.
   * @param request The request message.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public serialize: Serialize<ResponseType>,
    public request: RequestType
  ) {
    super({ objectMode: true });
    this.cancelled = false;
    this.trailingMetadata = new Metadata();
    this.call.setupSurfaceCall(this);

    // Any error emitted on this stream is reported to the client and ends the
    // writable side.
    this.on('error', (err) => {
      this.call.sendError(err);
      this.end();
    });
  }

  /** Returns the peer description reported by the underlying call stream. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Forwards the given response metadata to the underlying call stream. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call stream. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the method path reported by the underlying call stream. */
  getPath(): string {
    return this.call.getPath();
  }

  /**
   * Writable hook: serializes `chunk` and writes it to the call stream. When
   * the call stream does not report a successful write, `callback` is deferred
   * until the call stream emits 'drain'. A failure while serializing or
   * writing is emitted as an 'error' event with code `Status.INTERNAL`, after
   * which `callback` is still invoked.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    try {
      const response = this.call.serializeMessage(chunk);

      if (!this.call.write(response)) {
        this.call.once('drain', callback);
        return;
      }
    } catch (err) {
      // Class bodies run in strict mode, where assigning a property to a
      // thrown primitive (string, number, null, ...) would itself throw a
      // TypeError out of _write. Wrap such values in an Error first.
      const failure: object =
        typeof err === 'object' && err !== null ? err : new Error(String(err));
      (failure as ServerErrorResponse).code = Status.INTERNAL;
      this.emit('error', failure);
    }

    callback();
  }

  /** Writable hook: sends the OK status with the stored trailing metadata. */
  _final(callback: (error?: Error | null) => void): void {
    this.call.sendStatus({
      code: Status.OK,
      details: 'OK',
      metadata: this.trailingMetadata,
    });
    callback(null);
  }

  /**
   * Ends the stream. A truthy `metadata` argument replaces the trailing
   * metadata that is sent with the final status.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}
259
/**
 * Call object given to bidirectional-streaming handlers. It is an object-mode
 * `Duplex` backed by the wrapped `Http2ServerCallStream`; its `_read`,
 * `_write` and `_final` hooks are borrowed from the readable and writable
 * implementations (see the prototype assignments after the class).
 */
export class ServerDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ServerDuplexStream<RequestType, ResponseType>
{
  cancelled = false;
  private trailingMetadata: Metadata = new Metadata();

  /**
   * @param call Underlying call stream.
   * @param metadata Metadata received with the request.
   * @param serialize Response serializer, stored on the instance.
   * @param deserialize Request deserializer, stored on the instance.
   * @param encoding Encoding name handed to the call stream's `setupReadable`.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public serialize: Serialize<ResponseType>,
    public deserialize: Deserialize<RequestType>,
    encoding: string
  ) {
    super({ objectMode: true });
    call.setupSurfaceCall(this);
    call.setupReadable(this, encoding);

    // Any error emitted on this stream is reported to the client and ends the
    // writable side.
    const reportAndEnd = (err: Error): void => {
      this.call.sendError(err);
      this.end();
    };
    this.on('error', reportAndEnd);
  }

  /** Returns the peer description reported by the underlying call stream. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the underlying call stream. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call stream. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** Returns the method path reported by the underlying call stream. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }

  /**
   * Ends the writable side. A truthy `metadata` argument replaces the trailing
   * metadata that is sent with the final status.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}

// The duplex stream reuses the stream hooks of the one-directional classes.
ServerDuplexStreamImpl.prototype._read =
  ServerReadableStreamImpl.prototype._read;
ServerDuplexStreamImpl.prototype._write =
  ServerWritableStreamImpl.prototype._write;
ServerDuplexStreamImpl.prototype._final =
  ServerWritableStreamImpl.prototype._final;
318
/**
 * Callback through which unary and client-streaming handlers deliver their
 * result: either an error/status, or a response value with optional trailing
 * metadata and flags.
 */
export type sendUnaryData<ResponseType> = (
  error: ServerErrorResponse | ServerStatusResponse | null,
  value?: ResponseType | null,
  trailer?: Metadata,
  flags?: number
) => void;

/** User-provided handler for unary calls. */
export type handleUnaryCall<RequestType, ResponseType> = (
  call: ServerUnaryCall<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User-provided handler for client-streaming calls. */
export type handleClientStreamingCall<RequestType, ResponseType> = (
  call: ServerReadableStream<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User-provided handler for server-streaming calls. */
export type handleServerStreamingCall<RequestType, ResponseType> = (
  call: ServerWritableStream<RequestType, ResponseType>
) => void;

/** User-provided handler for bidirectional-streaming calls. */
export type handleBidiStreamingCall<RequestType, ResponseType> = (
  call: ServerDuplexStream<RequestType, ResponseType>
) => void;

/** Any of the four user-provided handler function shapes. */
export type HandleCall<RequestType, ResponseType> =
  | handleUnaryCall<RequestType, ResponseType>
  | handleClientStreamingCall<RequestType, ResponseType>
  | handleServerStreamingCall<RequestType, ResponseType>
  | handleBidiStreamingCall<RequestType, ResponseType>;

/** Registration record for a unary method. */
export interface UnaryHandler<RequestType, ResponseType> {
  func: handleUnaryCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: HandlerType;
  path: string;
}

/** Registration record for a client-streaming method. */
export interface ClientStreamingHandler<RequestType, ResponseType> {
  func: handleClientStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: HandlerType;
  path: string;
}

/** Registration record for a server-streaming method. */
export interface ServerStreamingHandler<RequestType, ResponseType> {
  func: handleServerStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: HandlerType;
  path: string;
}

/** Registration record for a bidirectional-streaming method. */
export interface BidiStreamingHandler<RequestType, ResponseType> {
  func: handleBidiStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: HandlerType;
  path: string;
}

/** Any of the four registration record shapes. */
export type Handler<RequestType, ResponseType> =
  | UnaryHandler<RequestType, ResponseType>
  | ClientStreamingHandler<RequestType, ResponseType>
  | ServerStreamingHandler<RequestType, ResponseType>
  | BidiStreamingHandler<RequestType, ResponseType>;

/** Discriminating names of the four method kinds. */
export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
394
395// Internal class that wraps the HTTP2 request.
396export class Http2ServerCallStream<
397 RequestType,
398 ResponseType
399> extends EventEmitter {
400 cancelled = false;
401 deadlineTimer: NodeJS.Timer | null = null;
402 private statusSent = false;
403 private deadline: Deadline = Infinity;
404 private wantTrailers = false;
405 private metadataSent = false;
406 private canPush = false;
407 private isPushPending = false;
408 private bufferedMessages: Array<Buffer | null> = [];
409 private messagesToPush: Array<RequestType | null> = [];
410 private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
411 private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
412
413 constructor(
414 private stream: http2.ServerHttp2Stream,
415 private handler: Handler<RequestType, ResponseType>,
416 private options: ChannelOptions
417 ) {
418 super();
419
420 this.stream.once('error', (err: ServerErrorResponse) => {
421 /* We need an error handler to avoid uncaught error event exceptions, but
422 * there is nothing we can reasonably do here. Any error event should
423 * have a corresponding close event, which handles emitting the cancelled
424 * event. And the stream is now in a bad state, so we can't reasonably
425 * expect to be able to send an error over it. */
426 });
427
428 this.stream.once('close', () => {
429 trace(
430 'Request to method ' +
431 this.handler?.path +
432 ' stream closed with rstCode ' +
433 this.stream.rstCode
434 );
435
436 if (!this.statusSent) {
437 this.cancelled = true;
438 this.emit('cancelled', 'cancelled');
439 this.emit('streamEnd', false);
440 this.sendStatus({
441 code: Status.CANCELLED,
442 details: 'Cancelled by client',
443 metadata: null,
444 });
445 }
446 });
447
448 this.stream.on('drain', () => {
449 this.emit('drain');
450 });
451
452 if ('grpc.max_send_message_length' in options) {
453 this.maxSendMessageSize = options['grpc.max_send_message_length']!;
454 }
455 if ('grpc.max_receive_message_length' in options) {
456 this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
457 }
458 }
459
460 private checkCancelled(): boolean {
461 /* In some cases the stream can become destroyed before the close event
462 * fires. That creates a race condition that this check works around */
463 if (this.stream.destroyed || this.stream.closed) {
464 this.cancelled = true;
465 }
466 return this.cancelled;
467 }
468
469 private getDecompressedMessage(
470 message: Buffer,
471 encoding: string
472 ): Buffer | Promise<Buffer> {
473 if (encoding === 'deflate') {
474 return inflate(message.subarray(5));
475 } else if (encoding === 'gzip') {
476 return unzip(message.subarray(5));
477 } else if (encoding === 'identity') {
478 return message.subarray(5);
479 }
480
481 return Promise.reject({
482 code: Status.UNIMPLEMENTED,
483 details: `Received message compressed with unsupported encoding "${encoding}"`,
484 });
485 }
486
487 sendMetadata(customMetadata?: Metadata) {
488 if (this.checkCancelled()) {
489 return;
490 }
491
492 if (this.metadataSent) {
493 return;
494 }
495
496 this.metadataSent = true;
497 const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
498 // TODO(cjihrig): Include compression headers.
499 const headers = { ...defaultResponseHeaders, ...custom };
500 this.stream.respond(headers, defaultResponseOptions);
501 }
502
503 receiveMetadata(headers: http2.IncomingHttpHeaders) {
504 const metadata = Metadata.fromHttp2Headers(headers);
505
506 if (logging.isTracerEnabled(TRACER_NAME)) {
507 trace(
508 'Request to ' +
509 this.handler.path +
510 ' received headers ' +
511 JSON.stringify(metadata.toJSON())
512 );
513 }
514
515 // TODO(cjihrig): Receive compression metadata.
516
517 const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
518
519 if (timeoutHeader.length > 0) {
520 const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
521
522 if (match === null) {
523 const err = new Error('Invalid deadline') as ServerErrorResponse;
524 err.code = Status.OUT_OF_RANGE;
525 this.sendError(err);
526 return metadata;
527 }
528
529 const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
530
531 const now = new Date();
532 this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
533 this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
534 metadata.remove(GRPC_TIMEOUT_HEADER);
535 }
536
537 // Remove several headers that should not be propagated to the application
538 metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
539 metadata.remove(http2.constants.HTTP2_HEADER_TE);
540 metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
541 metadata.remove('grpc-accept-encoding');
542
543 return metadata;
544 }
545
546 receiveUnaryMessage(
547 encoding: string,
548 next: (
549 err: Partial<ServerStatusResponse> | null,
550 request?: RequestType
551 ) => void
552 ): void {
553 const { stream } = this;
554
555 let receivedLength = 0;
556 const call = this;
557 const body: Buffer[] = [];
558 const limit = this.maxReceiveMessageSize;
559
560 stream.on('data', onData);
561 stream.on('end', onEnd);
562 stream.on('error', onEnd);
563
564 function onData(chunk: Buffer) {
565 receivedLength += chunk.byteLength;
566
567 if (limit !== -1 && receivedLength > limit) {
568 stream.removeListener('data', onData);
569 stream.removeListener('end', onEnd);
570 stream.removeListener('error', onEnd);
571 next({
572 code: Status.RESOURCE_EXHAUSTED,
573 details: `Received message larger than max (${receivedLength} vs. ${limit})`,
574 });
575 return;
576 }
577
578 body.push(chunk);
579 }
580
581 function onEnd(err?: Error) {
582 stream.removeListener('data', onData);
583 stream.removeListener('end', onEnd);
584 stream.removeListener('error', onEnd);
585
586 if (err !== undefined) {
587 next({ code: Status.INTERNAL, details: err.message });
588 return;
589 }
590
591 if (receivedLength === 0) {
592 next({ code: Status.INTERNAL, details: 'received empty unary message' })
593 return;
594 }
595
596 call.emit('receiveMessage');
597
598 const requestBytes = Buffer.concat(body, receivedLength);
599 const compressed = requestBytes.readUInt8(0) === 1;
600 const compressedMessageEncoding = compressed ? encoding : 'identity';
601 const decompressedMessage = call.getDecompressedMessage(
602 requestBytes,
603 compressedMessageEncoding
604 );
605
606 if (Buffer.isBuffer(decompressedMessage)) {
607 call.safeDeserializeMessage(decompressedMessage, next);
608 return;
609 }
610
611 decompressedMessage.then(
612 (decompressed) => call.safeDeserializeMessage(decompressed, next),
613 (err: any) => next(
614 err.code
615 ? err
616 : {
617 code: Status.INTERNAL,
618 details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
619 }
620 )
621 )
622 }
623 }
624
625 private safeDeserializeMessage(
626 buffer: Buffer,
627 next: (err: Partial<ServerStatusResponse> | null, request?: RequestType) => void
628 ) {
629 try {
630 next(null, this.deserializeMessage(buffer));
631 } catch (err) {
632 err.code = Status.INTERNAL;
633 next(err);
634 }
635 }
636
637 serializeMessage(value: ResponseType) {
638 const messageBuffer = this.handler.serialize(value);
639
640 // TODO(cjihrig): Call compression aware serializeMessage().
641 const byteLength = messageBuffer.byteLength;
642 const output = Buffer.allocUnsafe(byteLength + 5);
643 output.writeUInt8(0, 0);
644 output.writeUInt32BE(byteLength, 1);
645 messageBuffer.copy(output, 5);
646 return output;
647 }
648
649 deserializeMessage(bytes: Buffer) {
650 return this.handler.deserialize(bytes);
651 }
652
653 async sendUnaryMessage(
654 err: ServerErrorResponse | ServerStatusResponse | null,
655 value?: ResponseType | null,
656 metadata?: Metadata | null,
657 flags?: number
658 ) {
659 if (this.checkCancelled()) {
660 return;
661 }
662
663 if (metadata === undefined) {
664 metadata = null;
665 }
666
667 if (err) {
668 if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
669 err.metadata = metadata;
670 }
671 this.sendError(err);
672 return;
673 }
674
675 try {
676 const response = this.serializeMessage(value!);
677
678 this.write(response);
679 this.sendStatus({ code: Status.OK, details: 'OK', metadata });
680 } catch (err) {
681 err.code = Status.INTERNAL;
682 this.sendError(err);
683 }
684 }
685
686 sendStatus(statusObj: PartialStatusObject) {
687 this.emit('callEnd', statusObj.code);
688 this.emit('streamEnd', statusObj.code === Status.OK);
689 if (this.checkCancelled()) {
690 return;
691 }
692
693 trace(
694 'Request to method ' +
695 this.handler?.path +
696 ' ended with status code: ' +
697 Status[statusObj.code] +
698 ' details: ' +
699 statusObj.details
700 );
701
702 if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
703
704 if (!this.wantTrailers) {
705 this.wantTrailers = true;
706 this.stream.once('wantTrailers', () => {
707 const trailersToSend = {
708 [GRPC_STATUS_HEADER]: statusObj.code,
709 [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
710 ...statusObj.metadata?.toHttp2Headers(),
711 };
712
713 this.stream.sendTrailers(trailersToSend);
714 this.statusSent = true;
715 });
716 this.sendMetadata();
717 this.stream.end();
718 }
719 }
720
721 sendError(error: ServerErrorResponse | ServerStatusResponse) {
722 const status: PartialStatusObject = {
723 code: Status.UNKNOWN,
724 details: 'message' in error ? error.message : 'Unknown Error',
725 metadata:
726 'metadata' in error && error.metadata !== undefined
727 ? error.metadata
728 : null,
729 };
730
731 if (
732 'code' in error &&
733 typeof error.code === 'number' &&
734 Number.isInteger(error.code)
735 ) {
736 status.code = error.code;
737
738 if ('details' in error && typeof error.details === 'string') {
739 status.details = error.details!;
740 }
741 }
742
743 this.sendStatus(status);
744 }
745
746 write(chunk: Buffer) {
747 if (this.checkCancelled()) {
748 return;
749 }
750
751 if (
752 this.maxSendMessageSize !== -1 &&
753 chunk.length > this.maxSendMessageSize
754 ) {
755 this.sendError({
756 code: Status.RESOURCE_EXHAUSTED,
757 details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
758 });
759 return;
760 }
761
762 this.sendMetadata();
763 this.emit('sendMessage');
764 return this.stream.write(chunk);
765 }
766
767 resume() {
768 this.stream.resume();
769 }
770
771 setupSurfaceCall(call: ServerSurfaceCall) {
772 this.once('cancelled', (reason) => {
773 call.cancelled = true;
774 call.emit('cancelled', reason);
775 });
776
777 this.once('callEnd', (status) => call.emit('callEnd', status));
778 }
779
780 setupReadable(
781 readable:
782 | ServerReadableStream<RequestType, ResponseType>
783 | ServerDuplexStream<RequestType, ResponseType>,
784 encoding: string
785 ) {
786 const decoder = new StreamDecoder();
787
788 let readsDone = false;
789
790 let pendingMessageProcessing = false;
791
792 let pushedEnd = false;
793
794 const maybePushEnd = () => {
795 if (!pushedEnd && readsDone && !pendingMessageProcessing) {
796 pushedEnd = true;
797 this.pushOrBufferMessage(readable, null);
798 }
799 };
800
801 this.stream.on('data', async (data: Buffer) => {
802 const messages = decoder.write(data);
803
804 pendingMessageProcessing = true;
805 this.stream.pause();
806 for (const message of messages) {
807 if (
808 this.maxReceiveMessageSize !== -1 &&
809 message.length > this.maxReceiveMessageSize
810 ) {
811 this.sendError({
812 code: Status.RESOURCE_EXHAUSTED,
813 details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
814 });
815 return;
816 }
817 this.emit('receiveMessage');
818
819 const compressed = message.readUInt8(0) === 1;
820 const compressedMessageEncoding = compressed ? encoding : 'identity';
821 const decompressedMessage = await this.getDecompressedMessage(
822 message,
823 compressedMessageEncoding
824 );
825
826 // Encountered an error with decompression; it'll already have been propogated back
827 // Just return early
828 if (!decompressedMessage) return;
829
830 this.pushOrBufferMessage(readable, decompressedMessage);
831 }
832 pendingMessageProcessing = false;
833 this.stream.resume();
834 maybePushEnd();
835 });
836
837 this.stream.once('end', () => {
838 readsDone = true;
839 maybePushEnd();
840 });
841 }
842
843 consumeUnpushedMessages(
844 readable:
845 | ServerReadableStream<RequestType, ResponseType>
846 | ServerDuplexStream<RequestType, ResponseType>
847 ): boolean {
848 this.canPush = true;
849
850 while (this.messagesToPush.length > 0) {
851 const nextMessage = this.messagesToPush.shift();
852 const canPush = readable.push(nextMessage);
853
854 if (nextMessage === null || canPush === false) {
855 this.canPush = false;
856 break;
857 }
858 }
859
860 return this.canPush;
861 }
862
863 private pushOrBufferMessage(
864 readable:
865 | ServerReadableStream<RequestType, ResponseType>
866 | ServerDuplexStream<RequestType, ResponseType>,
867 messageBytes: Buffer | null
868 ): void {
869 if (this.isPushPending) {
870 this.bufferedMessages.push(messageBytes);
871 } else {
872 this.pushMessage(readable, messageBytes);
873 }
874 }
875
876 private async pushMessage(
877 readable:
878 | ServerReadableStream<RequestType, ResponseType>
879 | ServerDuplexStream<RequestType, ResponseType>,
880 messageBytes: Buffer | null
881 ) {
882 if (messageBytes === null) {
883 trace('Received end of stream');
884 if (this.canPush) {
885 readable.push(null);
886 } else {
887 this.messagesToPush.push(null);
888 }
889
890 return;
891 }
892
893 trace('Received message of length ' + messageBytes.length);
894
895 this.isPushPending = true;
896
897 try {
898 const deserialized = await this.deserializeMessage(messageBytes);
899
900 if (this.canPush) {
901 if (!readable.push(deserialized)) {
902 this.canPush = false;
903 this.stream.pause();
904 }
905 } else {
906 this.messagesToPush.push(deserialized);
907 }
908 } catch (error) {
909 // Ignore any remaining messages when errors occur.
910 this.bufferedMessages.length = 0;
911
912 if (
913 !(
914 'code' in error &&
915 typeof error.code === 'number' &&
916 Number.isInteger(error.code) &&
917 error.code >= Status.OK &&
918 error.code <= Status.UNAUTHENTICATED
919 )
920 ) {
921 // The error code is not a valid gRPC code so its being overwritten.
922 error.code = Status.INTERNAL;
923 }
924
925 readable.emit('error', error);
926 }
927
928 this.isPushPending = false;
929
930 if (this.bufferedMessages.length > 0) {
931 this.pushMessage(
932 readable,
933 this.bufferedMessages.shift() as Buffer | null
934 );
935 }
936 }
937
938 getPeer(): string {
939 const socket = this.stream.session.socket;
940 if (socket.remoteAddress) {
941 if (socket.remotePort) {
942 return `${socket.remoteAddress}:${socket.remotePort}`;
943 } else {
944 return socket.remoteAddress;
945 }
946 } else {
947 return 'unknown';
948 }
949 }
950
951 getDeadline(): Deadline {
952 return this.deadline;
953 }
954
955 getPath(): string {
956 return this.handler.path;
957 }
958}
959
/* eslint-disable @typescript-eslint/no-explicit-any */
/** A call stream whose request and response types are not known. */
type UntypedServerCall = Http2ServerCallStream<any, any>;

/**
 * Deadline timer callback: sends a DEADLINE_EXCEEDED error on the call, then
 * marks the call cancelled and emits 'cancelled' with the reason 'deadline'.
 */
function handleExpiredDeadline(call: UntypedServerCall) {
  const deadlineError: ServerErrorResponse = Object.assign(
    new Error('Deadline exceeded'),
    { code: Status.DEADLINE_EXCEEDED }
  );

  call.sendError(deadlineError);
  call.cancelled = true;
  call.emit('cancelled', 'deadline');
}