UNPKG

29.7 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import * as os from 'os';
20
21import { CallCredentials } from './call-credentials';
22import { Propagate, Status } from './constants';
23import { Filter, FilterFactory } from './filter';
24import { FilterStackFactory, FilterStack } from './filter-stack';
25import { Metadata } from './metadata';
26import { StreamDecoder } from './stream-decoder';
27import { ChannelImplementation } from './channel';
28import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
29import * as logging from './logging';
30import { LogVerbosity } from './constants';
31import { ServerSurfaceCall } from './server-call';
32
33const TRACER_NAME = 'call_stream';
34
35const {
36 HTTP2_HEADER_STATUS,
37 HTTP2_HEADER_CONTENT_TYPE,
38 NGHTTP2_CANCEL,
39} = http2.constants;
40
41/**
42 * https://nodejs.org/api/errors.html#errors_class_systemerror
43 */
44interface SystemError extends Error {
45 address?: string;
46 code: string;
47 dest?: string;
48 errno: number;
49 info?: object;
50 message: string;
51 path?: string;
52 port?: number;
53 syscall: string;
54}
55
/**
 * Resolve a numeric system error code to its symbolic name (for example
 * `ECONNRESET`). This approximates `util.getSystemErrorName`, which is not
 * used here because the TypeScript typings did not expose it.
 *
 * Node reports `SystemError.errno` as a negative number, whereas the table in
 * `os.constants.errno` holds positive values, so the lookup compares
 * magnitudes and accepts a code of either sign.
 *
 * @param errno The numeric error code, positive or negative.
 * @returns The first matching name in `os.constants.errno`, or
 *     `'Unknown system error <errno>'` when no entry matches.
 */
function getSystemErrorName(errno: number): string {
  const magnitude = Math.abs(errno);
  for (const [name, num] of Object.entries(os.constants.errno)) {
    if (num === magnitude) {
      return name;
    }
  }
  return 'Unknown system error ' + errno;
}
70
/** A deadline, given either as a `Date` or as a number of milliseconds. */
export type Deadline = Date | number;

/**
 * Find the earliest deadline in a list.
 *
 * `Date` entries are converted with `getTime()`; numeric entries are used as
 * they are. Entries that do not compare as smaller than the running minimum
 * (including `NaN`) are ignored.
 *
 * @param deadlineList The deadlines to compare.
 * @returns The smallest value in milliseconds, or `Infinity` for an empty
 *     list.
 */
function getMinDeadline(deadlineList: Deadline[]): Deadline {
  return deadlineList
    .map((entry) => (entry instanceof Date ? entry.getTime() : entry))
    .reduce(
      (earliest, candidate) => (candidate < earliest ? candidate : earliest),
      Infinity
    );
}
84
85export interface CallStreamOptions {
86 deadline: Deadline;
87 flags: number;
88 host: string;
89 parentCall: ServerSurfaceCall | null;
90}
91
92export type PartialCallStreamOptions = Partial<CallStreamOptions>;
93
94export interface StatusObject {
95 code: Status;
96 details: string;
97 metadata: Metadata;
98}
99
100export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
101 metadata: Metadata | null;
102}
103
104export const enum WriteFlags {
105 BufferHint = 1,
106 NoCompress = 2,
107 WriteThrough = 4,
108}
109
110export interface WriteObject {
111 message: Buffer;
112 flags?: number;
113}
114
115export interface MetadataListener {
116 (metadata: Metadata, next: (metadata: Metadata) => void): void;
117}
118
119export interface MessageListener {
120 // eslint-disable-next-line @typescript-eslint/no-explicit-any
121 (message: any, next: (message: any) => void): void;
122}
123
124export interface StatusListener {
125 (status: StatusObject, next: (status: StatusObject) => void): void;
126}
127
128export interface FullListener {
129 onReceiveMetadata: MetadataListener;
130 onReceiveMessage: MessageListener;
131 onReceiveStatus: StatusListener;
132}
133
134export type Listener = Partial<FullListener>;
135
136/**
137 * An object with methods for handling the responses to a call.
138 */
139export interface InterceptingListener {
140 onReceiveMetadata(metadata: Metadata): void;
141 // eslint-disable-next-line @typescript-eslint/no-explicit-any
142 onReceiveMessage(message: any): void;
143 onReceiveStatus(status: StatusObject): void;
144}
145
/**
 * Type guard that tells an `InterceptingListener` apart from a callback-style
 * `Listener`.
 *
 * The check is purely structural: the listener must define
 * `onReceiveMetadata`, and that function must declare exactly one parameter
 * (the callback-style variant takes a second `next` parameter).
 *
 * @param listener The listener to classify.
 * @returns `true` when `listener.onReceiveMetadata` exists and has arity 1.
 */
