UNPKG

27.8 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { EventEmitter } from 'events';
19import * as http2 from 'http2';
20import { Duplex, Readable, Writable } from 'stream';
21import * as zlib from 'zlib';
22import { promisify } from 'util';
23
24import {
25 Status,
26 DEFAULT_MAX_SEND_MESSAGE_LENGTH,
27 DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
28 LogVerbosity,
29} from './constants';
30import { Deserialize, Serialize } from './make-client';
31import { Metadata } from './metadata';
32import { StreamDecoder } from './stream-decoder';
33import { ObjectReadable, ObjectWritable } from './object-stream';
34import { ChannelOptions } from './channel-options';
35import * as logging from './logging';
36import { StatusObject, PartialStatusObject } from './call-interface';
37import { Deadline } from './deadline';
38import { getErrorCode, getErrorMessage } from './error';
39
/** Tracer name under which every log line from this module is emitted. */
const TRACER_NAME = 'server_call';

/* Promise-returning wrappers around the callback-style zlib decompressors. */
const unzip = promisify(zlib.unzip);
const inflate = promisify(zlib.inflate);

/**
 * Writes a DEBUG-verbosity trace line tagged with this module's tracer name.
 *
 * @param text The message to log.
 */
function trace(text: string): void {
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
47
/** Lookup table keyed by a single-character `grpc-timeout` unit. */
interface DeadlineUnitIndexSignature {
  [name: string]: number;
}

/* Names of the gRPC-specific HTTP/2 headers and trailers used in this file. */
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';

/**
 * Matches a complete `grpc-timeout` header value: one to eight decimal digits
 * (capture group 1) followed by a single unit character (capture group 2).
 *
 * The pattern is anchored at both ends so that a malformed value is rejected
 * as a whole. Without the anchors a nine-digit value such as `123456789S`
 * would silently match as `23456789S`, and trailing garbage would be ignored.
 */
const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;

/** Number of milliseconds represented by one of each timeout unit. */
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
  H: 3600000,
  M: 60000,
  S: 1000,
  m: 1,
  u: 0.001,
  n: 0.000001,
};

