1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { PartialStatusObject } from './call-interface';
|
19 | import { ServerMethodDefinition } from './make-client';
|
20 | import { Metadata } from './metadata';
|
21 | import { ChannelOptions } from './channel-options';
|
22 | import { Handler, ServerErrorResponse } from './server-call';
|
23 | import { Deadline } from './deadline';
|
24 | import {
|
25 | DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
|
26 | DEFAULT_MAX_SEND_MESSAGE_LENGTH,
|
27 | LogVerbosity,
|
28 | Status,
|
29 | } from './constants';
|
30 | import * as http2 from 'http2';
|
31 | import { getErrorMessage } from './error';
|
32 | import * as zlib from 'zlib';
|
33 | import { promisify } from 'util';
|
34 | import { StreamDecoder } from './stream-decoder';
|
35 | import { CallEventTracker } from './transport';
|
36 | import * as logging from './logging';
|
37 |
|
/** Tracer name that `trace` passes to the logging module. */
const TRACER_NAME = 'server_call';

/* Promise-returning wrappers around the callback-style zlib functions, used
 * to decompress incoming request messages. */
const inflate = promisify(zlib.inflate);
const unzip = promisify(zlib.unzip);
|
42 |
|
/**
 * Emit a DEBUG-verbosity trace line under this module's tracer name.
 * @param text The message to log.
 */