export function isInterceptingListener(
  listener: Listener | InterceptingListener
): listener is InterceptingListener {
  const metadataHandler = listener.onReceiveMetadata;
  if (metadataHandler === undefined) {
    return false;
  }
  return metadataHandler.length === 1;
}
154
/**
 * Adapts a callback-style `FullListener` into an `InterceptingListener`.
 *
 * Each incoming event is handed to the wrapped listener together with a
 * `next` callback; whatever the wrapped listener passes to `next` is
 * forwarded to `nextListener`. Because `next` may be invoked asynchronously,
 * events are buffered so that the downstream listener sees metadata first,
 * then messages, then the status, and sees the status exactly once.
 *
 * Only one in-flight message is tracked (`processingMessage` is a single
 * flag), so ordering is only guaranteed when the wrapped listener finishes a
 * message before the next one is delivered.
 */
export class InterceptingListenerImpl implements InterceptingListener {
  /** True while the wrapped listener has not yet released the metadata. */
  private processingMetadata = false;
  /** True when a processed message is waiting for metadata to be released. */
  private hasPendingMessage = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /** True while the wrapped listener has not yet released a message. */
  private processingMessage = false;
  /** Processed status waiting for metadata/message processing to finish. */
  private pendingStatus: StatusObject | null = null;
  constructor(
    private listener: FullListener,
    private nextListener: InterceptingListener
  ) {}

  /** Forward the buffered message, if there is one, and clear the buffer. */
  private processPendingMessage() {
    if (this.hasPendingMessage) {
      const message = this.pendingMessage;
      this.pendingMessage = null;
      this.hasPendingMessage = false;
      this.nextListener.onReceiveMessage(message);
    }
  }

  /**
   * Forward the buffered status once nothing else is still being processed.
   * The buffer is cleared before forwarding so that the status can never be
   * delivered twice, and the status is held back while a message is still in
   * flight so that it cannot overtake that message.
   */
  private processPendingStatus() {
    if (
      this.pendingStatus === null ||
      this.processingMetadata ||
      this.processingMessage
    ) {
      return;
    }
    const status = this.pendingStatus;
    this.pendingStatus = null;
    this.nextListener.onReceiveStatus(status);
  }

