UNPKG

27.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { EventEmitter } from 'events';
19import * as http2 from 'http2';
20import { Duplex, Readable, Writable } from 'stream';
21import * as zlib from 'zlib';
22import { promisify } from 'util';
23
24import {
25 Status,
26 DEFAULT_MAX_SEND_MESSAGE_LENGTH,
27 DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
28 LogVerbosity,
29} from './constants';
30import { Deserialize, Serialize } from './make-client';
31import { Metadata } from './metadata';
32import { StreamDecoder } from './stream-decoder';
33import { ObjectReadable, ObjectWritable } from './object-stream';
34import { ChannelOptions } from './channel-options';
35import * as logging from './logging';
36import { StatusObject, PartialStatusObject } from './call-interface';
37import { Deadline } from './deadline';
38import { getErrorCode, getErrorMessage } from './error';
39
40const TRACER_NAME = 'server_call';
41const unzip = promisify(zlib.unzip);
42const inflate = promisify(zlib.inflate);
43
44function trace(text: string): void {
45 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
46}
47
48interface DeadlineUnitIndexSignature {
49 [name: string]: number;
50}
51
52const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
53const GRPC_ENCODING_HEADER = 'grpc-encoding';
54const GRPC_MESSAGE_HEADER = 'grpc-message';
55const GRPC_STATUS_HEADER = 'grpc-status';
56const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
57const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
58const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
59 H: 3600000,
60 M: 60000,
61 S: 1000,
62 m: 1,
63 u: 0.001,
64 n: 0.000001,
65};
66const defaultCompressionHeaders = {
67 // TODO(cjihrig): Remove these encoding headers from the default response
68 // once compression is integrated.
69 [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
70 [GRPC_ENCODING_HEADER]: 'identity',
71}
72const defaultResponseHeaders = {
73 [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
74 [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
75};
76const defaultResponseOptions = {
77 waitForTrailers: true,
78} as http2.ServerStreamResponseOptions;
79
80export type ServerStatusResponse = Partial<StatusObject>;
81
82export type ServerErrorResponse = ServerStatusResponse & Error;
83
84export type ServerSurfaceCall = {
85 cancelled: boolean;
86 readonly metadata: Metadata;
87 getPeer(): string;
88 sendMetadata(responseMetadata: Metadata): void;
89 getDeadline(): Deadline;
90 getPath(): string;
91} & EventEmitter;
92
93export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
94 request: RequestType;
95};
96export type ServerReadableStream<RequestType, ResponseType> =
97 ServerSurfaceCall & ObjectReadable<RequestType>;
98export type ServerWritableStream<RequestType, ResponseType> =
99 ServerSurfaceCall &
100 ObjectWritable<ResponseType> & {
101 request: RequestType;
102 end: (metadata?: Metadata) => void;
103 };
104export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
105 ObjectReadable<RequestType> &
106 ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
107
108export class ServerUnaryCallImpl<RequestType, ResponseType>
109 extends EventEmitter
110 implements ServerUnaryCall<RequestType, ResponseType>
111{
112 cancelled: boolean;
113
114 constructor(
115 private call: Http2ServerCallStream<RequestType, ResponseType>,
116 public metadata: Metadata,
117 public request: RequestType
118 ) {
119 super();
120 this.cancelled = false;
121 this.call.setupSurfaceCall(this);
122 }
123
124 getPeer(): string {
125 return this.call.getPeer();
126 }
127
128 sendMetadata(responseMetadata: Metadata): void {
129 this.call.sendMetadata(responseMetadata);
130 }
131
132 getDeadline(): Deadline {
133 return this.call.getDeadline();
134 }
135
136 getPath(): string {
137 return this.call.getPath();
138 }
139}
140
141export class ServerReadableStreamImpl<RequestType, ResponseType>
142 extends Readable
143 implements ServerReadableStream<RequestType, ResponseType>
144{
145 cancelled: boolean;
146
147 constructor(
148 private call: Http2ServerCallStream<RequestType, ResponseType>,
149 public metadata: Metadata,
150 public deserialize: Deserialize<RequestType>,
151 encoding: string
152 ) {
153 super({ objectMode: true });
154 this.cancelled = false;
155 this.call.setupSurfaceCall(this);
156 this.call.setupReadable(this, encoding);
157 }
158
159 _read(size: number) {
160 if (!this.call.consumeUnpushedMessages(this)) {
161 return;
162 }
163
164 this.call.resume();
165 }
166
167 getPeer(): string {
168 return this.call.getPeer();
169 }
170
171 sendMetadata(responseMetadata: Metadata): void {
172 this.call.sendMetadata(responseMetadata);
173 }
174
175 getDeadline(): Deadline {
176 return this.call.getDeadline();
177 }
178
179 getPath(): string {
180 return this.call.getPath();
181 }
182}
183
184export class ServerWritableStreamImpl<RequestType, ResponseType>
185 extends Writable
186 implements ServerWritableStream<RequestType, ResponseType>
187{
188 cancelled: boolean;
189 private trailingMetadata: Metadata;
190
191 constructor(
192 private call: Http2ServerCallStream<RequestType, ResponseType>,
193 public metadata: Metadata,
194 public serialize: Serialize<ResponseType>,
195 public request: RequestType
196 ) {
197 super({ objectMode: true });
198 this.cancelled = false;
199 this.trailingMetadata = new Metadata();
200 this.call.setupSurfaceCall(this);
201
202 this.on('error', (err) => {
203 this.call.sendError(err);
204 this.end();
205 });
206 }
207
208 getPeer(): string {
209 return this.call.getPeer();
210 }
211
212 sendMetadata(responseMetadata: Metadata): void {
213 this.call.sendMetadata(responseMetadata);
214 }
215
216 getDeadline(): Deadline {
217 return this.call.getDeadline();
218 }
219
220 getPath(): string {
221 return this.call.getPath();
222 }
223
224 _write(
225 chunk: ResponseType,
226 encoding: string,
227 // eslint-disable-next-line @typescript-eslint/no-explicit-any
228 callback: (...args: any[]) => void
229 ) {
230 try {
231 const response = this.call.serializeMessage(chunk);
232
233 if (!this.call.write(response)) {
234 this.call.once('drain', callback);
235 return;
236 }
237 } catch (err) {
238 this.emit('error', {
239 details: getErrorMessage(err),
240 code: Status.INTERNAL
241 });
242 }
243
244 callback();
245 }
246
247 _final(callback: Function): void {
248 this.call.sendStatus({
249 code: Status.OK,
250 details: 'OK',
251 metadata: this.trailingMetadata,
252 });
253 callback(null);
254 }
255
256 // eslint-disable-next-line @typescript-eslint/no-explicit-any
257 end(metadata?: any) {
258 if (metadata) {
259 this.trailingMetadata = metadata;
260 }
261
262 return super.end();
263 }
264}
265
266export class ServerDuplexStreamImpl<RequestType, ResponseType>
267 extends Duplex
268 implements ServerDuplexStream<RequestType, ResponseType>
269{
270 cancelled: boolean;
271 private trailingMetadata: Metadata;
272
273 constructor(
274 private call: Http2ServerCallStream<RequestType, ResponseType>,
275 public metadata: Metadata,
276 public serialize: Serialize<ResponseType>,
277 public deserialize: Deserialize<RequestType>,
278 encoding: string
279 ) {
280 super({ objectMode: true });
281 this.cancelled = false;
282 this.trailingMetadata = new Metadata();
283 this.call.setupSurfaceCall(this);
284 this.call.setupReadable(this, encoding);
285
286 this.on('error', (err) => {
287 this.call.sendError(err);
288 this.end();
289 });
290 }
291
292 getPeer(): string {
293 return this.call.getPeer();
294 }
295
296 sendMetadata(responseMetadata: Metadata): void {
297 this.call.sendMetadata(responseMetadata);
298 }
299
300 getDeadline(): Deadline {
301 return this.call.getDeadline();
302 }
303
304 getPath(): string {
305 return this.call.getPath();
306 }
307
308 // eslint-disable-next-line @typescript-eslint/no-explicit-any
309 end(metadata?: any) {
310 if (metadata) {
311 this.trailingMetadata = metadata;
312 }
313
314 return super.end();
315 }
316}
317
318ServerDuplexStreamImpl.prototype._read =
319 ServerReadableStreamImpl.prototype._read;
320ServerDuplexStreamImpl.prototype._write =
321 ServerWritableStreamImpl.prototype._write;
322ServerDuplexStreamImpl.prototype._final =
323 ServerWritableStreamImpl.prototype._final;
324
325// Unary response callback signature.
326export type sendUnaryData<ResponseType> = (
327 error: ServerErrorResponse | ServerStatusResponse | null,
328 value?: ResponseType | null,
329 trailer?: Metadata,
330 flags?: number
331) => void;
332
333// User provided handler for unary calls.
334export type handleUnaryCall<RequestType, ResponseType> = (
335 call: ServerUnaryCall<RequestType, ResponseType>,
336 callback: sendUnaryData<ResponseType>
337) => void;
338
339// User provided handler for client streaming calls.
340export type handleClientStreamingCall<RequestType, ResponseType> = (
341 call: ServerReadableStream<RequestType, ResponseType>,
342 callback: sendUnaryData<ResponseType>
343) => void;
344
345// User provided handler for server streaming calls.
346export type handleServerStreamingCall<RequestType, ResponseType> = (
347 call: ServerWritableStream<RequestType, ResponseType>
348) => void;
349
350// User provided handler for bidirectional streaming calls.
351export type handleBidiStreamingCall<RequestType, ResponseType> = (
352 call: ServerDuplexStream<RequestType, ResponseType>
353) => void;
354
355export type HandleCall<RequestType, ResponseType> =
356 | handleUnaryCall<RequestType, ResponseType>
357 | handleClientStreamingCall<RequestType, ResponseType>
358 | handleServerStreamingCall<RequestType, ResponseType>
359 | handleBidiStreamingCall<RequestType, ResponseType>;
360
361export interface UnaryHandler<RequestType, ResponseType> {
362 func: handleUnaryCall<RequestType, ResponseType>;
363 serialize: Serialize<ResponseType>;
364 deserialize: Deserialize<RequestType>;
365 type: HandlerType;
366 path: string;
367}
368
369export interface ClientStreamingHandler<RequestType, ResponseType> {
370 func: handleClientStreamingCall<RequestType, ResponseType>;
371 serialize: Serialize<ResponseType>;
372 deserialize: Deserialize<RequestType>;
373 type: HandlerType;
374 path: string;
375}
376
377export interface ServerStreamingHandler<RequestType, ResponseType> {
378 func: handleServerStreamingCall<RequestType, ResponseType>;
379 serialize: Serialize<ResponseType>;
380 deserialize: Deserialize<RequestType>;
381 type: HandlerType;
382 path: string;
383}
384
385export interface BidiStreamingHandler<RequestType, ResponseType> {
386 func: handleBidiStreamingCall<RequestType, ResponseType>;
387 serialize: Serialize<ResponseType>;
388 deserialize: Deserialize<RequestType>;
389 type: HandlerType;
390 path: string;
391}
392
393export type Handler<RequestType, ResponseType> =
394 | UnaryHandler<RequestType, ResponseType>
395 | ClientStreamingHandler<RequestType, ResponseType>
396 | ServerStreamingHandler<RequestType, ResponseType>
397 | BidiStreamingHandler<RequestType, ResponseType>;
398
399export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
400
401// Internal class that wraps the HTTP2 request.
402export class Http2ServerCallStream<
403 RequestType,
404 ResponseType
405> extends EventEmitter {
406 cancelled = false;
407 deadlineTimer: NodeJS.Timer | null = null;
408 private statusSent = false;
409 private deadline: Deadline = Infinity;
410 private wantTrailers = false;
411 private metadataSent = false;
412 private canPush = false;
413 private isPushPending = false;
414 private bufferedMessages: Array<Buffer | null> = [];
415 private messagesToPush: Array<RequestType | null> = [];
416 private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
417 private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
418
419 constructor(
420 private stream: http2.ServerHttp2Stream,
421 private handler: Handler<RequestType, ResponseType>,
422 private options: ChannelOptions
423 ) {
424 super();
425
426 this.stream.once('error', (err: ServerErrorResponse) => {
427 /* We need an error handler to avoid uncaught error event exceptions, but
428 * there is nothing we can reasonably do here. Any error event should
429 * have a corresponding close event, which handles emitting the cancelled
430 * event. And the stream is now in a bad state, so we can't reasonably
431 * expect to be able to send an error over it. */
432 });
433
434 this.stream.once('close', () => {
435 trace(
436 'Request to method ' +
437 this.handler?.path +
438 ' stream closed with rstCode ' +
439 this.stream.rstCode
440 );
441
442 if (!this.statusSent) {
443 this.cancelled = true;
444 this.emit('cancelled', 'cancelled');
445 this.emit('streamEnd', false);
446 this.sendStatus({
447 code: Status.CANCELLED,
448 details: 'Cancelled by client',
449 metadata: null,
450 });
451 }
452 });
453
454 this.stream.on('drain', () => {
455 this.emit('drain');
456 });
457
458 if ('grpc.max_send_message_length' in options) {
459 this.maxSendMessageSize = options['grpc.max_send_message_length']!;
460 }
461 if ('grpc.max_receive_message_length' in options) {
462 this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
463 }
464 }
465
466 private checkCancelled(): boolean {
467 /* In some cases the stream can become destroyed before the close event
468 * fires. That creates a race condition that this check works around */
469 if (this.stream.destroyed || this.stream.closed) {
470 this.cancelled = true;
471 }
472 return this.cancelled;
473 }
474
475 private getDecompressedMessage(
476 message: Buffer,
477 encoding: string
478 ): Buffer | Promise<Buffer> {
479 if (encoding === 'deflate') {
480 return inflate(message.subarray(5));
481 } else if (encoding === 'gzip') {
482 return unzip(message.subarray(5));
483 } else if (encoding === 'identity') {
484 return message.subarray(5);
485 }
486
487 return Promise.reject({
488 code: Status.UNIMPLEMENTED,
489 details: `Received message compressed with unsupported encoding "${encoding}"`,
490 });
491 }
492
493 sendMetadata(customMetadata?: Metadata) {
494 if (this.checkCancelled()) {
495 return;
496 }
497
498 if (this.metadataSent) {
499 return;
500 }
501
502 this.metadataSent = true;
503 const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
504 // TODO(cjihrig): Include compression headers.
505 const headers = { ...defaultResponseHeaders, ...defaultCompressionHeaders, ...custom };
506 this.stream.respond(headers, defaultResponseOptions);
507 }
508
509 receiveMetadata(headers: http2.IncomingHttpHeaders) {
510 const metadata = Metadata.fromHttp2Headers(headers);
511
512 if (logging.isTracerEnabled(TRACER_NAME)) {
513 trace(
514 'Request to ' +
515 this.handler.path +
516 ' received headers ' +
517 JSON.stringify(metadata.toJSON())
518 );
519 }
520
521 // TODO(cjihrig): Receive compression metadata.
522
523 const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
524
525 if (timeoutHeader.length > 0) {
526 const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
527
528 if (match === null) {
529 const err = new Error('Invalid deadline') as ServerErrorResponse;
530 err.code = Status.OUT_OF_RANGE;
531 this.sendError(err);
532 return metadata;
533 }
534
535 const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
536
537 const now = new Date();
538 this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
539 this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
540 metadata.remove(GRPC_TIMEOUT_HEADER);
541 }
542
543 // Remove several headers that should not be propagated to the application
544 metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
545 metadata.remove(http2.constants.HTTP2_HEADER_TE);
546 metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
547 metadata.remove('grpc-accept-encoding');
548
549 return metadata;
550 }
551
552 receiveUnaryMessage(
553 encoding: string,
554 next: (
555 err: Partial<ServerStatusResponse> | null,
556 request?: RequestType
557 ) => void
558 ): void {
559 const { stream } = this;
560
561 let receivedLength = 0;
562 const call = this;
563 const body: Buffer[] = [];
564 const limit = this.maxReceiveMessageSize;
565
566 stream.on('data', onData);
567 stream.on('end', onEnd);
568 stream.on('error', onEnd);
569
570 function onData(chunk: Buffer) {
571 receivedLength += chunk.byteLength;
572
573 if (limit !== -1 && receivedLength > limit) {
574 stream.removeListener('data', onData);
575 stream.removeListener('end', onEnd);
576 stream.removeListener('error', onEnd);
577 next({
578 code: Status.RESOURCE_EXHAUSTED,
579 details: `Received message larger than max (${receivedLength} vs. ${limit})`,
580 });
581 return;
582 }
583
584 body.push(chunk);
585 }
586
587 function onEnd(err?: Error) {
588 stream.removeListener('data', onData);
589 stream.removeListener('end', onEnd);
590 stream.removeListener('error', onEnd);
591
592 if (err !== undefined) {
593 next({ code: Status.INTERNAL, details: err.message });
594 return;
595 }
596
597 if (receivedLength === 0) {
598 next({ code: Status.INTERNAL, details: 'received empty unary message' })
599 return;
600 }
601
602 call.emit('receiveMessage');
603
604 const requestBytes = Buffer.concat(body, receivedLength);
605 const compressed = requestBytes.readUInt8(0) === 1;
606 const compressedMessageEncoding = compressed ? encoding : 'identity';
607 const decompressedMessage = call.getDecompressedMessage(
608 requestBytes,
609 compressedMessageEncoding
610 );
611
612 if (Buffer.isBuffer(decompressedMessage)) {
613 call.safeDeserializeMessage(decompressedMessage, next);
614 return;
615 }
616
617 decompressedMessage.then(
618 (decompressed) => call.safeDeserializeMessage(decompressed, next),
619 (err: any) => next(
620 err.code
621 ? err
622 : {
623 code: Status.INTERNAL,
624 details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
625 }
626 )
627 )
628 }
629 }
630
631 private safeDeserializeMessage(
632 buffer: Buffer,
633 next: (err: Partial<ServerStatusResponse> | null, request?: RequestType) => void
634 ) {
635 try {
636 next(null, this.deserializeMessage(buffer));
637 } catch (err) {
638 next({
639 details: getErrorMessage(err),
640 code: Status.INTERNAL
641 });
642 }
643 }
644
645 serializeMessage(value: ResponseType) {
646 const messageBuffer = this.handler.serialize(value);
647
648 // TODO(cjihrig): Call compression aware serializeMessage().
649 const byteLength = messageBuffer.byteLength;
650 const output = Buffer.allocUnsafe(byteLength + 5);
651 output.writeUInt8(0, 0);
652 output.writeUInt32BE(byteLength, 1);
653 messageBuffer.copy(output, 5);
654 return output;
655 }
656
657 deserializeMessage(bytes: Buffer) {
658 return this.handler.deserialize(bytes);
659 }
660
661 async sendUnaryMessage(
662 err: ServerErrorResponse | ServerStatusResponse | null,
663 value?: ResponseType | null,
664 metadata?: Metadata | null,
665 flags?: number
666 ) {
667 if (this.checkCancelled()) {
668 return;
669 }
670
671 if (metadata === undefined) {
672 metadata = null;
673 }
674
675 if (err) {
676 if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
677 err.metadata = metadata;
678 }
679 this.sendError(err);
680 return;
681 }
682
683 try {
684 const response = this.serializeMessage(value!);
685
686 this.write(response);
687 this.sendStatus({ code: Status.OK, details: 'OK', metadata });
688 } catch (err) {
689 this.sendError({
690 details: getErrorMessage(err),
691 code: Status.INTERNAL
692 });
693 }
694 }
695
696 sendStatus(statusObj: PartialStatusObject) {
697 this.emit('callEnd', statusObj.code);
698 this.emit('streamEnd', statusObj.code === Status.OK);
699 if (this.checkCancelled()) {
700 return;
701 }
702
703 trace(
704 'Request to method ' +
705 this.handler?.path +
706 ' ended with status code: ' +
707 Status[statusObj.code] +
708 ' details: ' +
709 statusObj.details
710 );
711
712 if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
713
714 if (this.stream.headersSent) {
715 if (!this.wantTrailers) {
716 this.wantTrailers = true;
717 this.stream.once('wantTrailers', () => {
718 const trailersToSend = {
719 [GRPC_STATUS_HEADER]: statusObj.code,
720 [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
721 ...statusObj.metadata?.toHttp2Headers(),
722 };
723
724 this.stream.sendTrailers(trailersToSend);
725 this.statusSent = true;
726 });
727 this.stream.end();
728 }
729 } else {
730 // Trailers-only response
731 const trailersToSend = {
732 [GRPC_STATUS_HEADER]: statusObj.code,
733 [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
734 ...defaultResponseHeaders,
735 ...statusObj.metadata?.toHttp2Headers(),
736 };
737 this.stream.respond(trailersToSend, {endStream: true});
738 this.statusSent = true;
739 }
740 }
741
742 sendError(error: ServerErrorResponse | ServerStatusResponse) {
743 const status: PartialStatusObject = {
744 code: Status.UNKNOWN,
745 details: 'message' in error ? error.message : 'Unknown Error',
746 metadata:
747 'metadata' in error && error.metadata !== undefined
748 ? error.metadata
749 : null,
750 };
751
752 if (
753 'code' in error &&
754 typeof error.code === 'number' &&
755 Number.isInteger(error.code)
756 ) {
757 status.code = error.code;
758
759 if ('details' in error && typeof error.details === 'string') {
760 status.details = error.details!;
761 }
762 }
763
764 this.sendStatus(status);
765 }
766
767 write(chunk: Buffer) {
768 if (this.checkCancelled()) {
769 return;
770 }
771
772 if (
773 this.maxSendMessageSize !== -1 &&
774 chunk.length > this.maxSendMessageSize
775 ) {
776 this.sendError({
777 code: Status.RESOURCE_EXHAUSTED,
778 details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
779 });
780 return;
781 }
782
783 this.sendMetadata();
784 this.emit('sendMessage');
785 return this.stream.write(chunk);
786 }
787
788 resume() {
789 this.stream.resume();
790 }
791
792 setupSurfaceCall(call: ServerSurfaceCall) {
793 this.once('cancelled', (reason) => {
794 call.cancelled = true;
795 call.emit('cancelled', reason);
796 });
797
798 this.once('callEnd', (status) => call.emit('callEnd', status));
799 }
800
801 setupReadable(
802 readable:
803 | ServerReadableStream<RequestType, ResponseType>
804 | ServerDuplexStream<RequestType, ResponseType>,
805 encoding: string
806 ) {
807 const decoder = new StreamDecoder();
808
809 let readsDone = false;
810
811 let pendingMessageProcessing = false;
812
813 let pushedEnd = false;
814
815 const maybePushEnd = async () => {
816 if (!pushedEnd && readsDone && !pendingMessageProcessing) {
817 pushedEnd = true;
818 await this.pushOrBufferMessage(readable, null);
819 }
820 };
821
822 this.stream.on('data', async (data: Buffer) => {
823 const messages = decoder.write(data);
824
825 pendingMessageProcessing = true;
826 this.stream.pause();
827 for (const message of messages) {
828 if (
829 this.maxReceiveMessageSize !== -1 &&
830 message.length > this.maxReceiveMessageSize
831 ) {
832 this.sendError({
833 code: Status.RESOURCE_EXHAUSTED,
834 details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
835 });
836 return;
837 }
838 this.emit('receiveMessage');
839
840 const compressed = message.readUInt8(0) === 1;
841 const compressedMessageEncoding = compressed ? encoding : 'identity';
842 const decompressedMessage = await this.getDecompressedMessage(
843 message,
844 compressedMessageEncoding
845 );
846
847 // Encountered an error with decompression; it'll already have been propogated back
848 // Just return early
849 if (!decompressedMessage) return;
850
851 await this.pushOrBufferMessage(readable, decompressedMessage);
852 }
853 pendingMessageProcessing = false;
854 this.stream.resume();
855 await maybePushEnd();
856 });
857
858 this.stream.once('end', async () => {
859 readsDone = true;
860 await maybePushEnd();
861 });
862 }
863
864 consumeUnpushedMessages(
865 readable:
866 | ServerReadableStream<RequestType, ResponseType>
867 | ServerDuplexStream<RequestType, ResponseType>
868 ): boolean {
869 this.canPush = true;
870
871 while (this.messagesToPush.length > 0) {
872 const nextMessage = this.messagesToPush.shift();
873 const canPush = readable.push(nextMessage);
874
875 if (nextMessage === null || canPush === false) {
876 this.canPush = false;
877 break;
878 }
879 }
880
881 return this.canPush;
882 }
883
884 private async pushOrBufferMessage(
885 readable:
886 | ServerReadableStream<RequestType, ResponseType>
887 | ServerDuplexStream<RequestType, ResponseType>,
888 messageBytes: Buffer | null
889 ): Promise<void> {
890 if (this.isPushPending) {
891 this.bufferedMessages.push(messageBytes);
892 } else {
893 await this.pushMessage(readable, messageBytes);
894 }
895 }
896
897 private async pushMessage(
898 readable:
899 | ServerReadableStream<RequestType, ResponseType>
900 | ServerDuplexStream<RequestType, ResponseType>,
901 messageBytes: Buffer | null
902 ) {
903 if (messageBytes === null) {
904 trace('Received end of stream');
905 if (this.canPush) {
906 readable.push(null);
907 } else {
908 this.messagesToPush.push(null);
909 }
910
911 return;
912 }
913
914 trace('Received message of length ' + messageBytes.length);
915
916 this.isPushPending = true;
917
918 try {
919 const deserialized = await this.deserializeMessage(messageBytes);
920
921 if (this.canPush) {
922 if (!readable.push(deserialized)) {
923 this.canPush = false;
924 this.stream.pause();
925 }
926 } else {
927 this.messagesToPush.push(deserialized);
928 }
929 } catch (error) {
930 // Ignore any remaining messages when errors occur.
931 this.bufferedMessages.length = 0;
932 let code = getErrorCode(error);
933 if (code === null || code < Status.OK || code > Status.UNAUTHENTICATED) {
934 code = Status.INTERNAL
935 }
936
937 readable.emit('error', {
938 details: getErrorMessage(error),
939 code: code
940 });
941 }
942
943 this.isPushPending = false;
944
945 if (this.bufferedMessages.length > 0) {
946 await this.pushMessage(
947 readable,
948 this.bufferedMessages.shift() as Buffer | null
949 );
950 }
951 }
952
953 getPeer(): string {
954 const socket = this.stream.session.socket;
955 if (socket.remoteAddress) {
956 if (socket.remotePort) {
957 return `${socket.remoteAddress}:${socket.remotePort}`;
958 } else {
959 return socket.remoteAddress;
960 }
961 } else {
962 return 'unknown';
963 }
964 }
965
966 getDeadline(): Deadline {
967 return this.deadline;
968 }
969
970 getPath(): string {
971 return this.handler.path;
972 }
973}
974
975/* eslint-disable @typescript-eslint/no-explicit-any */
976type UntypedServerCall = Http2ServerCallStream<any, any>;
977
978function handleExpiredDeadline(call: UntypedServerCall) {
979 const err = new Error('Deadline exceeded') as ServerErrorResponse;
980 err.code = Status.DEADLINE_EXCEEDED;
981
982 call.sendError(err);
983 call.cancelled = true;
984 call.emit('cancelled', 'deadline');
985}