function trace(text: string): void {
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
|
46 |
|
/**
 * Interceptor hook for incoming request metadata. Call `next` with the
 * (possibly modified) metadata to pass it further along the chain.
 */
export interface ServerMetadataListener {
  (metadata: Metadata, next: (metadata: Metadata) => void): void;
}
|
50 |
|
/**
 * Interceptor hook for an incoming request message. Call `next` with the
 * (possibly modified) message to pass it further along the chain.
 */
export interface ServerMessageListener {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
|
55 |
|
/**
 * Interceptor hook for the client half-closing the call. Call `next` to pass
 * the half-close further along the chain.
 */
export interface ServerHalfCloseListener {
  (next: () => void): void;
}
|
59 |
|
/** Interceptor hook invoked when the call is cancelled. It takes no arguments and has no `next`. */
export interface ServerCancelListener {
  (): void;
}
|
63 |
|
/** A listener with every inbound-event hook present. */
export interface FullServerListener {
  /** Handles incoming request metadata. */
  onReceiveMetadata: ServerMetadataListener;
  /** Handles each incoming request message. */
  onReceiveMessage: ServerMessageListener;
  /** Handles the client half-closing the call. */
  onReceiveHalfClose: ServerHalfCloseListener;
  /** Handles cancellation of the call. */
  onCancel: ServerCancelListener;
}
|
70 |
|
/** A listener in which any subset of the inbound-event hooks may be omitted. */
export type ServerListener = Partial<FullServerListener>;
|
72 |
|
/**
 * Fluent builder for a {@link ServerListener}. Each `with*` method stores one
 * hook and returns the builder; hooks that were never set come out of
 * `build()` as `undefined`.
 */
export class ServerListenerBuilder {
  private metadataHook: ServerMetadataListener | undefined = undefined;
  private messageHook: ServerMessageListener | undefined = undefined;
  private halfCloseHook: ServerHalfCloseListener | undefined = undefined;
  private cancelHook: ServerCancelListener | undefined = undefined;

  /** Set the hook for incoming request metadata. */
  withOnReceiveMetadata(onReceiveMetadata: ServerMetadataListener): this {
    this.metadataHook = onReceiveMetadata;
    return this;
  }

  /** Set the hook for incoming request messages. */
  withOnReceiveMessage(onReceiveMessage: ServerMessageListener): this {
    this.messageHook = onReceiveMessage;
    return this;
  }

  /** Set the hook for the client half-close. */
  withOnReceiveHalfClose(onReceiveHalfClose: ServerHalfCloseListener): this {
    this.halfCloseHook = onReceiveHalfClose;
    return this;
  }

  /** Set the hook for call cancellation. */
  withOnCancel(onCancel: ServerCancelListener): this {
    this.cancelHook = onCancel;
    return this;
  }

  /** Produce a listener object holding the hooks collected so far. */
  build(): ServerListener {
    const listener: ServerListener = {
      onReceiveMetadata: this.metadataHook,
      onReceiveMessage: this.messageHook,
      onReceiveHalfClose: this.halfCloseHook,
      onCancel: this.cancelHook,
    };
    return listener;
  }
}
|
108 |
|
/**
 * Listener shape used between links of the interceptor chain. Unlike
 * {@link FullServerListener}, the methods take no `next` callback.
 */
export interface InterceptingServerListener {
  /** Deliver incoming request metadata. */
  onReceiveMetadata(metadata: Metadata): void;
  /** Deliver one incoming request message. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void;
  /** Signal that the client has half-closed the call. */
  onReceiveHalfClose(): void;
  /** Signal that the call has been cancelled. */
  onCancel(): void;
}
|
116 |
|
/**
 * Type guard that tells an {@link InterceptingServerListener} apart from a
 * {@link ServerListener}.
 *
 * The distinction is made purely on the declared arity of
 * `onReceiveMetadata`: one parameter (no `next` callback) means intercepting
 * listener. A listener without `onReceiveMetadata` is never treated as one.
 *
 * @param listener The listener to classify.
 * @returns `true` when `onReceiveMetadata` exists and declares exactly one
 *     parameter.
 */
export function isInterceptingServerListener(
  listener: ServerListener | InterceptingServerListener
): listener is InterceptingServerListener {
  const metadataHandler = listener.onReceiveMetadata;
  if (metadataHandler === undefined) {
    return false;
  }
  return metadataHandler.length === 1;
}
|
125 |
|
/**
 * Adapts a user-supplied {@link FullServerListener} (whose hooks take a
 * `next` callback) to the {@link InterceptingServerListener} shape, forwarding
 * results to the next listener in the chain.
 *
 * Events are forwarded in order even when a hook calls `next` late: a message
 * that finishes while metadata is still being processed is held back, and a
 * half-close that finishes while metadata or a message is still being
 * processed is held back, then released once the earlier event is forwarded.
 */
class InterceptingServerListenerImpl implements InterceptingServerListener {
  /** Once set, nothing further is forwarded except the cancel notification. */
  private cancelled = false;
  private processingMetadata = false;
  private hasPendingMessage = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any = null;
  private processingMessage = false;
  private hasPendingHalfClose = false;

  constructor(
    private listener: FullServerListener,
    private nextListener: InterceptingServerListener
  ) {}

  /** Forward the held-back message, if there is one. */
  private processPendingMessage() {
    if (!this.hasPendingMessage) {
      return;
    }
    this.nextListener.onReceiveMessage(this.pendingMessage);
    this.pendingMessage = null;
    this.hasPendingMessage = false;
  }

  /** Forward the held-back half-close, if there is one. */
  private processPendingHalfClose() {
    if (!this.hasPendingHalfClose) {
      return;
    }
    this.nextListener.onReceiveHalfClose();
    this.hasPendingHalfClose = false;
  }

  onReceiveMetadata(metadata: Metadata): void {
    if (!this.cancelled) {
      this.processingMetadata = true;
      this.listener.onReceiveMetadata(metadata, interceptedMetadata => {
        this.processingMetadata = false;
        if (!this.cancelled) {
          this.nextListener.onReceiveMetadata(interceptedMetadata);
          // Release anything that was queued behind the metadata.
          this.processPendingMessage();
          this.processPendingHalfClose();
        }
      });
    }
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void {
    if (!this.cancelled) {
      this.processingMessage = true;
      this.listener.onReceiveMessage(message, interceptedMessage => {
        this.processingMessage = false;
        if (this.cancelled) {
          return;
        }
        if (!this.processingMetadata) {
          this.nextListener.onReceiveMessage(interceptedMessage);
          this.processPendingHalfClose();
          return;
        }
        // Metadata has not been forwarded yet; hold the message back.
        this.pendingMessage = interceptedMessage;
        this.hasPendingMessage = true;
      });
    }
  }

  onReceiveHalfClose(): void {
    if (!this.cancelled) {
      this.listener.onReceiveHalfClose(() => {
        if (this.cancelled) {
          return;
        }
        const earlierEventInFlight =
          this.processingMetadata || this.processingMessage;
        if (earlierEventInFlight) {
          this.hasPendingHalfClose = true;
        } else {
          this.nextListener.onReceiveHalfClose();
        }
      });
    }
  }

  onCancel(): void {
    this.cancelled = true;
    this.listener.onCancel();
    this.nextListener.onCancel();
  }
}
|
212 |
|
/**
 * Interceptor hook for starting a call. Call `next`, optionally with a
 * listener that will intercept the call's inbound events.
 */
export interface StartResponder {
  (next: (listener?: ServerListener) => void): void;
}
|
216 |
|
/**
 * Interceptor hook for outgoing response metadata. Call `next` with the
 * (possibly modified) metadata to send it onward.
 */
export interface MetadataResponder {
  (metadata: Metadata, next: (metadata: Metadata) => void): void;
}
|
220 |
|
/**
 * Interceptor hook for an outgoing response message. Call `next` with the
 * (possibly modified) message to send it onward.
 */
export interface MessageResponder {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
|
225 |
|
/**
 * Interceptor hook for the outgoing final status. Call `next` with the
 * (possibly modified) status to send it onward.
 */
export interface StatusResponder {
  (
    status: PartialStatusObject,
    next: (status: PartialStatusObject) => void
  ): void;
}
|
232 |
|
/** A responder with every outbound-operation hook present. */
export interface FullResponder {
  /** Intercepts the start of the call. */
  start: StartResponder;
  /** Intercepts outgoing response metadata. */
  sendMetadata: MetadataResponder;
  /** Intercepts each outgoing response message. */
  sendMessage: MessageResponder;
  /** Intercepts the outgoing final status. */
  sendStatus: StatusResponder;
}
|
239 |
|
/** A responder in which any subset of the outbound-operation hooks may be omitted. */
export type Responder = Partial<FullResponder>;
|
241 |
|
/**
 * Fluent builder for a {@link Responder}. Each `with*` method stores one hook
 * and returns the builder; hooks that were never set come out of `build()` as
 * `undefined`.
 */
export class ResponderBuilder {
  private startHook: StartResponder | undefined = undefined;
  private metadataHook: MetadataResponder | undefined = undefined;
  private messageHook: MessageResponder | undefined = undefined;
  private statusHook: StatusResponder | undefined = undefined;

  /** Set the hook that intercepts the start of the call. */
  withStart(start: StartResponder): this {
    this.startHook = start;
    return this;
  }

  /** Set the hook that intercepts outgoing response metadata. */
  withSendMetadata(sendMetadata: MetadataResponder): this {
    this.metadataHook = sendMetadata;
    return this;
  }

  /** Set the hook that intercepts outgoing response messages. */
  withSendMessage(sendMessage: MessageResponder): this {
    this.messageHook = sendMessage;
    return this;
  }

  /** Set the hook that intercepts the outgoing final status. */
  withSendStatus(sendStatus: StatusResponder): this {
    this.statusHook = sendStatus;
    return this;
  }

  /** Produce a responder object holding the hooks collected so far. */
  build(): Responder {
    const responder: Responder = {
      start: this.startHook,
      sendMetadata: this.metadataHook,
      sendMessage: this.messageHook,
      sendStatus: this.statusHook,
    };
    return responder;
  }
}
|
277 |
|
/**
 * Pass-through listener: every hook forwards its input to `next` unchanged
 * and `onCancel` does nothing. Its members fill in for hooks that an
 * interceptor's listener leaves undefined.
 */
const defaultServerListener: FullServerListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveHalfClose(next) {
    next();
  },
  onCancel() {
    // Nothing to do by default.
  },
};
|
290 |
|
/**
 * Pass-through responder: `start` calls `next` with no listener and every
 * other hook forwards its input to `next` unchanged. Its members fill in for
 * hooks that an interceptor's responder leaves undefined.
 */
const defaultResponder: FullResponder = {
  start(next) {
    next();
  },
  sendMetadata(metadata, next) {
    next(metadata);
  },
  sendMessage(message, next) {
    next(message);
  },
  sendStatus(status, next) {
    next(status);
  },
};
|
305 |
|
/** Operations that each link of the server interceptor chain exposes to the link above it. */
export interface ServerInterceptingCallInterface {
  /**
   * Register the listener that will receive the call's inbound events.
   */
  start(listener: InterceptingServerListener): void;
  /**
   * Send response metadata.
   */
  sendMetadata(metadata: Metadata): void;
  /**
   * Send a response message. `callback` is invoked once the message has been
   * written.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any, callback: () => void): void;
  /**
   * End the call by sending this status.
   */
  sendStatus(status: PartialStatusObject): void;
  /**
   * Request a single read, which eventually results in either
   * `onReceiveMessage` or `onReceiveHalfClose` on the registered listener.
   */
  startRead(): void;
  /**
   * Return the address of the client making the request, or "unknown" if it
   * is not available.
   */
  getPeer(): string;
  /**
   * Return the deadline of the call; `Infinity` when none was set.
   */
  getDeadline(): Deadline;
}
|
336 |
|
/**
 * One link of the server interceptor chain. It runs each outbound operation
 * through the corresponding hook of a {@link Responder} (falling back to
 * pass-through defaults for missing hooks) and then hands the result to the
 * next call in the chain.
 *
 * Ordering is preserved when hooks call `next` late: a message is held back
 * until metadata has been forwarded, and a status is held back until both
 * metadata and any in-flight message have been forwarded.
 */
export class ServerInterceptingCall implements ServerInterceptingCallInterface {
  private responder: FullResponder;
  private processingMetadata = false;
  private processingMessage = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any = null;
  private pendingMessageCallback: (() => void) | null = null;
  private pendingStatus: PartialStatusObject | null = null;

  /**
   * @param nextCall The next call in the chain; receives intercepted output.
   * @param responder Optional hooks; any hook left out passes data through
   *     unchanged.
   */
  constructor(
    private nextCall: ServerInterceptingCallInterface,
    responder?: Responder
  ) {
    this.responder = {
      start: responder?.start ?? defaultResponder.start,
      sendMetadata: responder?.sendMetadata ?? defaultResponder.sendMetadata,
      sendMessage: responder?.sendMessage ?? defaultResponder.sendMessage,
      sendStatus: responder?.sendStatus ?? defaultResponder.sendStatus,
    };
  }

  /** Forward the held-back message, if there is one. */
  private processPendingMessage() {
    if (this.pendingMessageCallback) {
      this.nextCall.sendMessage(
        this.pendingMessage,
        this.pendingMessageCallback
      );
      this.pendingMessage = null;
      this.pendingMessageCallback = null;
    }
  }

  /**
   * Forward the held-back status, if there is one and nothing that must
   * precede it (metadata or a message) is still being intercepted.
   */
  private processPendingStatus() {
    if (
      this.pendingStatus &&
      !this.processingMetadata &&
      !this.processingMessage
    ) {
      this.nextCall.sendStatus(this.pendingStatus);
      this.pendingStatus = null;
    }
  }

  start(listener: InterceptingServerListener): void {
    this.responder.start(interceptedListener => {
      // Fill in pass-through defaults for any hook the interceptor omitted.
      const fullInterceptedListener: FullServerListener = {
        onReceiveMetadata:
          interceptedListener?.onReceiveMetadata ??
          defaultServerListener.onReceiveMetadata,
        onReceiveMessage:
          interceptedListener?.onReceiveMessage ??
          defaultServerListener.onReceiveMessage,
        onReceiveHalfClose:
          interceptedListener?.onReceiveHalfClose ??
          defaultServerListener.onReceiveHalfClose,
        onCancel:
          interceptedListener?.onCancel ?? defaultServerListener.onCancel,
      };
      const finalInterceptingListener = new InterceptingServerListenerImpl(
        fullInterceptedListener,
        listener
      );
      this.nextCall.start(finalInterceptingListener);
    });
  }

  sendMetadata(metadata: Metadata): void {
    this.processingMetadata = true;
    this.responder.sendMetadata(metadata, interceptedMetadata => {
      this.processingMetadata = false;
      this.nextCall.sendMetadata(interceptedMetadata);
      this.processPendingMessage();
      this.processPendingStatus();
    });
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any, callback: () => void): void {
    this.processingMessage = true;
    this.responder.sendMessage(message, interceptedMessage => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        // Metadata has not been forwarded yet; hold the message back.
        this.pendingMessage = interceptedMessage;
        this.pendingMessageCallback = callback;
      } else {
        this.nextCall.sendMessage(interceptedMessage, callback);
        /* A status may have been deferred while this message was being
         * intercepted; without this flush it would never be sent. */
        this.processPendingStatus();
      }
    });
  }

  sendStatus(status: PartialStatusObject): void {
    this.responder.sendStatus(status, interceptedStatus => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingStatus = interceptedStatus;
      } else {
        this.nextCall.sendStatus(interceptedStatus);
      }
    });
  }

  startRead(): void {
    this.nextCall.startRead();
  }

  getPeer(): string {
    return this.nextCall.getPeer();
  }

  getDeadline(): Deadline {
    return this.nextCall.getDeadline();
  }
}
|
436 |
|
/**
 * A server interceptor: given the definition of the method being called and
 * the next call in the chain, it returns the call that wraps it.
 */