  onReceiveMetadata(metadata: Metadata): void {
    this.processingMetadata = true;
    this.listener.onReceiveMetadata(metadata, (metadata) => {
      this.processingMetadata = false;
      this.nextListener.onReceiveMetadata(metadata);
      // Release anything that was held back behind the metadata.
      this.processPendingMessage();
      this.processPendingStatus();
    });
  }
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void {
    this.processingMessage = true;
    this.listener.onReceiveMessage(message, (msg) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        // Metadata has not been forwarded yet; hold the message behind it.
        this.pendingMessage = msg;
        this.hasPendingMessage = true;
      } else {
        this.nextListener.onReceiveMessage(msg);
        this.processPendingStatus();
      }
    });
  }
  onReceiveStatus(status: StatusObject): void {
    this.listener.onReceiveStatus(status, (processedStatus) => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingStatus = processedStatus;
      } else {
        this.nextListener.onReceiveStatus(processedStatus);
      }
    });
  }
}
215
216export interface WriteCallback {
217 (error?: Error | null): void;
218}
219
220export interface MessageContext {
221 callback?: WriteCallback;
222 flags?: number;
223}
224
225export interface Call {
226 cancelWithStatus(status: Status, details: string): void;
227 getPeer(): string;
228 start(metadata: Metadata, listener: InterceptingListener): void;
229 sendMessageWithContext(context: MessageContext, message: Buffer): void;
230 startRead(): void;
231 halfClose(): void;
232
233 getDeadline(): Deadline;
234 getCredentials(): CallCredentials;
235 setCredentials(credentials: CallCredentials): void;
236 getMethod(): string;
237 getHost(): string;
238}
239
240export class Http2CallStream implements Call {
241 credentials: CallCredentials;
242 filterStack: FilterStack;
243 private http2Stream: http2.ClientHttp2Stream | null = null;
244 private pendingRead = false;
245 private isWriteFilterPending = false;
246 private pendingWrite: Buffer | null = null;
247 private pendingWriteCallback: WriteCallback | null = null;
248 private writesClosed = false;
249
250 private decoder = new StreamDecoder();
251
252 private isReadFilterPending = false;
253 private canPush = false;
254 /**
255 * Indicates that an 'end' event has come from the http2 stream, so there
256 * will be no more data events.
257 */
258 private readsClosed = false;
259
260 private statusOutput = false;
261
262 private unpushedReadMessages: Buffer[] = [];
263 private unfilteredReadMessages: Buffer[] = [];
264
265 // Status code mapped from :status. To be used if grpc-status is not received
266 private mappedStatusCode: Status = Status.UNKNOWN;
267
268 // This is populated (non-null) if and only if the call has ended
269 private finalStatus: StatusObject | null = null;
270
271 private subchannel: Subchannel | null = null;
272 private disconnectListener: () => void;
273
274 private listener: InterceptingListener | null = null;
275
276 private internalError: SystemError | null = null;
277
278 private configDeadline: Deadline = Infinity;
279
280 private statusWatchers: ((status: StatusObject) => void)[] = [];
281 private streamEndWatchers: ((success: boolean) => void)[] = [];
282
283 private callStatsTracker: SubchannelCallStatsTracker | null = null;
284
/**
 * Set up a call that has no HTTP/2 stream attached yet.
 *
 * Builds the filter stack for this call, starts with the channel's call
 * credentials, prepares the listener that ends the call with UNAVAILABLE
 * when the connection drops, and, when a parent call is supplied and the
 * CANCELLATION propagation flag is set, cancels this call whenever the
 * parent emits 'cancelled'.
 */
constructor(
  private readonly methodName: string,
  private readonly channel: ChannelImplementation,
  private readonly options: CallStreamOptions,
  filterStackFactory: FilterStackFactory,
  private readonly channelCallCredentials: CallCredentials,
  private readonly callNumber: number
) {
  this.filterStack = filterStackFactory.createFilter(this);
  this.credentials = channelCallCredentials;
  this.disconnectListener = () => {
    const droppedStatus: StatusObject = {
      code: Status.UNAVAILABLE,
      details: 'Connection dropped',
      metadata: new Metadata(),
    };
    this.endCall(droppedStatus);
  };
  const { parentCall, flags } = this.options;
  const propagatesCancellation = (flags & Propagate.CANCELLATION) !== 0;
  if (parentCall && propagatesCancellation) {
    parentCall.on('cancelled', () => {
      this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
    });
  }
}
311
/**
 * Deliver the final status to the listener, at most once.
 *
 * Precondition: this.finalStatus !== null. Does nothing until a listener
 * has been registered, or if the status was already output.
 */
private outputStatus() {
  if (!this.listener || this.statusOutput) {
    return;
  }
  this.statusOutput = true;
  const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus!);
  this.trace(
    `ended with status: code=${filteredStatus.code} details="${filteredStatus.details}"`
  );
  this.statusWatchers.forEach((notify) => notify(filteredStatus));
  /* Hand the status to the listener on a later tick. Upper layers may throw
   * synchronously while handling it (for example an unhandled "error" event
   * for a non-OK status), and that must not interrupt the cleanup below. */
  process.nextTick(() => {
    this.listener?.onReceiveStatus(filteredStatus);
  });
  /* Keep the http2 stream flowing so that remaining incoming data drains and
   * the stream can finish closing. Nothing is pushed to the reader after the
   * status has been output, so that data is simply discarded. */
  this.http2Stream?.resume();
  const attachedSubchannel = this.subchannel;
  if (attachedSubchannel) {
    attachedSubchannel.callUnref();
    attachedSubchannel.removeDisconnectListener(this.disconnectListener);
  }
}
347
/**
 * Emit a DEBUG-level trace line for this call, prefixed with the call number
 * in square brackets.
 * @param text The message to log.
 */
private trace(text: string): void {
  const line = `[${this.callNumber}] ${text}`;
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, line);
}
355
/**
 * Record the final status of the call and tear down the HTTP/2 stream.
 *
 * The given status is adopted when no final status exists yet, or when the
 * existing one is OK (a later status, e.g. from a deserialization failure,
 * takes priority over OK). In every case the http2 stream is then closed if
 * it is still open.
 * @param status The status of the call.
 */
private endCall(status: StatusObject): void {
  const current = this.finalStatus;
  const replaceable = current === null || current.code === Status.OK;
  if (replaceable) {
    this.finalStatus = status;
    this.maybeOutputStatus();
  }
  this.destroyHttp2Stream();
}
370
/**
 * Output the final status if the call is ready for it.
 *
 * A non-OK status is output immediately. An OK status is output only once
 * all incoming data has been fully processed: reads are closed, both message
 * queues are empty, and no read filter is still running.
 */