/** Compression-related headers merged into every response header block. */
const defaultCompressionHeaders = {
  // TODO(cjihrig): Remove these encoding headers from the default response
  // once compression is integrated.
  [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
  [GRPC_ENCODING_HEADER]: 'identity',
};

/** Headers included in every response, including trailers-only responses. */
const defaultResponseHeaders = {
  [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
  [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};

/**
 * Options passed to `stream.respond()` when sending initial metadata.
 * `waitForTrailers` makes the stream emit `wantTrailers` after the final DATA
 * frame instead of closing immediately.
 */
const defaultResponseOptions: http2.ServerStreamResponseOptions = {
  waitForTrailers: true,
};
79
/** A gRPC status in which every field of `StatusObject` is optional. */
export type ServerStatusResponse = Partial<StatusObject>;

/** An `Error` that may additionally carry gRPC status fields. */
export type ServerErrorResponse = ServerStatusResponse & Error;

/**
 * Members shared by every call object handed to a user-provided handler,
 * regardless of the call's streaming shape.
 */
export type ServerSurfaceCall = {
  /** Set to `true` once the underlying call reports cancellation. */
  cancelled: boolean;
  /** Metadata supplied when the call object was constructed. */
  readonly metadata: Metadata;
  getPeer(): string;
  sendMetadata(responseMetadata: Metadata): void;
  getDeadline(): Deadline;
  getPath(): string;
} & EventEmitter;

/** Call object for a unary method: surface members plus the request value. */
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
  request: RequestType;
};

/** Call object for a client-streaming method: requests are read as objects. */
export type ServerReadableStream<RequestType, ResponseType> =
  ServerSurfaceCall & ObjectReadable<RequestType>;

/**
 * Call object for a server-streaming method: a single request value, with
 * responses written as objects. `end` optionally accepts trailing metadata.
 */
export type ServerWritableStream<RequestType, ResponseType> =
  ServerSurfaceCall &
    ObjectWritable<ResponseType> & {
      request: RequestType;
      end: (metadata?: Metadata) => void;
    };

/**
 * Call object for a bidirectional-streaming method: requests are read and
 * responses are written as objects. `end` optionally accepts trailing metadata.
 */
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
  ObjectReadable<RequestType> &
  ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
107
/**
 * Call object passed to unary handlers. It holds the request value and
 * delegates every query to the wrapped `Http2ServerCallStream`.
 */
export class ServerUnaryCallImpl<RequestType, ResponseType>
  extends EventEmitter
  implements ServerUnaryCall<RequestType, ResponseType>
{
  /** Flipped to `true` by the wrapped call when it is cancelled. */
  cancelled = false;

  /**
   * @param call The internal call stream this object fronts.
   * @param metadata Metadata exposed to the handler.
   * @param request The request value exposed to the handler.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super();
    // Let the internal call forward its 'cancelled' / 'callEnd' events here.
    this.call.setupSurfaceCall(this);
  }

  /** @returns Whatever the wrapped call reports as the peer. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the wrapped call. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** @returns The deadline reported by the wrapped call. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** @returns The method path reported by the wrapped call. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }
}
140
/**
 * Call object passed to client-streaming handlers: an object-mode `Readable`
 * whose items are supplied by the wrapped `Http2ServerCallStream`.
 */
export class ServerReadableStreamImpl<RequestType, ResponseType>
  extends Readable
  implements ServerReadableStream<RequestType, ResponseType>
{
  /** Flipped to `true` by the wrapped call when it is cancelled. */
  cancelled = false;

  /**
   * @param call The internal call stream this object fronts.
   * @param metadata Metadata exposed to the handler.
   * @param deserialize Request deserializer, stored on the instance.
   * @param encoding Encoding name handed to the call's `setupReadable`.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public deserialize: Deserialize<RequestType>,
    encoding: string
  ) {
    super({ objectMode: true });
    this.call.setupSurfaceCall(this);
    this.call.setupReadable(this, encoding);
  }

  /**
   * Readable hook: first flushes messages the call queued while this stream
   * could not accept them, and resumes the call only if the stream can still
   * take more afterwards.
   */
  _read(size: number) {
    const canTakeMore = this.call.consumeUnpushedMessages(this);
    if (canTakeMore) {
      this.call.resume();
    }
  }

  /** @returns Whatever the wrapped call reports as the peer. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the wrapped call. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** @returns The deadline reported by the wrapped call. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** @returns The method path reported by the wrapped call. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }
}
183
/**
 * Call object passed to server-streaming handlers: an object-mode `Writable`
 * whose chunks are serialized and written to the wrapped
 * `Http2ServerCallStream`.
 */
export class ServerWritableStreamImpl<RequestType, ResponseType>
  extends Writable
  implements ServerWritableStream<RequestType, ResponseType>
{
  /** Flipped to `true` by the wrapped call when it is cancelled. */
  cancelled = false;
  /** Trailers sent with the OK status; replaceable through `end()`. */
  private trailingMetadata: Metadata = new Metadata();

  /**
   * @param call The internal call stream this object fronts.
   * @param metadata Metadata exposed to the handler.
   * @param serialize Response serializer, stored on the instance.
   * @param request The request value exposed to the handler.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public serialize: Serialize<ResponseType>,
    public request: RequestType
  ) {
    super({ objectMode: true });
    this.call.setupSurfaceCall(this);

    // Any error emitted on this stream is reported to the client as the call
    // status, after which the stream is ended.
    this.on('error', streamError => {
      this.call.sendError(streamError);
      this.end();
    });
  }

  /** @returns Whatever the wrapped call reports as the peer. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the wrapped call. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** @returns The deadline reported by the wrapped call. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** @returns The method path reported by the wrapped call. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }

  /**
   * Writable hook: serializes one response and writes it to the call.
   *
   * When the call's `write` returns a falsy value, `callback` is deferred to
   * the call's next 'drain' event. A failure while serializing or writing is
   * emitted on this stream as an 'error' with code INTERNAL, and `callback`
   * is still invoked afterwards.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    try {
      const framed = this.call.serializeMessage(chunk);
      const accepted = this.call.write(framed);

      if (!accepted) {
        this.call.once('drain', callback);
        return;
      }
    } catch (err) {
      const failure = {
        details: getErrorMessage(err),
        code: Status.INTERNAL,
      };
      this.emit('error', failure);
    }

    callback();
  }

  /** Writable hook: finishes the call with an OK status and the trailers. */
  _final(callback: Function): void {
    const okStatus = {
      code: Status.OK,
      details: 'OK',
      metadata: this.trailingMetadata,
    };
    this.call.sendStatus(okStatus);
    callback(null);
  }

  /**
   * Ends the stream. A truthy `metadata` argument replaces the trailing
   * metadata that `_final` sends with the status.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}
265
/**
 * Call object passed to bidirectional-streaming handlers: an object-mode
 * `Duplex` backed by the wrapped `Http2ServerCallStream`. This class body
 * defines no `_read`, `_write` or `_final` of its own.
 */
export class ServerDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ServerDuplexStream<RequestType, ResponseType>
{
  /** Flipped to `true` by the wrapped call when it is cancelled. */
  cancelled = false;
  /* This field appears to be unused, but it is actually used in _final, which
   * is assigned from ServerWritableStreamImpl.prototype._final below. */
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  // @ts-ignore noUnusedLocals
  private trailingMetadata: Metadata = new Metadata();

  /**
   * @param call The internal call stream this object fronts.
   * @param metadata Metadata exposed to the handler.
   * @param serialize Response serializer, stored on the instance.
   * @param deserialize Request deserializer, stored on the instance.
   * @param encoding Encoding name handed to the call's `setupReadable`.
   */
  constructor(
    private call: Http2ServerCallStream<RequestType, ResponseType>,
    public metadata: Metadata,
    public serialize: Serialize<ResponseType>,
    public deserialize: Deserialize<RequestType>,
    encoding: string
  ) {
    super({ objectMode: true });
    this.call.setupSurfaceCall(this);
    this.call.setupReadable(this, encoding);

    // Any error emitted on this stream is reported to the client as the call
    // status, after which the writable side is ended.
    this.on('error', streamError => {
      this.call.sendError(streamError);
      this.end();
    });
  }

  /** @returns Whatever the wrapped call reports as the peer. */
  getPeer(): string {
    const { call } = this;
    return call.getPeer();
  }

  /** Forwards the given response metadata to the wrapped call. */
  sendMetadata(responseMetadata: Metadata): void {
    const { call } = this;
    call.sendMetadata(responseMetadata);
  }

  /** @returns The deadline reported by the wrapped call. */
  getDeadline(): Deadline {
    const { call } = this;
    return call.getDeadline();
  }

  /** @returns The method path reported by the wrapped call. */
  getPath(): string {
    const { call } = this;
    return call.getPath();
  }

  /**
   * Ends the writable side. A truthy `metadata` argument replaces the stored
   * trailing metadata.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}
321
/* The duplex call object borrows its stream hooks from the one-directional
 * implementations: reading comes from the readable class, writing and
 * finishing from the writable class. */
const readableHooks = ServerReadableStreamImpl.prototype;
const writableHooks = ServerWritableStreamImpl.prototype;

ServerDuplexStreamImpl.prototype._read = readableHooks._read;
ServerDuplexStreamImpl.prototype._write = writableHooks._write;
ServerDuplexStreamImpl.prototype._final = writableHooks._final;
328
/**
 * Unary response callback signature.
 *
 * @param error An error or status to fail the call with, or `null`.
 * @param value The response message, when there is one.
 * @param trailer Optional trailing metadata.
 * @param flags Optional numeric flags.
 */
export type sendUnaryData<ResponseType> = (
  error: ServerErrorResponse | ServerStatusResponse | null,
  value?: ResponseType | null,
  trailer?: Metadata,
  flags?: number
) => void;

/** User provided handler for unary calls. */
export type handleUnaryCall<RequestType, ResponseType> = (
  call: ServerUnaryCall<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User provided handler for client streaming calls. */
export type handleClientStreamingCall<RequestType, ResponseType> = (
  call: ServerReadableStream<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User provided handler for server streaming calls. */
export type handleServerStreamingCall<RequestType, ResponseType> = (
  call: ServerWritableStream<RequestType, ResponseType>
) => void;

/** User provided handler for bidirectional streaming calls. */
export type handleBidiStreamingCall<RequestType, ResponseType> = (
  call: ServerDuplexStream<RequestType, ResponseType>
) => void;

/** Any of the four user provided handler shapes. */
export type HandleCall<RequestType, ResponseType> =
  | handleUnaryCall<RequestType, ResponseType>
  | handleClientStreamingCall<RequestType, ResponseType>
  | handleServerStreamingCall<RequestType, ResponseType>
  | handleBidiStreamingCall<RequestType, ResponseType>;
364
/**
 * Fields common to every registered handler record; only the type of `func`
 * differs between the four handler kinds.
 */
interface HandlerCommon<RequestType, ResponseType> {
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: HandlerType;
  path: string;
}

/** Handler record for a unary method. */
export interface UnaryHandler<RequestType, ResponseType>
  extends HandlerCommon<RequestType, ResponseType> {
  func: handleUnaryCall<RequestType, ResponseType>;
}

/** Handler record for a client-streaming method. */
export interface ClientStreamingHandler<RequestType, ResponseType>
  extends HandlerCommon<RequestType, ResponseType> {
  func: handleClientStreamingCall<RequestType, ResponseType>;
}

/** Handler record for a server-streaming method. */
export interface ServerStreamingHandler<RequestType, ResponseType>
  extends HandlerCommon<RequestType, ResponseType> {
  func: handleServerStreamingCall<RequestType, ResponseType>;
}

/** Handler record for a bidirectional-streaming method. */
export interface BidiStreamingHandler<RequestType, ResponseType>
  extends HandlerCommon<RequestType, ResponseType> {
  func: handleBidiStreamingCall<RequestType, ResponseType>;
}

/** Any of the four handler records. */
export type Handler<RequestType, ResponseType> =
  | UnaryHandler<RequestType, ResponseType>
  | ClientStreamingHandler<RequestType, ResponseType>
  | ServerStreamingHandler<RequestType, ResponseType>
  | BidiStreamingHandler<RequestType, ResponseType>;

/** Discriminates the streaming shape of a handler. */
export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
404
/**
 * Internal class that wraps the HTTP2 request.
 *
 * It owns the HTTP/2 stream for one call: it parses request headers, reads
 * and decompresses request messages, frames response messages, and sends the
 * final status. It emits the following events:
 *
 * - `'cancelled'` (reason) when the stream closes before a status was sent;
 * - `'callEnd'` (status code) and `'streamEnd'` (success flag) whenever
 *   `sendStatus` is invoked;
 * - `'drain'`, forwarded from the underlying stream;
 * - `'receiveMessage'` / `'sendMessage'` per message read or written.
 */
export class Http2ServerCallStream<
  RequestType,
  ResponseType
> extends EventEmitter {
  cancelled = false;
  /** Timer that enforces the client-supplied deadline, when one was given. */
  deadlineTimer: NodeJS.Timeout | null = null;
  private statusSent = false;
  private deadline: Deadline = Infinity;
  private wantTrailers = false;
  private metadataSent = false;
  /** Whether the surface readable currently accepts pushed messages. */
  private canPush = false;
  /** True while a message is being deserialized inside `pushMessage`. */
  private isPushPending = false;
  /** Raw messages that arrived while another push was pending. */
  private bufferedMessages: Array<Buffer | null> = [];
  /** Deserialized messages waiting until the readable can accept them. */
  private messagesToPush: Array<RequestType | null> = [];
  private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
  private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;

  /**
   * @param stream The HTTP/2 server stream carrying this call.
   * @param handler The registered handler record for the requested method.
   * @param options Channel options; only the two message size limits are read.
   */
  constructor(
    private stream: http2.ServerHttp2Stream,
    private handler: Handler<RequestType, ResponseType>,
    options: ChannelOptions
  ) {
    super();

    this.stream.once('error', (err: ServerErrorResponse) => {
      /* We need an error handler to avoid uncaught error event exceptions, but
       * there is nothing we can reasonably do here. Any error event should
       * have a corresponding close event, which handles emitting the cancelled
       * event. And the stream is now in a bad state, so we can't reasonably
       * expect to be able to send an error over it. */
    });

    this.stream.once('close', () => {
      trace(
        'Request to method ' +
          this.handler?.path +
          ' stream closed with rstCode ' +
          this.stream.rstCode
      );

      if (!this.statusSent) {
        this.cancelled = true;
        this.emit('cancelled', 'cancelled');
        this.emit('streamEnd', false);
        // With `cancelled` already set, sendStatus only emits its events and
        // returns before touching the stream.
        this.sendStatus({
          code: Status.CANCELLED,
          details: 'Cancelled by client',
          metadata: null,
        });
        if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
      }
    });

    this.stream.on('drain', () => {
      this.emit('drain');
    });

    if ('grpc.max_send_message_length' in options) {
      this.maxSendMessageSize = options['grpc.max_send_message_length']!;
    }
    if ('grpc.max_receive_message_length' in options) {
      this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
    }
  }

  /**
   * Reports whether the call is cancelled, marking it cancelled first when
   * the underlying stream is already destroyed or closed.
   */
  private checkCancelled(): boolean {
    /* In some cases the stream can become destroyed before the close event
     * fires. That creates a race condition that this check works around */
    if (this.stream.destroyed || this.stream.closed) {
      this.cancelled = true;
    }
    return this.cancelled;
  }

  /**
   * Strips the 5-byte message prefix and decompresses the payload.
   *
   * @param message A framed message, including its 5-byte prefix.
   * @param encoding `'deflate'`, `'gzip'` or `'identity'`.
   * @returns The payload directly for `'identity'`, otherwise a promise. The
   *     promise rejects with an UNIMPLEMENTED status object when `encoding`
   *     is not one of the three supported names.
   */
  private getDecompressedMessage(
    message: Buffer,
    encoding: string
  ): Buffer | Promise<Buffer> {
    if (encoding === 'deflate') {
      return inflate(message.subarray(5));
    } else if (encoding === 'gzip') {
      return unzip(message.subarray(5));
    } else if (encoding === 'identity') {
      return message.subarray(5);
    }

    return Promise.reject({
      code: Status.UNIMPLEMENTED,
      details: `Received message compressed with unsupported encoding "${encoding}"`,
    });
  }

  /**
   * Converts a rejection from `getDecompressedMessage` into a status object.
   *
   * Only a numeric `code` is treated as a gRPC status. Errors raised by zlib
   * carry a string `code` (for example `'Z_DATA_ERROR'`), so a plain
   * truthiness check would mistake them for status objects.
   */
  private toDecompressionStatus(
    err: unknown,
    encoding: string
  ): ServerStatusResponse {
    if (
      typeof err === 'object' &&
      err !== null &&
      typeof (err as { code?: unknown }).code === 'number'
    ) {
      return err as ServerStatusResponse;
    }

    return {
      code: Status.INTERNAL,
      details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
    };
  }

  /**
   * Arms the deadline timer so that the call expires after `remainingMs`.
   *
   * `setTimeout` cannot represent delays above 2^31-1 ms (about 24.8 days);
   * a larger value makes the timer fire almost immediately. Longer deadlines
   * are therefore covered by a chain of maximum-length timers.
   */
  private armDeadlineTimer(remainingMs: number): void {
    const MAX_TIMER_DELAY_MS = 2147483647;
    const delay = Math.min(remainingMs, MAX_TIMER_DELAY_MS);

    this.deadlineTimer = setTimeout(() => {
      const stillRemaining = remainingMs - delay;
      if (stillRemaining > 0) {
        this.armDeadlineTimer(stillRemaining);
      } else {
        handleExpiredDeadline(this);
      }
    }, delay);
  }

  /**
   * Sends the response headers once. Does nothing when the call is cancelled
   * or the headers were already sent.
   *
   * @param customMetadata Optional metadata merged over the default headers.
   */
  sendMetadata(customMetadata?: Metadata) {
    if (this.checkCancelled()) {
      return;
    }

    if (this.metadataSent) {
      return;
    }

    this.metadataSent = true;
    const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
    // TODO(cjihrig): Include compression headers.
    const headers = {
      ...defaultResponseHeaders,
      ...defaultCompressionHeaders,
      ...custom,
    };
    this.stream.respond(headers, defaultResponseOptions);
  }

  /**
   * Converts the request headers into metadata, applies the `grpc-timeout`
   * header (if present) as the call deadline, and strips headers that should
   * not reach the application.
   *
   * An unparseable timeout header fails the call with OUT_OF_RANGE.
   *
   * @returns The metadata to expose to the handler.
   */
  receiveMetadata(headers: http2.IncomingHttpHeaders) {
    const metadata = Metadata.fromHttp2Headers(headers);

    if (logging.isTracerEnabled(TRACER_NAME)) {
      trace(
        'Request to ' +
          this.handler.path +
          ' received headers ' +
          JSON.stringify(metadata.toJSON())
      );
    }

    // TODO(cjihrig): Receive compression metadata.

    const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);

    if (timeoutHeader.length > 0) {
      const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);

      if (match === null) {
        const err = new Error('Invalid deadline') as ServerErrorResponse;
        err.code = Status.OUT_OF_RANGE;
        this.sendError(err);
        return metadata;
      }

      // Math.trunc rather than `| 0`: the bitwise form coerces to a signed
      // 32-bit integer, so valid timeouts above ~24.8 days wrapped around to
      // negative or arbitrary values and expired the call immediately.
      const timeout = Math.trunc(+match[1] * deadlineUnitsToMs[match[2]]);

      this.deadline = Date.now() + timeout;
      this.armDeadlineTimer(timeout);
      metadata.remove(GRPC_TIMEOUT_HEADER);
    }

    // Remove several headers that should not be propagated to the application
    metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
    metadata.remove(http2.constants.HTTP2_HEADER_TE);
    metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
    metadata.remove('grpc-accept-encoding');

    return metadata;
  }

  /**
   * Reads the whole request body of a unary call, then decompresses and
   * deserializes it.
   *
   * The promise rejects with a status-like object: RESOURCE_EXHAUSTED when
   * the body exceeds the receive limit, INTERNAL for a stream error, an empty
   * body, or a failed decompression or deserialization, and UNIMPLEMENTED for
   * an unsupported encoding.
   *
   * @param encoding The encoding to apply when the message's compressed flag
   *     is set.
   */
  receiveUnaryMessage(encoding: string): Promise<RequestType | void> {
    return new Promise((resolve, reject) => {
      const { stream } = this;

      let receivedLength = 0;

      // eslint-disable-next-line @typescript-eslint/no-this-alias
      const call = this;
      const body: Buffer[] = [];
      const limit = this.maxReceiveMessageSize;

      this.stream.on('data', onData);
      this.stream.on('end', onEnd);
      this.stream.on('error', onEnd);

      function onData(chunk: Buffer) {
        receivedLength += chunk.byteLength;

        if (limit !== -1 && receivedLength > limit) {
          stream.removeListener('data', onData);
          stream.removeListener('end', onEnd);
          stream.removeListener('error', onEnd);

          reject({
            code: Status.RESOURCE_EXHAUSTED,
            details: `Received message larger than max (${receivedLength} vs. ${limit})`,
          });
          return;
        }

        body.push(chunk);
      }

      function onEnd(err?: Error) {
        stream.removeListener('data', onData);
        stream.removeListener('end', onEnd);
        stream.removeListener('error', onEnd);

        if (err !== undefined) {
          reject({ code: Status.INTERNAL, details: err.message });
          return;
        }

        if (receivedLength === 0) {
          reject({
            code: Status.INTERNAL,
            details: 'received empty unary message',
          });
          return;
        }

        call.emit('receiveMessage');

        const requestBytes = Buffer.concat(body, receivedLength);
        // The first byte of the frame is the compressed flag.
        const compressed = requestBytes.readUInt8(0) === 1;
        const compressedMessageEncoding = compressed ? encoding : 'identity';
        const decompressedMessage = call.getDecompressedMessage(
          requestBytes,
          compressedMessageEncoding
        );

        if (Buffer.isBuffer(decompressedMessage)) {
          resolve(
            call.deserializeMessageWithInternalError(decompressedMessage)
          );
          return;
        }

        decompressedMessage.then(
          decompressed =>
            resolve(call.deserializeMessageWithInternalError(decompressed)),
          (decompressionError: unknown) =>
            reject(call.toDecompressionStatus(decompressionError, encoding))
        );
      }
    });
  }

  /**
   * Deserializes a request, converting any thrown error into a rejection
   * with an INTERNAL status object.
   */
  private async deserializeMessageWithInternalError(buffer: Buffer) {
    try {
      return this.deserializeMessage(buffer);
    } catch (err) {
      throw {
        details: getErrorMessage(err),
        code: Status.INTERNAL,
      };
    }
  }

  /**
   * Serializes a response and prepends the 5-byte message prefix: a zero
   * compressed-flag byte followed by the big-endian 32-bit payload length.
   */
  serializeMessage(value: ResponseType) {
    const messageBuffer = this.handler.serialize(value);

    // TODO(cjihrig): Call compression aware serializeMessage().
    const byteLength = messageBuffer.byteLength;
    const output = Buffer.allocUnsafe(byteLength + 5);
    output.writeUInt8(0, 0);
    output.writeUInt32BE(byteLength, 1);
    messageBuffer.copy(output, 5);
    return output;
  }

  /** Deserializes a request payload with the handler's deserializer. */
  deserializeMessage(bytes: Buffer) {
    return this.handler.deserialize(bytes);
  }

  /**
   * Completes a unary or client-streaming call.
   *
   * With a truthy `err` the call fails with that error; `metadata` is
   * attached to it unless the error already has its own `metadata` property.
   * Otherwise `value` is serialized and written, followed by an OK status. A
   * failure while serializing or writing is reported as INTERNAL.
   */
  async sendUnaryMessage(
    err: ServerErrorResponse | ServerStatusResponse | null,
    value?: ResponseType | null,
    metadata?: Metadata | null,
    flags?: number
  ) {
    if (this.checkCancelled()) {
      return;
    }

    if (metadata === undefined) {
      metadata = null;
    }

    if (err) {
      if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
        err.metadata = metadata;
      }
      this.sendError(err);
      return;
    }

    try {
      const response = this.serializeMessage(value!);

      this.write(response);
      this.sendStatus({ code: Status.OK, details: 'OK', metadata });
    } catch (err) {
      this.sendError({
        details: getErrorMessage(err),
        code: Status.INTERNAL,
      });
    }
  }

  /**
   * Emits `'callEnd'` and `'streamEnd'`, then, unless the call is cancelled,
   * sends the status to the client.
   *
   * If response headers were already sent, the status travels in trailers
   * once the stream asks for them. Otherwise a trailers-only response is
   * sent.
   */
  sendStatus(statusObj: PartialStatusObject) {
    this.emit('callEnd', statusObj.code);
    this.emit('streamEnd', statusObj.code === Status.OK);
    if (this.checkCancelled()) {
      return;
    }

    trace(
      'Request to method ' +
        this.handler?.path +
        ' ended with status code: ' +
        Status[statusObj.code] +
        ' details: ' +
        statusObj.details
    );

    if (this.deadlineTimer) clearTimeout(this.deadlineTimer);

    if (this.stream.headersSent) {
      // `wantTrailers` guards against registering the listener twice.
      if (!this.wantTrailers) {
        this.wantTrailers = true;
        this.stream.once('wantTrailers', () => {
          const trailersToSend = {
            [GRPC_STATUS_HEADER]: statusObj.code,
            [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
            ...statusObj.metadata?.toHttp2Headers(),
          };

          this.stream.sendTrailers(trailersToSend);
          this.statusSent = true;
        });
        this.stream.end();
      }
    } else {
      // Trailers-only response
      const trailersToSend = {
        [GRPC_STATUS_HEADER]: statusObj.code,
        [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
        ...defaultResponseHeaders,
        ...statusObj.metadata?.toHttp2Headers(),
      };
      this.stream.respond(trailersToSend, { endStream: true });
      this.statusSent = true;
    }
  }

  /**
   * Normalizes an error or partial status into a full status and sends it.
   *
   * The code defaults to UNKNOWN and the details to the error's `message`
   * (or `'Unknown Error'`). An integer `code` on the error overrides the
   * default, and only then is a string `details` honoured.
   */
  sendError(error: ServerErrorResponse | ServerStatusResponse) {
    const status: PartialStatusObject = {
      code: Status.UNKNOWN,
      details: 'message' in error ? error.message : 'Unknown Error',
      metadata:
        'metadata' in error && error.metadata !== undefined
          ? error.metadata
          : null,
    };

    if (
      'code' in error &&
      typeof error.code === 'number' &&
      Number.isInteger(error.code)
    ) {
      status.code = error.code;

      if ('details' in error && typeof error.details === 'string') {
        status.details = error.details!;
      }
    }

    this.sendStatus(status);
  }

  /**
   * Writes one framed response message, sending the response headers first
   * if they have not been sent yet.
   *
   * @returns The result of the stream's `write`, or `undefined` when the call
   *     is cancelled or the message exceeds the send limit (in which case the
   *     call fails with RESOURCE_EXHAUSTED).
   */
  write(chunk: Buffer) {
    if (this.checkCancelled()) {
      return;
    }

    if (
      this.maxSendMessageSize !== -1 &&
      chunk.length > this.maxSendMessageSize
    ) {
      this.sendError({
        code: Status.RESOURCE_EXHAUSTED,
        details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
      });
      return;
    }

    this.sendMetadata();
    this.emit('sendMessage');
    return this.stream.write(chunk);
  }

  /** Resumes reading from the underlying stream. */
  resume() {
    this.stream.resume();
  }

  /**
   * Forwards this call's first `'cancelled'` and first `'callEnd'` event to
   * the given surface call, setting its `cancelled` flag on cancellation.
   */
  setupSurfaceCall(call: ServerSurfaceCall) {
    this.once('cancelled', reason => {
      call.cancelled = true;
      call.emit('cancelled', reason);
    });

    this.once('callEnd', status => call.emit('callEnd', status));
  }

  /**
   * Wires the underlying stream to a surface readable: incoming data is
   * split into messages, size-checked, decompressed and pushed (or buffered),
   * and end-of-stream is pushed once all pending messages were processed.
   *
   * An oversized message fails the call with RESOURCE_EXHAUSTED and a failed
   * decompression with UNIMPLEMENTED or INTERNAL; in both cases the remaining
   * messages of that chunk are dropped.
   *
   * @param encoding The encoding to apply to messages whose compressed flag
   *     is set.
   */
  setupReadable(
    readable:
      | ServerReadableStream<RequestType, ResponseType>
      | ServerDuplexStream<RequestType, ResponseType>,
    encoding: string
  ) {
    const decoder = new StreamDecoder();

    let readsDone = false;

    let pendingMessageProcessing = false;

    let pushedEnd = false;

    const maybePushEnd = async () => {
      if (!pushedEnd && readsDone && !pendingMessageProcessing) {
        pushedEnd = true;
        await this.pushOrBufferMessage(readable, null);
      }
    };

    this.stream.on('data', async (data: Buffer) => {
      const messages = decoder.write(data);

      pendingMessageProcessing = true;
      this.stream.pause();
      for (const message of messages) {
        if (
          this.maxReceiveMessageSize !== -1 &&
          message.length > this.maxReceiveMessageSize
        ) {
          this.sendError({
            code: Status.RESOURCE_EXHAUSTED,
            details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
          });
          return;
        }
        this.emit('receiveMessage');

        const compressed = message.readUInt8(0) === 1;
        const compressedMessageEncoding = compressed ? encoding : 'identity';

        let decompressedMessage: Buffer;
        try {
          decompressedMessage = await this.getDecompressedMessage(
            message,
            compressedMessageEncoding
          );
        } catch (err) {
          // Nothing awaits the promise of an async event listener, so a
          // rejection escaping from here would surface as an unhandled
          // rejection instead of a status sent to the client.
          this.sendError(this.toDecompressionStatus(err, encoding));
          return;
        }

        await this.pushOrBufferMessage(readable, decompressedMessage);
      }
      pendingMessageProcessing = false;
      this.stream.resume();
      await maybePushEnd();
    });

    this.stream.once('end', async () => {
      readsDone = true;
      await maybePushEnd();
    });
  }

  /**
   * Pushes queued deserialized messages into the readable until the queue is
   * empty, the readable signals backpressure, or end-of-stream was pushed.
   *
   * @returns `true` when the readable can still accept further messages.
   */
  consumeUnpushedMessages(
    readable:
      | ServerReadableStream<RequestType, ResponseType>
      | ServerDuplexStream<RequestType, ResponseType>
  ): boolean {
    this.canPush = true;

    while (this.messagesToPush.length > 0) {
      const nextMessage = this.messagesToPush.shift();
      const canPush = readable.push(nextMessage);

      if (nextMessage === null || canPush === false) {
        this.canPush = false;
        break;
      }
    }

    return this.canPush;
  }

  /**
   * Hands a raw message (or `null` for end-of-stream) to `pushMessage`, or
   * buffers it while another message is still being deserialized.
   */
  private async pushOrBufferMessage(
    readable:
      | ServerReadableStream<RequestType, ResponseType>
      | ServerDuplexStream<RequestType, ResponseType>,
    messageBytes: Buffer | null
  ): Promise<void> {
    if (this.isPushPending) {
      this.bufferedMessages.push(messageBytes);
    } else {
      await this.pushMessage(readable, messageBytes);
    }
  }

  /**
   * Deserializes one raw message and pushes it into the readable, or queues
   * it when the readable cannot accept it, then continues with any messages
   * buffered in the meantime. A `null` message marks end-of-stream.
   *
   * A deserialization failure discards the buffered messages and is emitted
   * on the readable as an `'error'`.
   */
  private async pushMessage(
    readable:
      | ServerReadableStream<RequestType, ResponseType>
      | ServerDuplexStream<RequestType, ResponseType>,
    messageBytes: Buffer | null
  ) {
    if (messageBytes === null) {
      trace('Received end of stream');
      if (this.canPush) {
        readable.push(null);
      } else {
        this.messagesToPush.push(null);
      }

      return;
    }

    trace('Received message of length ' + messageBytes.length);

    this.isPushPending = true;

    try {
      const deserialized = await this.deserializeMessage(messageBytes);

      if (this.canPush) {
        if (!readable.push(deserialized)) {
          this.canPush = false;
          this.stream.pause();
        }
      } else {
        this.messagesToPush.push(deserialized);
      }
    } catch (error) {
      // Ignore any remaining messages when errors occur.
      this.bufferedMessages.length = 0;
      let code = getErrorCode(error);
      // Fall back to INTERNAL when the error carries no usable status code.
      if (code === null || code < Status.OK || code > Status.UNAUTHENTICATED) {
        code = Status.INTERNAL;
      }

      readable.emit('error', {
        details: getErrorMessage(error),
        code: code,
      });
    }

    this.isPushPending = false;

    if (this.bufferedMessages.length > 0) {
      await this.pushMessage(
        readable,
        this.bufferedMessages.shift() as Buffer | null
      );
    }
  }

  /**
   * @returns `address:port` of the remote socket, the address alone when no
   *     port is known, or `'unknown'` when no remote address is available.
   */
  getPeer(): string {
    const socket = this.stream.session?.socket;
    if (socket?.remoteAddress) {
      if (socket.remotePort) {
        return `${socket.remoteAddress}:${socket.remotePort}`;
      } else {
        return socket.remoteAddress;
      }
    } else {
      return 'unknown';
    }
  }

  /** @returns The call deadline; `Infinity` when the client sent none. */
  getDeadline(): Deadline {
    return this.deadline;
  }

  /** @returns The path of the handler serving this call. */
  getPath(): string {
    return this.handler.path;
  }
}
986
/* eslint-disable @typescript-eslint/no-explicit-any */
type UntypedServerCall = Http2ServerCallStream<any, any>;

/**
 * Deadline timer callback: fails the call with DEADLINE_EXCEEDED, then marks
 * it cancelled and emits `'cancelled'` with the reason `'deadline'`.
 *
 * @param call The call whose deadline has passed.
 */
function handleExpiredDeadline(call: UntypedServerCall) {
  const deadlineError: ServerErrorResponse = Object.assign(
    new Error('Deadline exceeded'),
    { code: Status.DEADLINE_EXCEEDED }
  );

  // The status is sent before the cancelled flag is raised.
  call.sendError(deadlineError);
  call.cancelled = true;
  call.emit('cancelled', 'deadline');
}