export interface ServerInterceptor {
  (
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    methodDescriptor: ServerMethodDefinition<any, any>,
    call: ServerInterceptingCallInterface
  ): ServerInterceptingCall;
}
|
443 |
|
/** Maps a `grpc-timeout` unit letter to its length in milliseconds. */
interface DeadlineUnitIndexSignature {
  [name: string]: number;
}
|
447 |
|
/* Names of the gRPC-specific HTTP/2 headers and trailers used by this
 * module. */
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';

/* Matches a whole grpc-timeout value: up to 8 digits followed by a unit
 * letter. The pattern is anchored so that over-long or otherwise malformed
 * values (e.g. "123456789S", "5Sx") are rejected rather than having a
 * fragment of them silently accepted as the timeout. */
const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;

/* Length in milliseconds of each grpc-timeout unit: hours, minutes, seconds,
 * milliseconds, microseconds, nanoseconds. */
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
  H: 3600000,
  M: 60000,
  S: 1000,
  m: 1,
  u: 0.001,
  n: 0.000001,
};

const defaultCompressionHeaders = {
  // TODO(cjihrig): Remove these encoding headers from the default response
  // once compression is integrated.
  [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
  [GRPC_ENCODING_HEADER]: 'identity',
};

/* Headers included in every response, including trailers-only ones. */
const defaultResponseHeaders = {
  [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
  [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};

/* waitForTrailers keeps the stream open after the last data frame so that
 * the status can be sent as trailers from the 'wantTrailers' event. */
const defaultResponseOptions = {
  waitForTrailers: true,
} as http2.ServerStreamResponseOptions;
|
476 |
|
/**
 * State of an entry in the read queue:
 * - `COMPRESSED`: raw message bytes, not yet decompressed and deserialized;
 * - `READABLE`: deserialized message, ready to hand to the listener;
 * - `HALF_CLOSE`: marker for the end of the request stream.
 */
type ReadQueueEntryType = 'COMPRESSED' | 'READABLE' | 'HALF_CLOSE';

/** One item in the ordered queue of inbound events awaiting delivery. */
interface ReadQueueEntry {
  type: ReadQueueEntryType;
  /** Framed message bytes as received; `null` for half-close entries. */
  compressedMessage: Buffer | null;
  /** Deserialized message; `null` until the entry becomes `READABLE`. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  parsedMessage: any;
}
|
484 |
|
/**
 * The innermost call of the server interceptor chain: it performs the actual
 * I/O on the HTTP/2 stream.
 *
 * Inbound, it decodes length-prefixed frames from the stream, decompresses and
 * deserializes them, and delivers messages and the half-close to the listener
 * one at a time, on demand (`startRead`). Outbound, it sends response headers,
 * serialized messages, and the final status (as trailers, or as a
 * trailers-only response when no headers were sent yet). It also enforces the
 * `grpc-timeout` deadline and the configured message size limits.
 */
export class BaseServerInterceptingCall
  implements ServerInterceptingCallInterface
{
  private listener: InterceptingServerListener | null = null;
  private metadata: Metadata;
  private deadlineTimer: NodeJS.Timeout | null = null;
  private deadline: Deadline = Infinity;
  private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
  private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
  private cancelled = false;
  private metadataSent = false;
  private wantTrailers = false;
  private cancelNotified = false;
  private incomingEncoding = 'identity';
  private decoder = new StreamDecoder();
  private readQueue: ReadQueueEntry[] = [];
  private isReadPending = false;
  private receivedHalfClose = false;
  private streamEnded = false;

  /**
   * @param stream The HTTP/2 stream carrying this call.
   * @param headers The request headers received on the stream.
   * @param callEventTracker Optional sink for call/stream statistics.
   * @param handler The registered method handler (path, serializers).
   * @param options Channel options; the message size limits are read here.
   */
  constructor(
    private readonly stream: http2.ServerHttp2Stream,
    headers: http2.IncomingHttpHeaders,
    private readonly callEventTracker: CallEventTracker | null,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    private readonly handler: Handler<any, any>,
    options: ChannelOptions
  ) {
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    this.stream.once('error', (err: ServerErrorResponse) => {
      /* Intentionally empty. A listener has to be registered so that an
       * 'error' event on the stream is not thrown as an uncaught exception;
       * cleanup and cancellation are handled in the 'close' handler below. */
    });

    this.stream.once('close', () => {
      trace(
        'Request to method ' +
          this.handler?.path +
          ' stream closed with rstCode ' +
          this.stream.rstCode
      );

      // Only report an end here if sendStatus has not already done so.
      if (this.callEventTracker && !this.streamEnded) {
        this.streamEnded = true;
        this.callEventTracker.onStreamEnd(false);
        this.callEventTracker.onCallEnd({
          code: Status.CANCELLED,
          details: 'Stream closed before sending status',
          metadata: null,
        });
      }

      this.notifyOnCancel();
    });

    this.stream.on('data', (data: Buffer) => {
      this.handleDataFrame(data);
    });
    // Attaching a 'data' listener starts the flow; hold it until startRead.
    this.stream.pause();

    this.stream.on('end', () => {
      this.handleEndEvent();
    });

    if ('grpc.max_send_message_length' in options) {
      this.maxSendMessageSize = options['grpc.max_send_message_length']!;
    }
    if ('grpc.max_receive_message_length' in options) {
      this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
    }

    const metadata = Metadata.fromHttp2Headers(headers);

    if (logging.isTracerEnabled(TRACER_NAME)) {
      trace(
        'Request to ' +
          this.handler.path +
          ' received headers ' +
          JSON.stringify(metadata.toJSON())
      );
    }

    const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);

    if (timeoutHeader.length > 0) {
      this.handleTimeoutHeader(timeoutHeader[0] as string);
    }

    const encodingHeader = metadata.get(GRPC_ENCODING_HEADER);

    if (encodingHeader.length > 0) {
      this.incomingEncoding = encodingHeader[0] as string;
    }

    // Remove several headers that should not be propagated to the application
    metadata.remove(GRPC_TIMEOUT_HEADER);
    metadata.remove(GRPC_ENCODING_HEADER);
    metadata.remove(GRPC_ACCEPT_ENCODING_HEADER);
    metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
    metadata.remove(http2.constants.HTTP2_HEADER_TE);
    metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
    this.metadata = metadata;
  }

  /**
   * Parse the `grpc-timeout` header, record the resulting deadline, and arm a
   * timer that ends the call with DEADLINE_EXCEEDED when it elapses. An
   * unparseable value ends the call with INTERNAL instead.
   */
  private handleTimeoutHeader(timeoutHeader: string) {
    const match = timeoutHeader.toString().match(DEADLINE_REGEX);

    if (match === null) {
      const status: PartialStatusObject = {
        code: Status.INTERNAL,
        details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`,
        metadata: null,
      };
      // Wait for the constructor to complete before sending the error.
      process.nextTick(() => {
        this.sendStatus(status);
      });
      return;
    }

    /* Math.trunc rather than `| 0`: the bitwise form converts to a 32-bit
     * integer, so timeouts above 2^31-1 ms (about 24.8 days) would wrap to an
     * arbitrary, possibly negative, value. */
    const timeout = Math.trunc(+match[1] * deadlineUnitsToMs[match[2]]);

    /* Plain epoch arithmetic; going through Date#setMilliseconds would do the
     * addition on local-time fields. */
    this.deadline = Date.now() + timeout;
    this.armDeadlineTimer(timeout);
  }

  /**
   * Arm the deadline timer for `remainingMs` from now. setTimeout cannot
   * represent delays above 2^31-1 ms (it falls back to 1 ms), so longer
   * waits are split into chunks and the timer re-arms itself.
   */
  private armDeadlineTimer(remainingMs: number) {
    const maxTimerDelayMs = 2147483647;
    const delay = Math.min(remainingMs, maxTimerDelayMs);
    this.deadlineTimer = setTimeout(() => {
      if (remainingMs > delay) {
        this.armDeadlineTimer(remainingMs - delay);
        return;
      }
      const status: PartialStatusObject = {
        code: Status.DEADLINE_EXCEEDED,
        details: 'Deadline exceeded',
        metadata: null,
      };
      this.sendStatus(status);
    }, delay);
  }

  /**
   * Report whether the call is over, first promoting a destroyed/closed
   * stream to the cancelled state.
   */
  private checkCancelled(): boolean {
    /* In some cases the stream can become destroyed before the close event
     * fires. That creates a race condition that this check works around */
    if (!this.cancelled && (this.stream.destroyed || this.stream.closed)) {
      this.notifyOnCancel();
      this.cancelled = true;
    }
    return this.cancelled;
  }

  /**
   * Mark the call as finished exactly once: notify the listener on the next
   * tick, stop the deadline timer, and let remaining inbound data drain.
   */
  private notifyOnCancel() {
    if (this.cancelNotified) {
      return;
    }
    this.cancelNotified = true;
    this.cancelled = true;
    process.nextTick(() => {
      this.listener?.onCancel();
    });
    if (this.deadlineTimer) {
      clearTimeout(this.deadlineTimer);
    }
    // Flush incoming data frames
    this.stream.resume();
  }

  /**
   * A server handler can start sending messages without explicitly sending
   * metadata. In that case, we need to send headers before sending any
   * messages. This function does that if necessary.
   */
  private maybeSendMetadata() {
    if (!this.metadataSent) {
      this.sendMetadata(new Metadata());
    }
  }

  /**
   * Serialize a message to a length-delimited byte string.
   * @param value The message to pass to the handler's serializer.
   * @returns A buffer holding a 1-byte compressed flag, a 4-byte big-endian
   *     payload length, and the serialized payload.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private serializeMessage(value: any) {
    const messageBuffer = this.handler.serialize(value);
    const byteLength = messageBuffer.byteLength;
    const output = Buffer.allocUnsafe(byteLength + 5);
    /* Note: response compression is currently not supported, so this
     * compressed bit is always 0. */
    output.writeUInt8(0, 0);
    output.writeUInt32BE(byteLength, 1);
    messageBuffer.copy(output, 5);
    return output;
  }

  /**
   * Strip the 5-byte frame header and undo the given encoding.
   * @returns The payload, or a promise for it. The promise rejects with a
   *     status-like object for an unsupported encoding, or with whatever the
   *     zlib call rejects with on failure.
   */
  private decompressMessage(
    message: Buffer,
    encoding: string
  ): Buffer | Promise<Buffer> {
    switch (encoding) {
      case 'deflate':
        return inflate(message.subarray(5));
      case 'gzip':
        return unzip(message.subarray(5));
      case 'identity':
        return message.subarray(5);
      default:
        return Promise.reject({
          code: Status.UNIMPLEMENTED,
          details: `Received message compressed with unsupported encoding "${encoding}"`,
        });
    }
  }

  /**
   * Turn a decompression failure into a status to end the call with. A
   * rejection that already looks like a status (numeric `code`, string
   * `details`) is forwarded; anything else is reported as INTERNAL.
   */
  private decompressionErrorToStatus(err: unknown): PartialStatusObject {
    if (typeof err === 'object' && err !== null) {
      const candidate = err as { code?: unknown; details?: unknown };
      if (
        typeof candidate.code === 'number' &&
        typeof candidate.details === 'string'
      ) {
        return {
          code: candidate.code,
          details: candidate.details,
          metadata: null,
        };
      }
    }
    return {
      code: Status.INTERNAL,
      details: `Error decompressing request: ${getErrorMessage(err)}`,
      metadata: null,
    };
  }

  /**
   * Decompress and deserialize a queued frame in place, mark it READABLE, and
   * deliver it if a read is pending. Any failure ends the call with a status
   * instead of rejecting, because callers do not await this method.
   */
  private async decompressAndMaybePush(queueEntry: ReadQueueEntry) {
    if (queueEntry.type !== 'COMPRESSED') {
      throw new Error(`Invalid queue entry type: ${queueEntry.type}`);
    }

    // The first byte of the frame is the per-message compressed flag.
    const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1;
    const compressedMessageEncoding = compressed
      ? this.incomingEncoding
      : 'identity';
    let decompressedMessage: Buffer;
    try {
      decompressedMessage = await this.decompressMessage(
        queueEntry.compressedMessage!,
        compressedMessageEncoding
      );
    } catch (err) {
      this.sendStatus(this.decompressionErrorToStatus(err));
      return;
    }
    try {
      queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
    } catch (err) {
      this.sendStatus({
        code: Status.INTERNAL,
        details: `Error deserializing request: ${getErrorMessage(err)}`,
      });
      return;
    }
    queueEntry.type = 'READABLE';
    this.maybePushNextMessage();
  }

  /**
   * Deliver the entry at the head of the read queue to the listener, provided
   * a read was requested and the head entry is ready (not still COMPRESSED).
   */
  private maybePushNextMessage() {
    if (
      this.listener &&
      this.isReadPending &&
      this.readQueue.length > 0 &&
      this.readQueue[0].type !== 'COMPRESSED'
    ) {
      this.isReadPending = false;
      const nextQueueEntry = this.readQueue.shift()!;
      if (nextQueueEntry.type === 'READABLE') {
        this.listener.onReceiveMessage(nextQueueEntry.parsedMessage);
      } else {
        // nextQueueEntry.type === 'HALF_CLOSE'
        this.listener.onReceiveHalfClose();
      }
    }
  }

  /**
   * Feed a chunk from the stream to the frame decoder and queue every
   * complete message it yields, enforcing the receive size limit.
   */
  private handleDataFrame(data: Buffer) {
    if (this.checkCancelled()) {
      return;
    }
    trace(
      'Request to ' +
        this.handler.path +
        ' received data frame of size ' +
        data.length
    );
    const rawMessages = this.decoder.write(data);

    for (const messageBytes of rawMessages) {
      // Stop the flow until the application asks for the next message.
      this.stream.pause();
      if (
        this.maxReceiveMessageSize !== -1 &&
        messageBytes.length - 5 > this.maxReceiveMessageSize
      ) {
        this.sendStatus({
          code: Status.RESOURCE_EXHAUSTED,
          details: `Received message larger than max (${
            messageBytes.length - 5
          } vs. ${this.maxReceiveMessageSize})`,
          metadata: null,
        });
        return;
      }
      const queueEntry: ReadQueueEntry = {
        type: 'COMPRESSED',
        compressedMessage: messageBytes,
        parsedMessage: null,
      };
      this.readQueue.push(queueEntry);
      this.decompressAndMaybePush(queueEntry);
      this.callEventTracker?.addMessageReceived();
    }
  }

  /** Queue the half-close marker when the request stream ends. */
  private handleEndEvent() {
    this.readQueue.push({
      type: 'HALF_CLOSE',
      compressedMessage: null,
      parsedMessage: null,
    });
    this.receivedHalfClose = true;
    this.maybePushNextMessage();
  }

  start(listener: InterceptingServerListener): void {
    trace('Request to ' + this.handler.path + ' start called');
    if (this.checkCancelled()) {
      return;
    }
    this.listener = listener;
    listener.onReceiveMetadata(this.metadata);
  }

  sendMetadata(metadata: Metadata): void {
    if (this.checkCancelled()) {
      return;
    }

    // Headers can only be sent once; later calls are ignored.
    if (this.metadataSent) {
      return;
    }

    this.metadataSent = true;
    const custom = metadata ? metadata.toHttp2Headers() : null;
    const headers = {
      ...defaultResponseHeaders,
      ...defaultCompressionHeaders,
      ...custom,
    };
    this.stream.respond(headers, defaultResponseOptions);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any, callback: () => void): void {
    if (this.checkCancelled()) {
      return;
    }
    let response: Buffer;
    try {
      response = this.serializeMessage(message);
    } catch (e) {
      this.sendStatus({
        code: Status.INTERNAL,
        details: `Error serializing response: ${getErrorMessage(e)}`,
        metadata: null,
      });
      return;
    }

    // The limit applies to the payload, not to the 5-byte frame header.
    const payloadLength = response.length - 5;
    if (
      this.maxSendMessageSize !== -1 &&
      payloadLength > this.maxSendMessageSize
    ) {
      this.sendStatus({
        code: Status.RESOURCE_EXHAUSTED,
        details: `Sent message larger than max (${payloadLength} vs. ${this.maxSendMessageSize})`,
        metadata: null,
      });
      return;
    }
    this.maybeSendMetadata();
    trace(
      'Request to ' +
        this.handler.path +
        ' sent data frame of size ' +
        response.length
    );
    this.stream.write(response, error => {
      if (error) {
        this.sendStatus({
          code: Status.INTERNAL,
          details: `Error writing message: ${getErrorMessage(error)}`,
          metadata: null,
        });
        return;
      }
      this.callEventTracker?.addMessageSent();
      callback();
    });
  }

  sendStatus(status: PartialStatusObject): void {
    if (this.checkCancelled()) {
      return;
    }

    trace(
      'Request to method ' +
        this.handler?.path +
        ' ended with status code: ' +
        Status[status.code] +
        ' details: ' +
        status.details
    );

    if (this.metadataSent) {
      if (!this.wantTrailers) {
        this.wantTrailers = true;
        /* Headers are already out, so the status goes in trailers, which can
         * only be sent once the stream asks for them after end(). */
        this.stream.once('wantTrailers', () => {
          if (this.callEventTracker && !this.streamEnded) {
            this.streamEnded = true;
            this.callEventTracker.onStreamEnd(true);
            this.callEventTracker.onCallEnd(status);
          }
          const trailersToSend = {
            [GRPC_STATUS_HEADER]: status.code,
            [GRPC_MESSAGE_HEADER]: encodeURI(status.details),
            ...status.metadata?.toHttp2Headers(),
          };

          this.stream.sendTrailers(trailersToSend);
          this.notifyOnCancel();
        });
        this.stream.end();
      } else {
        this.notifyOnCancel();
      }
    } else {
      if (this.callEventTracker && !this.streamEnded) {
        this.streamEnded = true;
        this.callEventTracker.onStreamEnd(true);
        this.callEventTracker.onCallEnd(status);
      }
      // Trailers-only response
      const trailersToSend = {
        [GRPC_STATUS_HEADER]: status.code,
        [GRPC_MESSAGE_HEADER]: encodeURI(status.details),
        ...defaultResponseHeaders,
        ...status.metadata?.toHttp2Headers(),
      };
      this.stream.respond(trailersToSend, { endStream: true });
      this.notifyOnCancel();
    }
  }

  startRead(): void {
    trace('Request to ' + this.handler.path + ' startRead called');
    if (this.checkCancelled()) {
      return;
    }
    this.isReadPending = true;
    if (this.readQueue.length === 0) {
      // Nothing buffered: let more data in, unless the stream already ended.
      if (!this.receivedHalfClose) {
        this.stream.resume();
      }
    } else {
      this.maybePushNextMessage();
    }
  }

  getPeer(): string {
    const socket = this.stream.session?.socket;
    if (socket?.remoteAddress) {
      if (socket.remotePort) {
        return `${socket.remoteAddress}:${socket.remotePort}`;
      } else {
        return socket.remoteAddress;
      }
    } else {
      return 'unknown';
    }
  }

  getDeadline(): Deadline {
    return this.deadline;
  }
}
|
938 |
|
/**
 * Build the full intercepting call for an incoming request: a
 * {@link BaseServerInterceptingCall} bound to the stream, wrapped by each
 * interceptor in turn.
 *
 * Interceptors are applied in array order, each receiving the call produced
 * by the previous one, so the last interceptor in the array ends up
 * outermost.
 *
 * @param interceptors Interceptors to apply, innermost first.
 * @param stream The HTTP/2 stream carrying the call.
 * @param headers The request headers received on the stream.
 * @param callEventTracker Optional sink for call/stream statistics.
 * @param handler The registered handler for the requested method.
 * @param options Channel options forwarded to the base call.
 * @returns The outermost call of the chain.
 */
export function getServerInterceptingCall(
  interceptors: ServerInterceptor[],
  stream: http2.ServerHttp2Stream,
  headers: http2.IncomingHttpHeaders,
  callEventTracker: CallEventTracker | null,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  handler: Handler<any, any>,
  options: ChannelOptions
): ServerInterceptingCallInterface {
  const isBidi = handler.type === 'bidi';
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const methodDefinition: ServerMethodDefinition<any, any> = {
    path: handler.path,
    requestStream: handler.type === 'clientStream' || isBidi,
    responseStream: handler.type === 'serverStream' || isBidi,
    requestDeserialize: handler.deserialize,
    responseSerialize: handler.serialize,
  };

  let call: ServerInterceptingCallInterface = new BaseServerInterceptingCall(
    stream,
    headers,
    callEventTracker,
    handler,
    options
  );
  for (const interceptor of interceptors) {
    call = interceptor(methodDefinition, call);
  }
  return call;
}
|
968 |
|
\ | No newline at end of file |