private maybeOutputStatus() {
  const status = this.finalStatus;
  if (status === null) {
    return;
  }
  const allIncomingDataProcessed =
    this.readsClosed &&
    this.unpushedReadMessages.length === 0 &&
    this.unfilteredReadMessages.length === 0 &&
    !this.isReadFilterPending;
  if (status.code !== Status.OK || allIncomingDataProcessed) {
    this.outputStatus();
  }
}
387
/**
 * Deliver one filtered message to the listener on the next tick.
 *
 * Clears `canPush` right away, so no further message is pushed until
 * startRead() is called again.
 * @param message The message to deliver.
 */
private push(message: Buffer): void {
  const tracedLength = message instanceof Buffer ? message.length : null;
  this.trace('pushing to reader message of length ' + tracedLength);
  this.canPush = false;
  process.nextTick(() => {
    /* Messages that arrive after the status has been output must be dropped:
     * delivering them can cause out-of-order operation errors higher up the
     * stack. The check is made as late as possible to avoid races. */
    if (!this.statusOutput) {
      this.listener?.onReceiveMessage(message);
      this.maybeOutputStatus();
    }
  });
}
406
/**
 * Rejection handler for the filter stack promises: cancels the call with
 * Status.INTERNAL, using the error's message as the status details.
 */
407 private handleFilterError(error: Error) {
408 this.cancelWithStatus(Status.INTERNAL, error.message);
409 }
410
/**
 * Continuation that runs when the filter stack has finished processing one
 * received message.
 *
 * The message is pushed to the reader if a read is outstanding, otherwise it
 * is queued in `unpushedReadMessages`. Afterwards the next message waiting
 * in `unfilteredReadMessages`, if any, is sent through the filters.
 * @param message The filtered message.
 */
private handleFilteredRead(message: Buffer) {
  /* Once the call has ended with an error the message is dropped on the
   * floor; that is the correct behavior. */
  const endedWithError =
    this.finalStatus !== null && this.finalStatus.code !== Status.OK;
  if (endedWithError) {
    this.maybeOutputStatus();
    return;
  }
  this.isReadFilterPending = false;
  if (!this.canPush) {
    this.trace('unpushedReadMessages.push message of length ' + message.length);
    this.unpushedReadMessages.push(message);
  } else {
    this.http2Stream!.pause();
    this.push(message);
  }
  if (this.unfilteredReadMessages.length > 0) {
    // The queue is non-empty, so shift() cannot return undefined here.
    const queued = this.unfilteredReadMessages.shift()!;
    this.filterReceivedMessage(queued);
  }
}
436
/**
 * Send one framed message through the filter stack's receive path.
 *
 * Marks a read filter as pending, then routes the result to
 * handleFilteredRead or a failure to handleFilterError.
 * @param framedMessage The message as produced by the stream decoder.
 */
private filterReceivedMessage(framedMessage: Buffer) {
  /* Once the call has ended with an error the message is dropped on the
   * floor; that is the correct behavior. */
  const endedWithError =
    this.finalStatus !== null && this.finalStatus.code !== Status.OK;
  if (endedWithError) {
    this.maybeOutputStatus();
    return;
  }
  this.trace('filterReceivedMessage of length ' + framedMessage.length);
  this.isReadFilterPending = true;
  const filtered = this.filterStack.receiveMessage(
    Promise.resolve(framedMessage)
  );
  filtered.then(
    (message) => this.handleFilteredRead(message),
    (error) => this.handleFilterError(error)
  );
}
454
/**
 * Accept one decoded message from the stream: filter it right away when no
 * read filter is running, otherwise queue it in `unfilteredReadMessages`.
 * @param messageBytes The decoded message.
 */
private tryPush(messageBytes: Buffer): void {
  if (!this.isReadFilterPending) {
    this.filterReceivedMessage(messageBytes);
    return;
  }
  const tracedLength = messageBytes && messageBytes.length;
  this.trace('unfilteredReadMessages.push message of length ' + tracedLength);
  this.unfilteredReadMessages.push(messageBytes);
}
466
/**
 * Turn the server's trailers into the final status of the call.
 *
 * Notifies the stream-end watchers of success, parses the trailers into
 * metadata (falling back to empty metadata if parsing throws), and derives:
 *  - the status code: the code mapped from the HTTP `:status`, or, when that
 *    is UNKNOWN, the `grpc-status` trailer if it names a known status;
 *  - the details: the percent-decoded `grpc-message` trailer, or the raw
 *    value if it cannot be decoded.
 * The `grpc-status` / `grpc-message` entries that were consumed are removed
 * from the metadata before the call is ended.
 * @param headers The trailing headers received from the server.
 */
