UNPKG

27.9 kBPlain TextView Raw
1/*
2 * Copyright 2024 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { PartialStatusObject } from './call-interface';
19import { ServerMethodDefinition } from './make-client';
20import { Metadata } from './metadata';
21import { ChannelOptions } from './channel-options';
22import { Handler, ServerErrorResponse } from './server-call';
23import { Deadline } from './deadline';
24import {
25 DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
26 DEFAULT_MAX_SEND_MESSAGE_LENGTH,
27 LogVerbosity,
28 Status,
29} from './constants';
30import * as http2 from 'http2';
31import { getErrorMessage } from './error';
32import * as zlib from 'zlib';
33import { promisify } from 'util';
34import { StreamDecoder } from './stream-decoder';
35import { CallEventTracker } from './transport';
36import * as logging from './logging';
37
38const unzip = promisify(zlib.unzip);
39const inflate = promisify(zlib.inflate);
40
41const TRACER_NAME = 'server_call';
42
43function trace(text: string) {
44 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
45}
46
47export interface ServerMetadataListener {
48 (metadata: Metadata, next: (metadata: Metadata) => void): void;
49}
50
51export interface ServerMessageListener {
52 // eslint-disable-next-line @typescript-eslint/no-explicit-any
53 (message: any, next: (message: any) => void): void;
54}
55
56export interface ServerHalfCloseListener {
57 (next: () => void): void;
58}
59
60export interface ServerCancelListener {
61 (): void;
62}
63
64export interface FullServerListener {
65 onReceiveMetadata: ServerMetadataListener;
66 onReceiveMessage: ServerMessageListener;
67 onReceiveHalfClose: ServerHalfCloseListener;
68 onCancel: ServerCancelListener;
69}
70
71export type ServerListener = Partial<FullServerListener>;
72
73export class ServerListenerBuilder {
74 private metadata: ServerMetadataListener | undefined = undefined;
75 private message: ServerMessageListener | undefined = undefined;
76 private halfClose: ServerHalfCloseListener | undefined = undefined;
77 private cancel: ServerCancelListener | undefined = undefined;
78
79 withOnReceiveMetadata(onReceiveMetadata: ServerMetadataListener): this {
80 this.metadata = onReceiveMetadata;
81 return this;
82 }
83
84 withOnReceiveMessage(onReceiveMessage: ServerMessageListener): this {
85 this.message = onReceiveMessage;
86 return this;
87 }
88
89 withOnReceiveHalfClose(onReceiveHalfClose: ServerHalfCloseListener): this {
90 this.halfClose = onReceiveHalfClose;
91 return this;
92 }
93
94 withOnCancel(onCancel: ServerCancelListener): this {
95 this.cancel = onCancel;
96 return this;
97 }
98
99 build(): ServerListener {
100 return {
101 onReceiveMetadata: this.metadata,
102 onReceiveMessage: this.message,
103 onReceiveHalfClose: this.halfClose,
104 onCancel: this.cancel,
105 };
106 }
107}
108
109export interface InterceptingServerListener {
110 onReceiveMetadata(metadata: Metadata): void;
111 // eslint-disable-next-line @typescript-eslint/no-explicit-any
112 onReceiveMessage(message: any): void;
113 onReceiveHalfClose(): void;
114 onCancel(): void;
115}
116
117export function isInterceptingServerListener(
118 listener: ServerListener | InterceptingServerListener
119): listener is InterceptingServerListener {
120 return (
121 listener.onReceiveMetadata !== undefined &&
122 listener.onReceiveMetadata.length === 1
123 );
124}
125
126class InterceptingServerListenerImpl implements InterceptingServerListener {
127 /**
128 * Once the call is cancelled, ignore all other events.
129 */
130 private cancelled = false;
131 private processingMetadata = false;
132 private hasPendingMessage = false;
133 private pendingMessage: any = null;
134 private processingMessage = false;
135 private hasPendingHalfClose = false;
136
137 constructor(
138 private listener: FullServerListener,
139 private nextListener: InterceptingServerListener
140 ) {}
141
142 private processPendingMessage() {
143 if (this.hasPendingMessage) {
144 this.nextListener.onReceiveMessage(this.pendingMessage);
145 this.pendingMessage = null;
146 this.hasPendingMessage = false;
147 }
148 }
149
150 private processPendingHalfClose() {
151 if (this.hasPendingHalfClose) {
152 this.nextListener.onReceiveHalfClose();
153 this.hasPendingHalfClose = false;
154 }
155 }
156
157 onReceiveMetadata(metadata: Metadata): void {
158 if (this.cancelled) {
159 return;
160 }
161 this.processingMetadata = true;
162 this.listener.onReceiveMetadata(metadata, interceptedMetadata => {
163 this.processingMetadata = false;
164 if (this.cancelled) {
165 return;
166 }
167 this.nextListener.onReceiveMetadata(interceptedMetadata);
168 this.processPendingMessage();
169 this.processPendingHalfClose();
170 });
171 }
172 onReceiveMessage(message: any): void {
173 if (this.cancelled) {
174 return;
175 }
176 this.processingMessage = true;
177 this.listener.onReceiveMessage(message, msg => {
178 this.processingMessage = false;
179 if (this.cancelled) {
180 return;
181 }
182 if (this.processingMetadata) {
183 this.pendingMessage = msg;
184 this.hasPendingMessage = true;
185 } else {
186 this.nextListener.onReceiveMessage(msg);
187 this.processPendingHalfClose();
188 }
189 });
190 }
191 onReceiveHalfClose(): void {
192 if (this.cancelled) {
193 return;
194 }
195 this.listener.onReceiveHalfClose(() => {
196 if (this.cancelled) {
197 return;
198 }
199 if (this.processingMetadata || this.processingMessage) {
200 this.hasPendingHalfClose = true;
201 } else {
202 this.nextListener.onReceiveHalfClose();
203 }
204 });
205 }
206 onCancel(): void {
207 this.cancelled = true;
208 this.listener.onCancel();
209 this.nextListener.onCancel();
210 }
211}
212
213export interface StartResponder {
214 (next: (listener?: ServerListener) => void): void;
215}
216
217export interface MetadataResponder {
218 (metadata: Metadata, next: (metadata: Metadata) => void): void;
219}
220
221export interface MessageResponder {
222 // eslint-disable-next-line @typescript-eslint/no-explicit-any
223 (message: any, next: (message: any) => void): void;
224}
225
226export interface StatusResponder {
227 (
228 status: PartialStatusObject,
229 next: (status: PartialStatusObject) => void
230 ): void;
231}
232
233export interface FullResponder {
234 start: StartResponder;
235 sendMetadata: MetadataResponder;
236 sendMessage: MessageResponder;
237 sendStatus: StatusResponder;
238}
239
240export type Responder = Partial<FullResponder>;
241
242export class ResponderBuilder {
243 private start: StartResponder | undefined = undefined;
244 private metadata: MetadataResponder | undefined = undefined;
245 private message: MessageResponder | undefined = undefined;
246 private status: StatusResponder | undefined = undefined;
247
248 withStart(start: StartResponder): this {
249 this.start = start;
250 return this;
251 }
252
253 withSendMetadata(sendMetadata: MetadataResponder): this {
254 this.metadata = sendMetadata;
255 return this;
256 }
257
258 withSendMessage(sendMessage: MessageResponder): this {
259 this.message = sendMessage;
260 return this;
261 }
262
263 withSendStatus(sendStatus: StatusResponder): this {
264 this.status = sendStatus;
265 return this;
266 }
267
268 build(): Responder {
269 return {
270 start: this.start,
271 sendMetadata: this.metadata,
272 sendMessage: this.message,
273 sendStatus: this.status,
274 };
275 }
276}
277
278const defaultServerListener: FullServerListener = {
279 onReceiveMetadata: (metadata, next) => {
280 next(metadata);
281 },
282 onReceiveMessage: (message, next) => {
283 next(message);
284 },
285 onReceiveHalfClose: next => {
286 next();
287 },
288 onCancel: () => {},
289};
290
291const defaultResponder: FullResponder = {
292 start: next => {
293 next();
294 },
295 sendMetadata: (metadata, next) => {
296 next(metadata);
297 },
298 sendMessage: (message, next) => {
299 next(message);
300 },
301 sendStatus: (status, next) => {
302 next(status);
303 },
304};
305
306export interface ServerInterceptingCallInterface {
307 /**
308 * Register the listener to handle inbound events.
309 */
310 start(listener: InterceptingServerListener): void;
311 /**
312 * Send response metadata.
313 */
314 sendMetadata(metadata: Metadata): void;
315 /**
316 * Send a response message.
317 */
318 sendMessage(message: any, callback: () => void): void;
319 /**
320 * End the call by sending this status.
321 */
322 sendStatus(status: PartialStatusObject): void;
323 /**
324 * Start a single read, eventually triggering either listener.onReceiveMessage or listener.onReceiveHalfClose.
325 */
326 startRead(): void;
327 /**
328 * Return the peer address of the client making the request, if known, or "unknown" otherwise
329 */
330 getPeer(): string;
331 /**
332 * Return the call deadline set by the client. The value is Infinity if there is no deadline.
333 */
334 getDeadline(): Deadline;
335}
336
337export class ServerInterceptingCall implements ServerInterceptingCallInterface {
338 private responder: FullResponder;
339 private processingMetadata = false;
340 private processingMessage = false;
341 private pendingMessage: any = null;
342 private pendingMessageCallback: (() => void) | null = null;
343 private pendingStatus: PartialStatusObject | null = null;
344 constructor(
345 private nextCall: ServerInterceptingCallInterface,
346 responder?: Responder
347 ) {
348 this.responder = {
349 start: responder?.start ?? defaultResponder.start,
350 sendMetadata: responder?.sendMetadata ?? defaultResponder.sendMetadata,
351 sendMessage: responder?.sendMessage ?? defaultResponder.sendMessage,
352 sendStatus: responder?.sendStatus ?? defaultResponder.sendStatus,
353 };
354 }
355
356 private processPendingMessage() {
357 if (this.pendingMessageCallback) {
358 this.nextCall.sendMessage(
359 this.pendingMessage,
360 this.pendingMessageCallback
361 );
362 this.pendingMessage = null;
363 this.pendingMessageCallback = null;
364 }
365 }
366
367 private processPendingStatus() {
368 if (this.pendingStatus) {
369 this.nextCall.sendStatus(this.pendingStatus);
370 this.pendingStatus = null;
371 }
372 }
373
374 start(listener: InterceptingServerListener): void {
375 this.responder.start(interceptedListener => {
376 const fullInterceptedListener: FullServerListener = {
377 onReceiveMetadata:
378 interceptedListener?.onReceiveMetadata ??
379 defaultServerListener.onReceiveMetadata,
380 onReceiveMessage:
381 interceptedListener?.onReceiveMessage ??
382 defaultServerListener.onReceiveMessage,
383 onReceiveHalfClose:
384 interceptedListener?.onReceiveHalfClose ??
385 defaultServerListener.onReceiveHalfClose,
386 onCancel:
387 interceptedListener?.onCancel ?? defaultServerListener.onCancel,
388 };
389 const finalInterceptingListener = new InterceptingServerListenerImpl(
390 fullInterceptedListener,
391 listener
392 );
393 this.nextCall.start(finalInterceptingListener);
394 });
395 }
396 sendMetadata(metadata: Metadata): void {
397 this.processingMetadata = true;
398 this.responder.sendMetadata(metadata, interceptedMetadata => {
399 this.processingMetadata = false;
400 this.nextCall.sendMetadata(interceptedMetadata);
401 this.processPendingMessage();
402 this.processPendingStatus();
403 });
404 }
405 sendMessage(message: any, callback: () => void): void {
406 this.processingMessage = true;
407 this.responder.sendMessage(message, interceptedMessage => {
408 this.processingMessage = false;
409 if (this.processingMetadata) {
410 this.pendingMessage = interceptedMessage;
411 this.pendingMessageCallback = callback;
412 } else {
413 this.nextCall.sendMessage(interceptedMessage, callback);
414 }
415 });
416 }
417 sendStatus(status: PartialStatusObject): void {
418 this.responder.sendStatus(status, interceptedStatus => {
419 if (this.processingMetadata || this.processingMessage) {
420 this.pendingStatus = interceptedStatus;
421 } else {
422 this.nextCall.sendStatus(interceptedStatus);
423 }
424 });
425 }
426 startRead(): void {
427 this.nextCall.startRead();
428 }
429 getPeer(): string {
430 return this.nextCall.getPeer();
431 }
432 getDeadline(): Deadline {
433 return this.nextCall.getDeadline();
434 }
435}
436
437export interface ServerInterceptor {
438 (
439 methodDescriptor: ServerMethodDefinition<any, any>,
440 call: ServerInterceptingCallInterface
441 ): ServerInterceptingCall;
442}
443
444interface DeadlineUnitIndexSignature {
445 [name: string]: number;
446}
447
448const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
449const GRPC_ENCODING_HEADER = 'grpc-encoding';
450const GRPC_MESSAGE_HEADER = 'grpc-message';
451const GRPC_STATUS_HEADER = 'grpc-status';
452const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
453const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
454const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
455 H: 3600000,
456 M: 60000,
457 S: 1000,
458 m: 1,
459 u: 0.001,
460 n: 0.000001,
461};
462
463const defaultCompressionHeaders = {
464 // TODO(cjihrig): Remove these encoding headers from the default response
465 // once compression is integrated.
466 [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
467 [GRPC_ENCODING_HEADER]: 'identity',
468};
469const defaultResponseHeaders = {
470 [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
471 [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
472};
473const defaultResponseOptions = {
474 waitForTrailers: true,
475} as http2.ServerStreamResponseOptions;
476
477type ReadQueueEntryType = 'COMPRESSED' | 'READABLE' | 'HALF_CLOSE';
478
479interface ReadQueueEntry {
480 type: ReadQueueEntryType;
481 compressedMessage: Buffer | null;
482 parsedMessage: any;
483}
484
485export class BaseServerInterceptingCall
486 implements ServerInterceptingCallInterface
487{
488 private listener: InterceptingServerListener | null = null;
489 private metadata: Metadata;
490 private deadlineTimer: NodeJS.Timeout | null = null;
491 private deadline: Deadline = Infinity;
492 private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
493 private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
494 private cancelled = false;
495 private metadataSent = false;
496 private wantTrailers = false;
497 private cancelNotified = false;
498 private incomingEncoding = 'identity';
499 private decoder = new StreamDecoder();
500 private readQueue: ReadQueueEntry[] = [];
501 private isReadPending = false;
502 private receivedHalfClose = false;
503 private streamEnded = false;
504
505 constructor(
506 private readonly stream: http2.ServerHttp2Stream,
507 headers: http2.IncomingHttpHeaders,
508 private readonly callEventTracker: CallEventTracker | null,
509 private readonly handler: Handler<any, any>,
510 options: ChannelOptions
511 ) {
512 this.stream.once('error', (err: ServerErrorResponse) => {
513 /* We need an error handler to avoid uncaught error event exceptions, but
514 * there is nothing we can reasonably do here. Any error event should
515 * have a corresponding close event, which handles emitting the cancelled
516 * event. And the stream is now in a bad state, so we can't reasonably
517 * expect to be able to send an error over it. */
518 });
519
520 this.stream.once('close', () => {
521 trace(
522 'Request to method ' +
523 this.handler?.path +
524 ' stream closed with rstCode ' +
525 this.stream.rstCode
526 );
527
528 if (this.callEventTracker && !this.streamEnded) {
529 this.streamEnded = true;
530 this.callEventTracker.onStreamEnd(false);
531 this.callEventTracker.onCallEnd({
532 code: Status.CANCELLED,
533 details: 'Stream closed before sending status',
534 metadata: null,
535 });
536 }
537
538 this.notifyOnCancel();
539 });
540
541 this.stream.on('data', (data: Buffer) => {
542 this.handleDataFrame(data);
543 });
544 this.stream.pause();
545
546 this.stream.on('end', () => {
547 this.handleEndEvent();
548 });
549
550 if ('grpc.max_send_message_length' in options) {
551 this.maxSendMessageSize = options['grpc.max_send_message_length']!;
552 }
553 if ('grpc.max_receive_message_length' in options) {
554 this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
555 }
556
557 const metadata = Metadata.fromHttp2Headers(headers);
558
559 if (logging.isTracerEnabled(TRACER_NAME)) {
560 trace(
561 'Request to ' +
562 this.handler.path +
563 ' received headers ' +
564 JSON.stringify(metadata.toJSON())
565 );
566 }
567
568 const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
569
570 if (timeoutHeader.length > 0) {
571 this.handleTimeoutHeader(timeoutHeader[0] as string);
572 }
573
574 const encodingHeader = metadata.get(GRPC_ENCODING_HEADER);
575
576 if (encodingHeader.length > 0) {
577 this.incomingEncoding = encodingHeader[0] as string;
578 }
579
580 // Remove several headers that should not be propagated to the application
581 metadata.remove(GRPC_TIMEOUT_HEADER);
582 metadata.remove(GRPC_ENCODING_HEADER);
583 metadata.remove(GRPC_ACCEPT_ENCODING_HEADER);
584 metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
585 metadata.remove(http2.constants.HTTP2_HEADER_TE);
586 metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
587 this.metadata = metadata;
588 }
589
590 private handleTimeoutHeader(timeoutHeader: string) {
591 const match = timeoutHeader.toString().match(DEADLINE_REGEX);
592
593 if (match === null) {
594 const status: PartialStatusObject = {
595 code: Status.INTERNAL,
596 details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`,
597 metadata: null,
598 };
599 // Wait for the constructor to complete before sending the error.
600 process.nextTick(() => {
601 this.sendStatus(status);
602 });
603 return;
604 }
605
606 const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
607
608 const now = new Date();
609 this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
610 this.deadlineTimer = setTimeout(() => {
611 const status: PartialStatusObject = {
612 code: Status.DEADLINE_EXCEEDED,
613 details: 'Deadline exceeded',
614 metadata: null,
615 };
616 this.sendStatus(status);
617 }, timeout);
618 }
619
620 private checkCancelled(): boolean {
621 /* In some cases the stream can become destroyed before the close event
622 * fires. That creates a race condition that this check works around */
623 if (!this.cancelled && (this.stream.destroyed || this.stream.closed)) {
624 this.notifyOnCancel();
625 this.cancelled = true;
626 }
627 return this.cancelled;
628 }
629 private notifyOnCancel() {
630 if (this.cancelNotified) {
631 return;
632 }
633 this.cancelNotified = true;
634 this.cancelled = true;
635 process.nextTick(() => {
636 this.listener?.onCancel();
637 });
638 if (this.deadlineTimer) {
639 clearTimeout(this.deadlineTimer);
640 }
641 // Flush incoming data frames
642 this.stream.resume();
643 }
644
645 /**
646 * A server handler can start sending messages without explicitly sending
647 * metadata. In that case, we need to send headers before sending any
648 * messages. This function does that if necessary.
649 */
650 private maybeSendMetadata() {
651 if (!this.metadataSent) {
652 this.sendMetadata(new Metadata());
653 }
654 }
655
656 /**
657 * Serialize a message to a length-delimited byte string.
658 * @param value
659 * @returns
660 */
661 private serializeMessage(value: any) {
662 const messageBuffer = this.handler.serialize(value);
663 const byteLength = messageBuffer.byteLength;
664 const output = Buffer.allocUnsafe(byteLength + 5);
665 /* Note: response compression is currently not supported, so this
666 * compressed bit is always 0. */
667 output.writeUInt8(0, 0);
668 output.writeUInt32BE(byteLength, 1);
669 messageBuffer.copy(output, 5);
670 return output;
671 }
672
673 private decompressMessage(
674 message: Buffer,
675 encoding: string
676 ): Buffer | Promise<Buffer> {
677 switch (encoding) {
678 case 'deflate':
679 return inflate(message.subarray(5));
680 case 'gzip':
681 return unzip(message.subarray(5));
682 case 'identity':
683 return message.subarray(5);
684 default:
685 return Promise.reject({
686 code: Status.UNIMPLEMENTED,
687 details: `Received message compressed with unsupported encoding "${encoding}"`,
688 });
689 }
690 }
691
692 private async decompressAndMaybePush(queueEntry: ReadQueueEntry) {
693 if (queueEntry.type !== 'COMPRESSED') {
694 throw new Error(`Invalid queue entry type: ${queueEntry.type}`);
695 }
696
697 const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1;
698 const compressedMessageEncoding = compressed
699 ? this.incomingEncoding
700 : 'identity';
701 const decompressedMessage = await this.decompressMessage(
702 queueEntry.compressedMessage!,
703 compressedMessageEncoding
704 );
705 try {
706 queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
707 } catch (err) {
708 this.sendStatus({
709 code: Status.INTERNAL,
710 details: `Error deserializing request: ${(err as Error).message}`,
711 });
712 return;
713 }
714 queueEntry.type = 'READABLE';
715 this.maybePushNextMessage();
716 }
717
718 private maybePushNextMessage() {
719 if (
720 this.listener &&
721 this.isReadPending &&
722 this.readQueue.length > 0 &&
723 this.readQueue[0].type !== 'COMPRESSED'
724 ) {
725 this.isReadPending = false;
726 const nextQueueEntry = this.readQueue.shift()!;
727 if (nextQueueEntry.type === 'READABLE') {
728 this.listener.onReceiveMessage(nextQueueEntry.parsedMessage);
729 } else {
730 // nextQueueEntry.type === 'HALF_CLOSE'
731 this.listener.onReceiveHalfClose();
732 }
733 }
734 }
735
736 private handleDataFrame(data: Buffer) {
737 if (this.checkCancelled()) {
738 return;
739 }
740 trace(
741 'Request to ' +
742 this.handler.path +
743 ' received data frame of size ' +
744 data.length
745 );
746 const rawMessages = this.decoder.write(data);
747
748 for (const messageBytes of rawMessages) {
749 this.stream.pause();
750 if (
751 this.maxReceiveMessageSize !== -1 &&
752 messageBytes.length - 5 > this.maxReceiveMessageSize
753 ) {
754 this.sendStatus({
755 code: Status.RESOURCE_EXHAUSTED,
756 details: `Received message larger than max (${
757 messageBytes.length - 5
758 } vs. ${this.maxReceiveMessageSize})`,
759 metadata: null,
760 });
761 return;
762 }
763 const queueEntry: ReadQueueEntry = {
764 type: 'COMPRESSED',
765 compressedMessage: messageBytes,
766 parsedMessage: null,
767 };
768 this.readQueue.push(queueEntry);
769 this.decompressAndMaybePush(queueEntry);
770 this.callEventTracker?.addMessageReceived();
771 }
772 }
773 private handleEndEvent() {
774 this.readQueue.push({
775 type: 'HALF_CLOSE',
776 compressedMessage: null,
777 parsedMessage: null,
778 });
779 this.receivedHalfClose = true;
780 this.maybePushNextMessage();
781 }
782 start(listener: InterceptingServerListener): void {
783 trace('Request to ' + this.handler.path + ' start called');
784 if (this.checkCancelled()) {
785 return;
786 }
787 this.listener = listener;
788 listener.onReceiveMetadata(this.metadata);
789 }
790 sendMetadata(metadata: Metadata): void {
791 if (this.checkCancelled()) {
792 return;
793 }
794
795 if (this.metadataSent) {
796 return;
797 }
798
799 this.metadataSent = true;
800 const custom = metadata ? metadata.toHttp2Headers() : null;
801 const headers = {
802 ...defaultResponseHeaders,
803 ...defaultCompressionHeaders,
804 ...custom,
805 };
806 this.stream.respond(headers, defaultResponseOptions);
807 }
808 sendMessage(message: any, callback: () => void): void {
809 if (this.checkCancelled()) {
810 return;
811 }
812 let response: Buffer;
813 try {
814 response = this.serializeMessage(message);
815 } catch (e) {
816 this.sendStatus({
817 code: Status.INTERNAL,
818 details: `Error serializing response: ${getErrorMessage(e)}`,
819 metadata: null,
820 });
821 return;
822 }
823
824 if (
825 this.maxSendMessageSize !== -1 &&
826 response.length - 5 > this.maxSendMessageSize
827 ) {
828 this.sendStatus({
829 code: Status.RESOURCE_EXHAUSTED,
830 details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`,
831 metadata: null,
832 });
833 return;
834 }
835 this.maybeSendMetadata();
836 trace(
837 'Request to ' +
838 this.handler.path +
839 ' sent data frame of size ' +
840 response.length
841 );
842 this.stream.write(response, error => {
843 if (error) {
844 this.sendStatus({
845 code: Status.INTERNAL,
846 details: `Error writing message: ${getErrorMessage(error)}`,
847 metadata: null,
848 });
849 return;
850 }
851 this.callEventTracker?.addMessageSent();
852 callback();
853 });
854 }
855 sendStatus(status: PartialStatusObject): void {
856 if (this.checkCancelled()) {
857 return;
858 }
859
860 trace(
861 'Request to method ' +
862 this.handler?.path +
863 ' ended with status code: ' +
864 Status[status.code] +
865 ' details: ' +
866 status.details
867 );
868
869 if (this.metadataSent) {
870 if (!this.wantTrailers) {
871 this.wantTrailers = true;
872 this.stream.once('wantTrailers', () => {
873 if (this.callEventTracker && !this.streamEnded) {
874 this.streamEnded = true;
875 this.callEventTracker.onStreamEnd(true);
876 this.callEventTracker.onCallEnd(status);
877 }
878 const trailersToSend = {
879 [GRPC_STATUS_HEADER]: status.code,
880 [GRPC_MESSAGE_HEADER]: encodeURI(status.details),
881 ...status.metadata?.toHttp2Headers(),
882 };
883
884 this.stream.sendTrailers(trailersToSend);
885 this.notifyOnCancel();
886 });
887 this.stream.end();
888 } else {
889 this.notifyOnCancel();
890 }
891 } else {
892 if (this.callEventTracker && !this.streamEnded) {
893 this.streamEnded = true;
894 this.callEventTracker.onStreamEnd(true);
895 this.callEventTracker.onCallEnd(status);
896 }
897 // Trailers-only response
898 const trailersToSend = {
899 [GRPC_STATUS_HEADER]: status.code,
900 [GRPC_MESSAGE_HEADER]: encodeURI(status.details),
901 ...defaultResponseHeaders,
902 ...status.metadata?.toHttp2Headers(),
903 };
904 this.stream.respond(trailersToSend, { endStream: true });
905 this.notifyOnCancel();
906 }
907 }
908 startRead(): void {
909 trace('Request to ' + this.handler.path + ' startRead called');
910 if (this.checkCancelled()) {
911 return;
912 }
913 this.isReadPending = true;
914 if (this.readQueue.length === 0) {
915 if (!this.receivedHalfClose) {
916 this.stream.resume();
917 }
918 } else {
919 this.maybePushNextMessage();
920 }
921 }
922 getPeer(): string {
923 const socket = this.stream.session?.socket;
924 if (socket?.remoteAddress) {
925 if (socket.remotePort) {
926 return `${socket.remoteAddress}:${socket.remotePort}`;
927 } else {
928 return socket.remoteAddress;
929 }
930 } else {
931 return 'unknown';
932 }
933 }
934 getDeadline(): Deadline {
935 return this.deadline;
936 }
937}
938
939export function getServerInterceptingCall(
940 interceptors: ServerInterceptor[],
941 stream: http2.ServerHttp2Stream,
942 headers: http2.IncomingHttpHeaders,
943 callEventTracker: CallEventTracker | null,
944 handler: Handler<any, any>,
945 options: ChannelOptions
946) {
947 const methodDefinition: ServerMethodDefinition<any, any> = {
948 path: handler.path,
949 requestStream: handler.type === 'clientStream' || handler.type === 'bidi',
950 responseStream: handler.type === 'serverStream' || handler.type === 'bidi',
951 requestDeserialize: handler.deserialize,
952 responseSerialize: handler.serialize,
953 };
954 const baseCall = new BaseServerInterceptingCall(
955 stream,
956 headers,
957 callEventTracker,
958 handler,
959 options
960 );
961 return interceptors.reduce(
962 (call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => {
963 return interceptor(methodDefinition, call);
964 },
965 baseCall
966 );
967}
968
\No newline at end of file