private handleTrailers(headers: http2.IncomingHttpHeaders) {
  this.streamEndWatchers.forEach(watcher => watcher(true));
  let headersString = '';
  for (const header of Object.keys(headers)) {
    headersString += '\t\t' + header + ': ' + headers[header] + '\n';
  }
  this.trace('Received server trailers:\n' + headersString);
  let metadata: Metadata;
  try {
    metadata = Metadata.fromHttp2Headers(headers);
  } catch (e) {
    metadata = new Metadata();
  }
  const metadataMap = metadata.getMap();
  let code: Status = this.mappedStatusCode;
  if (
    code === Status.UNKNOWN &&
    typeof metadataMap['grpc-status'] === 'string'
  ) {
    const receivedStatus = Number(metadataMap['grpc-status']);
    if (receivedStatus in Status) {
      code = receivedStatus;
      this.trace('received status code ' + receivedStatus + ' from server');
    }
    metadata.remove('grpc-status');
  }
  let details = '';
  const rawMessage = metadataMap['grpc-message'];
  if (typeof rawMessage === 'string') {
    try {
      /* grpc-message is percent-encoded byte by byte, so every %XX escape
       * has to be decoded. decodeURI would leave escapes of URI-reserved
       * characters (such as %2F or %23) untouched. */
      details = decodeURIComponent(rawMessage);
    } catch (e) {
      // Malformed encoding: surface the raw message rather than failing.
      details = rawMessage;
    }
    metadata.remove('grpc-message');
    this.trace(
      'received status details string "' + details + '" from server'
    );
  }
  const status: StatusObject = { code, details, metadata };
  // This is a no-op if the call was already ended when handling headers.
  this.endCall(status);
}
509
/**
 * Write one message to the attached HTTP/2 stream, first recording it with
 * the call stats tracker when one is set. Callers must only use this once a
 * stream is attached (http2Stream is asserted non-null).
 */
510 private writeMessageToStream(message: Buffer, callback: WriteCallback) {
511 this.callStatsTracker?.addMessageSent();
512 this.http2Stream!.write(message, callback);
513 }
514
/**
 * Bind this call to the HTTP/2 stream it will run on.
 *
 * The extra filters are always pushed onto the filter stack. If the call has
 * already ended (finalStatus is set), the stream is closed with
 * NGHTTP2_CANCEL and nothing else happens. Otherwise the stream, subchannel
 * and stats tracker are stored, the subchannel is ref'd and given the
 * disconnect listener, handlers are registered for the stream's 'response',
 * 'trailers', 'data', 'end', 'close' and 'error' events, a write that was
 * deferred while no stream was attached is flushed, and maybeCloseWrites()
 * ends the stream if writes were already closed.
 */
515 attachHttp2Stream(
516 stream: http2.ClientHttp2Stream,
517 subchannel: Subchannel,
518 extraFilters: Filter[],
519 callStatsTracker: SubchannelCallStatsTracker
520 ): void {
521 this.filterStack.push(extraFilters);
522 if (this.finalStatus !== null) {
523 stream.close(NGHTTP2_CANCEL);
524 } else {
525 this.trace(
526 'attachHttp2Stream from subchannel ' + subchannel.getAddress()
527 );
528 this.http2Stream = stream;
529 this.subchannel = subchannel;
530 this.callStatsTracker = callStatsTracker;
531 subchannel.addDisconnectListener(this.disconnectListener);
532 subchannel.callRef();
533 stream.on('response', (headers, flags) => {
534 let headersString = '';
535 for (const header of Object.keys(headers)) {
536 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
537 }
538 this.trace('Received server headers:\n' + headersString);
  /* Map the HTTP :status to a fallback gRPC code. handleTrailers() uses this
   * code as-is and only consults grpc-status when it is UNKNOWN. */
539 switch (headers[':status']) {
540 // TODO(murgatroid99): handle 100 and 101
541 case 400:
542 this.mappedStatusCode = Status.INTERNAL;
543 break;
544 case 401:
545 this.mappedStatusCode = Status.UNAUTHENTICATED;
546 break;
547 case 403:
548 this.mappedStatusCode = Status.PERMISSION_DENIED;
549 break;
550 case 404:
551 this.mappedStatusCode = Status.UNIMPLEMENTED;
552 break;
553 case 429:
554 case 502:
555 case 503:
556 case 504:
557 this.mappedStatusCode = Status.UNAVAILABLE;
558 break;
559 default:
560 this.mappedStatusCode = Status.UNKNOWN;
561 }
562
  /* Headers that also end the stream are processed as trailers; otherwise
   * they are initial metadata, passed through the filter stack and then on
   * to the listener. A failure in either step ends the call with UNKNOWN. */
563 if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
564 this.handleTrailers(headers);
565 } else {
566 let metadata: Metadata;
567 try {
568 metadata = Metadata.fromHttp2Headers(headers);
569 } catch (error) {
570 this.endCall({
571 code: Status.UNKNOWN,
572 details: error.message,
573 metadata: new Metadata(),
574 });
575 return;
576 }
577 try {
578 const finalMetadata = this.filterStack.receiveMetadata(metadata);
579 this.listener?.onReceiveMetadata(finalMetadata);
580 } catch (error) {
581 this.endCall({
582 code: Status.UNKNOWN,
583 details: error.message,
584 metadata: new Metadata(),
585 });
586 }
587 }
588 });
589 stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
590 this.handleTrailers(headers);
591 });
592 stream.on('data', (data: Buffer) => {
593 /* If the status has already been output, allow the http2 stream to
594 * drain without processing the data. */
595 if (this.statusOutput) {
596 return;
597 }
598 this.trace('receive HTTP/2 data frame of length ' + data.length);
  // The decoder returns the messages completed by this chunk of data.
599 const messages = this.decoder.write(data);
600
601 for (const message of messages) {
602 this.trace('parsed message of length ' + message.length);
603 this.callStatsTracker!.addMessageReceived();
604 this.tryPush(message);
605 }
606 });
607 stream.on('end', () => {
  // No more data events will arrive, so the status may now be deliverable.
608 this.readsClosed = true;
609 this.maybeOutputStatus();
610 });
611 stream.on('close', () => {
612 /* Use process.next tick to ensure that this code happens after any
613 * "error" event that may be emitted at about the same time, so that
614 * we can bubble up the error message from that event. */
615 process.nextTick(() => {
616 this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
617 /* If we have a final status with an OK status code, that means that
618 * we have received all of the messages and we have processed the
619 * trailers and the call completed successfully, so it doesn't matter
620 * how the stream ends after that */
621 if (this.finalStatus?.code === Status.OK) {
622 return;
623 }
624 let code: Status;
625 let details = '';
  // Translate the stream's reset code into a gRPC status code and details.
626 switch (stream.rstCode) {
627 case http2.constants.NGHTTP2_NO_ERROR:
628 /* If we get a NO_ERROR code and we already have a status, the
629 * stream completed properly and we just haven't fully processed
630 * it yet */
631 if (this.finalStatus !== null) {
632 return;
633 }
634 code = Status.INTERNAL;
635 details = `Received RST_STREAM with code ${stream.rstCode}`;
636 break;
637 case http2.constants.NGHTTP2_REFUSED_STREAM:
638 code = Status.UNAVAILABLE;
639 details = 'Stream refused by server';
640 break;
641 case http2.constants.NGHTTP2_CANCEL:
642 code = Status.CANCELLED;
643 details = 'Call cancelled';
644 break;
645 case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
646 code = Status.RESOURCE_EXHAUSTED;
647 details = 'Bandwidth exhausted or memory limit exceeded';
648 break;
649 case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
650 code = Status.PERMISSION_DENIED;
651 details = 'Protocol not secure enough';
652 break;
653 case http2.constants.NGHTTP2_INTERNAL_ERROR:
654 code = Status.INTERNAL;
655 if (this.internalError === null) {
656 /* This error code was previously handled in the default case, and
657 * there are several instances of it online, so I wanted to
658 * preserve the original error message so that people find existing
659 * information in searches, but also include the more recognizable
660 * "Internal server error" message. */
661 details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
662 } else {
  /* A connection reset or timeout recorded by the 'error' handler is
   * reported as UNAVAILABLE instead of INTERNAL. */
663 if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
664 code = Status.UNAVAILABLE;
665 details = this.internalError.message;
666 } else {
667 /* The "Received RST_STREAM with code ..." error is preserved
668 * here for continuity with errors reported online, but the
669 * error message at the end will probably be more relevant in
670 * most cases. */
671 details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
672 }
673 }
674 break;
675 default:
676 code = Status.INTERNAL;
677 details = `Received RST_STREAM with code ${stream.rstCode}`;
678 }
679 // This is a no-op if trailers were received at all.
680 // This is OK, because status codes emitted here correspond to more
681 // catastrophic issues that prevent us from receiving trailers in the
682 // first place.
683 this.endCall({ code, details, metadata: new Metadata() });
684 });
685 });
686 stream.on('error', (err: SystemError) => {
687 /* We need an error handler here to stop "Uncaught Error" exceptions
688 * from bubbling up. However, errors here should all correspond to
689 * "close" events, where we will handle the error more granularly */
690 /* Specifically looking for stream errors that were *not* constructed
691 * from a RST_STREAM response here:
692 * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
693 */
694 if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
695 this.trace(
696 'Node error event: message=' +
697 err.message +
698 ' code=' +
699 err.code +
700 ' errno=' +
701 getSystemErrorName(err.errno) +
702 ' syscall=' +
703 err.syscall
704 );
  // Remembered so that the 'close' handler can report it in the status.
705 this.internalError = err;
706 }
707 this.streamEndWatchers.forEach(watcher => watcher(false));
708 });
  /* Flush the write that sendMessageWithContext() deferred while no stream
   * was attached. */
709 if (this.pendingWrite) {
710 if (!this.pendingWriteCallback) {
711 throw new Error('Invalid state in write handling code');
712 }
713 this.trace(
714 'sending data chunk of length ' +
715 this.pendingWrite.length +
716 ' (deferred)'
717 );
718 try {
719 this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
720 } catch (error) {
721 this.endCall({
722 code: Status.UNAVAILABLE,
723 details: `Write failed with error ${error.message}`,
724 metadata: new Metadata()
725 });
726 }
727 }
728 this.maybeCloseWrites();
729 }
730 }
731
/**
 * Start the call: remember the listener, ask the channel to start this call
 * stream with the given metadata, then output the status straight away if
 * the call already has a final status that is ready to be delivered.
 */
732 start(metadata: Metadata, listener: InterceptingListener) {
733 this.trace('Sending metadata');
734 this.listener = listener;
735 this.channel._startCallStream(this, metadata);
736 this.maybeOutputStatus();
737 }
738
/**
 * Close the attached HTTP/2 stream if there is one and it has not been
 * destroyed yet (it may already be gone when cancelWithStatus is called in
 * response to an internal http2 error).
 *
 * The stream is closed with NO_ERROR when the call ended with an OK status,
 * partly so that our own close is not later mistaken for an error
 * RST_STREAM; in every other case it is closed with CANCEL.
 */
private destroyHttp2Stream() {
  const stream = this.http2Stream;
  if (stream === null || stream.destroyed) {
    return;
  }
  const endedOk = this.finalStatus?.code === Status.OK;
  const code: number = endedOk
    ? http2.constants.NGHTTP2_NO_ERROR
    : http2.constants.NGHTTP2_CANCEL;
  this.trace('close http2 stream with code ' + code);
  stream.close(code);
}
756
/**
 * End the call with the given status code and details and with empty
 * metadata. As with every endCall(), this has no effect on the final status
 * if the call has already ended with a non-OK status.
 */
757 cancelWithStatus(status: Status, details: string): void {
758 this.trace(
759 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
760 );
761 this.endCall({ code: status, details, metadata: new Metadata() });
762 }
763
/**
 * Compute the effective deadline of this call: the earliest of the deadline
 * from the call options, the parent call's deadline (only when a parent
 * exists and the DEADLINE propagation flag is set), and the configured
 * deadline (only when it is truthy).
 */
getDeadline(): Deadline {
  const { deadline, parentCall, flags } = this.options;
  const candidates: Deadline[] = [deadline];
  if (parentCall && flags & Propagate.DEADLINE) {
    candidates.push(parentCall.getDeadline());
  }
  if (this.configDeadline) {
    candidates.push(this.configDeadline);
  }
  return getMinDeadline(candidates);
}
774
/** Returns the call credentials currently in effect for this call. */
775 getCredentials(): CallCredentials {
776 return this.credentials;
777 }
778
/**
 * Sets the credentials for this call to the channel's call credentials
 * composed with the given ones.
 */
779 setCredentials(credentials: CallCredentials): void {
780 this.credentials = this.channelCallCredentials.compose(credentials);
781 }
782
/** Returns the final status, or null while the call has not ended. */
783 getStatus(): StatusObject | null {
784 return this.finalStatus;
785 }
786
/**
 * Returns the address of the attached subchannel, falling back to the
 * channel's target when no subchannel is attached.
 */
787 getPeer(): string {
788 return this.subchannel?.getAddress() ?? this.channel.getTarget();
789 }
790
/** Returns the method name this call was created with. */
791 getMethod(): string {
792 return this.methodName;
793 }
794
/** Returns the host from the call options. */
795 getHost(): string {
796 return this.options.host;
797 }
798
/** Sets the configured deadline that getDeadline() takes into account. */
799 setConfigDeadline(configDeadline: Deadline) {
800 this.configDeadline = configDeadline;
801 }
802
/**
 * Registers a watcher that is called with the filtered status when the
 * status is output.
 */
803 addStatusWatcher(watcher: (status: StatusObject) => void) {
804 this.statusWatchers.push(watcher);
805 }
806
/**
 * Registers a watcher that is called with `true` when trailers are handled
 * and with `false` when the stream emits an 'error' event.
 */
807 addStreamEndWatcher(watcher: (success: boolean) => void) {
808 this.streamEndWatchers.push(watcher);
809 }
810
/** Pushes additional filters onto this call's filter stack. */
811 addFilters(extraFilters: Filter[]) {
812 this.filterStack.push(extraFilters);
813 }
814
/** Returns the call number this call was created with (used in traces). */
815 getCallNumber() {
816 return this.callNumber;
817 }
818
/**
 * Request the next incoming message.
 *
 * If the call has already ended with an error, no message is emitted; reads
 * are marked closed so that the status can be delivered. Otherwise one
 * queued message is pushed if available, and the http2 stream is resumed
 * only when nothing is queued. With no stream attached yet, the request is
 * just recorded.
 */
startRead() {
  const endedWithError =
    this.finalStatus !== null && this.finalStatus.code !== Status.OK;
  if (endedWithError) {
    this.readsClosed = true;
    this.maybeOutputStatus();
    return;
  }
  this.canPush = true;
  if (this.http2Stream === null) {
    this.pendingRead = true;
    return;
  }
  if (this.unpushedReadMessages.length > 0) {
    const queued: Buffer = this.unpushedReadMessages.shift()!;
    this.push(queued);
    return;
  }
  /* Reading from the stream resumes only when there is no pending message
   * left to emit. */
  this.http2Stream.resume();
}
841
/**
 * End the writable side of the HTTP/2 stream once that is possible: writes
 * have been closed, no write filter is still running, and a stream is
 * attached.
 */
private maybeCloseWrites() {
  if (!this.writesClosed || this.isWriteFilterPending) {
    return;
  }
  if (this.http2Stream === null) {
    return;
  }
  this.trace('calling end() on HTTP/2 stream');
  this.http2Stream.end();
}
852
/**
 * Send one message on the call.
 *
 * The message goes through the filter stack's send path first. It is then
 * written to the HTTP/2 stream, or, when no stream is attached yet, kept as
 * the pending write together with its callback. A write error cancels the
 * call (INTERNAL for ERR_STREAM_WRITE_AFTER_END, UNAVAILABLE otherwise); the
 * context's callback is invoked without arguments in either case.
 * @param context Per-message flags and optional completion callback.
 * @param message The serialized message.
 */
sendMessageWithContext(context: MessageContext, message: Buffer) {
  this.trace('write() called with message of length ' + message.length);
  const writeObj: WriteObject = {
    message,
    flags: context.flags,
  };
  const cb: WriteCallback = (error?: Error | null) => {
    if (error) {
      const wroteAfterEnd =
        (error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END';
      const code: Status = wroteAfterEnd ? Status.INTERNAL : Status.UNAVAILABLE;
      this.cancelWithStatus(code, `Write error: ${error.message}`);
    }
    context.callback?.();
  };
  this.isWriteFilterPending = true;
  this.filterStack.sendMessage(Promise.resolve(writeObj)).then(
    (filtered) => {
      this.isWriteFilterPending = false;
      const payload = filtered.message;
      if (this.http2Stream === null) {
        this.trace('deferring writing data chunk of length ' + payload.length);
        this.pendingWrite = payload;
        this.pendingWriteCallback = cb;
        return;
      }
      this.trace('sending data chunk of length ' + payload.length);
      try {
        this.writeMessageToStream(payload, cb);
      } catch (error) {
        this.endCall({
          code: Status.UNAVAILABLE,
          details: `Write failed with error ${error.message}`,
          metadata: new Metadata()
        });
      }
      this.maybeCloseWrites();
    },
    (error) => this.handleFilterError(error)
  );
}
893
/**
 * Signal that no more messages will be sent. Marks writes as closed and ends
 * the HTTP/2 stream right away when that is already possible (see
 * maybeCloseWrites()).
 */
894 halfClose() {
895 this.trace('end() called');
896 this.writesClosed = true;
897 this.maybeCloseWrites();
898 }
899}