1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import * as os from 'os';
|
20 |
|
21 | import { CallCredentials } from './call-credentials';
|
22 | import { Propagate, Status } from './constants';
|
23 | import { Filter, FilterFactory } from './filter';
|
24 | import { FilterStackFactory, FilterStack } from './filter-stack';
|
25 | import { Metadata } from './metadata';
|
26 | import { StreamDecoder } from './stream-decoder';
|
27 | import { ChannelImplementation } from './channel';
|
28 | import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
|
29 | import * as logging from './logging';
|
30 | import { LogVerbosity } from './constants';
|
31 | import { ServerSurfaceCall } from './server-call';
|
32 |
|
/** Tracer name passed to `logging.trace` for every trace line emitted by this file. */
const TRACER_NAME = 'call_stream';

// HTTP/2 constants taken from node's `http2` module. In the code visible in
// this file only NGHTTP2_CANCEL is referenced (to close a stream attached to a
// call that has already ended).
// NOTE(review): HTTP2_HEADER_STATUS and HTTP2_HEADER_CONTENT_TYPE look unused
// here — confirm before removing.
const {
  HTTP2_HEADER_STATUS,
  HTTP2_HEADER_CONTENT_TYPE,
  NGHTTP2_CANCEL,
} = http2.constants;
|
40 |
|
41 |
|
42 |
|
43 |
|
/**
 * Shape of the error objects received by the HTTP/2 stream `'error'` listener
 * in this file. The field list follows Node's documented `SystemError` class.
 */
interface SystemError extends Error {
  address?: string;
  /** String error code, e.g. `'ECONNRESET'`; compared against literals below. */
  code: string;
  dest?: string;
  /** Numeric error number; translated to a name with `getSystemErrorName`. */
  errno: number;
  info?: object;
  message: string;
  path?: string;
  port?: number;
  /** Name of the system call that failed; only used in trace output here. */
  syscall: string;
}
|
55 |
|
56 |
|
57 |
|
58 |
|
59 |
|
60 |
|
61 |
|
/**
 * Looks up the symbolic name (for example `ECONNRESET`) of a system error
 * number in `os.constants.errno`.
 *
 * Node sets `error.errno` to a negative number (the libuv error code), while
 * the values in `os.constants.errno` are positive. On POSIX platforms the
 * libuv code is the negated errno, so the magnitude of `errno` is compared;
 * this lets both `104` and `-104` resolve to the same name.
 * NOTE(review): libuv codes on Windows are not negated errno values, so they
 * will still fall through to the "Unknown" result — confirm if that matters.
 *
 * @param errno Error number, in either sign convention.
 * @returns The first matching constant name, or
 *     `'Unknown system error ' + errno` when nothing matches.
 */
function getSystemErrorName(errno: number): string {
  const magnitude = Math.abs(errno);
  for (const [name, num] of Object.entries(os.constants.errno)) {
    if (num === magnitude) {
      return name;
    }
  }
  return 'Unknown system error ' + errno;
}
|
70 |
|
/**
 * A call deadline: either a `Date` or a number that is compared directly
 * against `Date.getTime()` values.
 */
export type Deadline = Date | number;

/**
 * Returns the earliest deadline in the list as a number.
 *
 * `Date` entries are converted with `getTime()`. An empty list, or a list
 * whose entries never compare below the running minimum, yields `Infinity`.
 *
 * @param deadlineList Deadlines to compare.
 * @returns The smallest deadline value found, or `Infinity`.
 */
function getMinDeadline(deadlineList: Deadline[]): Deadline {
  return deadlineList.reduce<number>((earliest, candidate) => {
    const candidateMsecs =
      candidate instanceof Date ? candidate.getTime() : candidate;
    return candidateMsecs < earliest ? candidateMsecs : earliest;
  }, Infinity);
}
|
84 |
|
/** Per-call options supplied when an `Http2CallStream` is constructed. */
export interface CallStreamOptions {
  /** Deadline requested for this call; one input to `getDeadline()`. */
  deadline: Deadline;
  /**
   * Bit mask tested against `Propagate.CANCELLATION` and `Propagate.DEADLINE`
   * to decide what is inherited from `parentCall`.
   */
  flags: number;
  /** Value returned by `getHost()`. */
  host: string;
  /**
   * Server call this client call was made from, or `null`. Its cancellation
   * and deadline are propagated when the matching bit in `flags` is set.
   */
  parentCall: ServerSurfaceCall | null;
}

/** `CallStreamOptions` with every field optional. */
export type PartialCallStreamOptions = Partial<CallStreamOptions>;
|
93 |
|
/** Final outcome of a call: a status code, a details string and metadata. */
export interface StatusObject {
  code: Status;
  details: string;
  metadata: Metadata;
}

/** Same as `StatusObject`, except that `metadata` may be `null`. */
export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
  metadata: Metadata | null;
}
|
103 |
|
/**
 * Flag values for outgoing messages.
 * NOTE(review): the values are distinct powers of two, so they are presumably
 * OR-ed together into `WriteObject.flags` — confirm against the consumers.
 */
export const enum WriteFlags {
  BufferHint = 1,
  NoCompress = 2,
  WriteThrough = 4,
}

/**
 * An outgoing message together with its optional flags. This is the value
 * passed through `filterStack.sendMessage` before being written.
 */
export interface WriteObject {
  message: Buffer;
  flags?: number;
}
|
114 |
|
/**
 * Handler for received metadata. It is given the metadata and a `next`
 * callback; `InterceptingListenerImpl` forwards whatever is passed to `next`
 * to the following listener.
 */
export interface MetadataListener {
  (metadata: Metadata, next: (metadata: Metadata) => void): void;
}

/**
 * Handler for a received message, with the same `next` continuation
 * convention as `MetadataListener`.
 */
export interface MessageListener {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}

/**
 * Handler for the final call status, with the same `next` continuation
 * convention as `MetadataListener`.
 */
export interface StatusListener {
  (status: StatusObject, next: (status: StatusObject) => void): void;
}

/** A listener that provides all three two-argument (`value`, `next`) handlers. */
export interface FullListener {
  onReceiveMetadata: MetadataListener;
  onReceiveMessage: MessageListener;
  onReceiveStatus: StatusListener;
}

/** A `FullListener` in which any handler may be omitted. */
export type Listener = Partial<FullListener>;
|
135 |
|
136 |
|
137 |
|
138 |
|
/**
 * Listener whose handlers take only the received value (no `next` callback).
 * This is the listener type accepted by `Call.start`.
 */
export interface InterceptingListener {
  onReceiveMetadata(metadata: Metadata): void;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void;
  onReceiveStatus(status: StatusObject): void;
}
|
145 |
|
/**
 * Type guard that tells an `InterceptingListener` apart from a `Listener`.
 *
 * The two are distinguished by the declared arity of `onReceiveMetadata`:
 * an intercepting listener's handler takes exactly one parameter, whereas a
 * plain listener's handler also takes a `next` callback.
 *
 * @param listener Listener of either kind.
 * @returns `true` when `onReceiveMetadata` is defined and declares exactly
 *     one parameter.
 */
export function isInterceptingListener(
  listener: Listener | InterceptingListener
): listener is InterceptingListener {
  const metadataHandler = listener.onReceiveMetadata;
  if (metadataHandler === undefined) {
    return false;
  }
  return metadataHandler.length === 1;
}
|
154 |
|
/**
 * Adapts a `FullListener` (handlers that take a `next` callback) to the
 * `InterceptingListener` interface, forwarding the processed values to
 * `nextListener`.
 *
 * Because the wrapped handlers may invoke `next` asynchronously, this class
 * keeps events in order: a message is never forwarded before the metadata
 * that is still being processed, and the status is forwarded exactly once,
 * only after metadata and any in-flight message have been forwarded.
 */
export class InterceptingListenerImpl implements InterceptingListener {
  /** True between `onReceiveMetadata` and the wrapped handler calling `next`. */
  private processingMetadata = false;
  /** True when `pendingMessage` holds a message waiting behind metadata. */
  private hasPendingMessage = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /** True between `onReceiveMessage` and the wrapped handler calling `next`. */
  private processingMessage = false;
  /** Processed status waiting for metadata/message processing to finish. */
  private pendingStatus: StatusObject | null = null;
  constructor(
    private listener: FullListener,
    private nextListener: InterceptingListener
  ) {}

  /** Forwards the held message, if any, and clears the slot. */
  private processPendingMessage() {
    if (this.hasPendingMessage) {
      this.nextListener.onReceiveMessage(this.pendingMessage);
      this.pendingMessage = null;
      this.hasPendingMessage = false;
    }
  }

  /**
   * Forwards the held status once nothing else is in flight.
   *
   * The status stays pending while metadata or a message is still being
   * processed, so it cannot overtake them; the remaining completion callback
   * calls this method again. The slot is cleared before forwarding so the
   * status is delivered at most once.
   */
  private processPendingStatus() {
    if (this.processingMetadata || this.processingMessage) {
      return;
    }
    const status = this.pendingStatus;
    if (status) {
      this.pendingStatus = null;
      this.nextListener.onReceiveStatus(status);
    }
  }

  /**
   * Runs the wrapped metadata handler, then forwards the resulting metadata
   * followed by any message and status that were held back meanwhile.
   */
  onReceiveMetadata(metadata: Metadata): void {
    this.processingMetadata = true;
    this.listener.onReceiveMetadata(metadata, (metadata) => {
      this.processingMetadata = false;
      this.nextListener.onReceiveMetadata(metadata);
      this.processPendingMessage();
      this.processPendingStatus();
    });
  }

  /**
   * Runs the wrapped message handler. The processed message is held if
   * metadata is still being processed; otherwise it is forwarded, followed by
   * a held status if there is one.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void {
    // The wrapped handler may complete asynchronously, so track that a
    // message is in flight to keep the status from being forwarded first.
    this.processingMessage = true;
    this.listener.onReceiveMessage(message, (msg) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessage = msg;
        this.hasPendingMessage = true;
      } else {
        this.nextListener.onReceiveMessage(msg);
        this.processPendingStatus();
      }
    });
  }

  /**
   * Runs the wrapped status handler. The processed status is forwarded
   * immediately unless metadata or a message is still being processed, in
   * which case it is held until they complete.
   */
  onReceiveStatus(status: StatusObject): void {
    this.listener.onReceiveStatus(status, (processedStatus) => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingStatus = processedStatus;
      } else {
        this.nextListener.onReceiveStatus(processedStatus);
      }
    });
  }
}
|
215 |
|
/** Callback invoked when a write finishes; `error` is set when it failed. */
export interface WriteCallback {
  (error?: Error | null): void;
}

/** Per-message options accepted by `Call.sendMessageWithContext`. */
export interface MessageContext {
  /**
   * Called after the write callback for the message has run.
   * `Http2CallStream` invokes it without arguments, also when the write
   * failed.
   */
  callback?: WriteCallback;
  /** Copied into the `WriteObject.flags` of the outgoing message. */
  flags?: number;
}
|
224 |
|
/**
 * Operations available on a client call. `Http2CallStream` is the
 * implementation in this file; the notes below describe what it does.
 */
export interface Call {
  /** Ends the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the address of the peer, or the channel target if not connected. */
  getPeer(): string;
  /** Registers the listener and starts the call with the given metadata. */
  start(metadata: Metadata, listener: InterceptingListener): void;
  /** Sends one message; `context` carries its flags and completion callback. */
  sendMessageWithContext(context: MessageContext, message: Buffer): void;
  /** Requests delivery of the next received message to the listener. */
  startRead(): void;
  /** Signals that no more messages will be sent. */
  halfClose(): void;

  /** Returns the effective deadline of the call. */
  getDeadline(): Deadline;
  getCredentials(): CallCredentials;
  setCredentials(credentials: CallCredentials): void;
  getMethod(): string;
  getHost(): string;
}
|
239 |
|
/**
 * Client call carried over a single HTTP/2 stream.
 *
 * The call can be started, written to and read from before a stream is
 * attached: one filtered outgoing message is buffered until
 * `attachHttp2Stream` provides the stream. Incoming data is decoded into
 * messages, passed through the filter stack one at a time, and handed to the
 * listener only after `startRead()` has been called. The final status is
 * reported to the listener exactly once.
 */
export class Http2CallStream implements Call {
  credentials: CallCredentials;
  filterStack: FilterStack;
  private http2Stream: http2.ClientHttp2Stream | null = null;
  /** Set by `startRead()` when no stream is attached yet. */
  private pendingRead = false;
  /** True while an outgoing message is inside the filter stack. */
  private isWriteFilterPending = false;
  /** Filtered outgoing message waiting for a stream to be attached. */
  private pendingWrite: Buffer | null = null;
  private pendingWriteCallback: WriteCallback | null = null;
  /** Set by `halfClose()`. */
  private writesClosed = false;

  /** Splits incoming data chunks into whole messages. */
  private decoder = new StreamDecoder();

  /** True while an incoming message is inside the filter stack. */
  private isReadFilterPending = false;
  /** True between `startRead()` and the next message being pushed. */
  private canPush = false;
  /**
   * Set when the stream emits 'end', or when `startRead()` is called after the
   * call has already failed.
   */
  private readsClosed = false;

  /** True once the final status has been handed to the listener. */
  private statusOutput = false;

  /** Filtered messages waiting for the next `startRead()`. */
  private unpushedReadMessages: Buffer[] = [];
  /** Decoded messages waiting for the filter stack to become free. */
  private unfilteredReadMessages: Buffer[] = [];

  /** gRPC status derived from the HTTP `:status` response header. */
  private mappedStatusCode: Status = Status.UNKNOWN;

  /** Status the call ended with, or null while the call is still active. */
  private finalStatus: StatusObject | null = null;

  private subchannel: Subchannel | null = null;
  /** Ends the call with UNAVAILABLE when the subchannel disconnects. */
  private disconnectListener: () => void;

  private listener: InterceptingListener | null = null;

  /** Last non-stream error reported by the stream's 'error' event. */
  private internalError: SystemError | null = null;

  private configDeadline: Deadline = Infinity;

  private statusWatchers: ((status: StatusObject) => void)[] = [];
  private streamEndWatchers: ((success: boolean) => void)[] = [];

  private callStatsTracker: SubchannelCallStatsTracker | null = null;

  /**
   * @param methodName Value returned by `getMethod()`.
   * @param channel Channel that starts the call and supplies the fallback
   *     peer name.
   * @param options Per-call options (deadline, host, propagation flags,
   *     parent call).
   * @param filterStackFactory Creates the filter stack for this call.
   * @param channelCallCredentials Channel-level credentials; call credentials
   *     set later are composed onto these.
   * @param callNumber Identifier included in every trace line.
   */
  constructor(
    private readonly methodName: string,
    private readonly channel: ChannelImplementation,
    private readonly options: CallStreamOptions,
    filterStackFactory: FilterStackFactory,
    private readonly channelCallCredentials: CallCredentials,
    private readonly callNumber: number
  ) {
    this.filterStack = filterStackFactory.createFilter(this);
    this.credentials = channelCallCredentials;
    this.disconnectListener = () => {
      this.endCall({
        code: Status.UNAVAILABLE,
        details: 'Connection dropped',
        metadata: new Metadata(),
      });
    };
    // Propagate cancellation from the parent call when requested by flags.
    if (
      this.options.parentCall &&
      this.options.flags & Propagate.CANCELLATION
    ) {
      this.options.parentCall.on('cancelled', () => {
        this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
      });
    }
  }

  /**
   * Returns a printable message for a value caught in a `catch` block, which
   * is not guaranteed to be an `Error`.
   */
  private static getErrorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Passes the final status through the filter stack and reports it to the
   * status watchers and the listener. Does nothing if no listener is set yet
   * or the status was already output.
   */
  private outputStatus() {
    if (this.listener && !this.statusOutput) {
      this.statusOutput = true;
      const filteredStatus = this.filterStack.receiveTrailers(
        this.finalStatus!
      );
      this.trace(
        'ended with status: code=' +
          filteredStatus.code +
          ' details="' +
          filteredStatus.details +
          '"'
      );
      this.statusWatchers.forEach(watcher => watcher(filteredStatus));
      // Deliver the status on a later tick so that anything thrown by the
      // listener cannot interrupt the cleanup below.
      process.nextTick(() => {
        this.listener?.onReceiveStatus(filteredStatus);
      });
      // Keep the stream flowing; the 'data' handler discards everything once
      // statusOutput is set.
      this.http2Stream?.resume();
      if (this.subchannel) {
        this.subchannel.callUnref();
        this.subchannel.removeDisconnectListener(this.disconnectListener);
      }
    }
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Records `status` as the final status and closes the HTTP/2 stream.
   *
   * An already-recorded status is only replaced when it is OK, so the first
   * non-OK status wins.
   */
  private endCall(status: StatusObject): void {
    if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
      this.finalStatus = status;
      this.maybeOutputStatus();
    }
    this.destroyHttp2Stream();
  }

  /**
   * Outputs the final status if there is one and it may be delivered now:
   * a non-OK status immediately, an OK status only after reads are closed
   * and every received message has been filtered and pushed.
   */
  private maybeOutputStatus() {
    if (this.finalStatus !== null) {
      if (
        this.finalStatus.code !== Status.OK ||
        (this.readsClosed &&
          this.unpushedReadMessages.length === 0 &&
          this.unfilteredReadMessages.length === 0 &&
          !this.isReadFilterPending)
      ) {
        this.outputStatus();
      }
    }
  }

  /**
   * Hands one filtered message to the listener on the next tick, unless the
   * status has been output by then, and consumes the current read request.
   */
  private push(message: Buffer): void {
    this.trace(
      'pushing to reader message of length ' +
        (message instanceof Buffer ? message.length : null)
    );
    this.canPush = false;
    process.nextTick(() => {
      // No messages are delivered after the status.
      if (this.statusOutput) {
        return;
      }
      this.listener?.onReceiveMessage(message);
      this.maybeOutputStatus();
    });
  }

  /** Cancels the call with INTERNAL when a filter rejects. */
  private handleFilterError(error: Error) {
    this.cancelWithStatus(Status.INTERNAL, error.message);
  }

  /**
   * Receives a message that has passed through the filter stack: pushes it if
   * a read is outstanding, otherwise queues it, then starts filtering the
   * next queued raw message.
   */
  private handleFilteredRead(message: Buffer) {
    // Once the call has failed, remaining messages are dropped.
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.maybeOutputStatus();
      return;
    }
    this.isReadFilterPending = false;
    if (this.canPush) {
      this.http2Stream!.pause();
      this.push(message);
    } else {
      this.trace(
        'unpushedReadMessages.push message of length ' + message.length
      );
      this.unpushedReadMessages.push(message);
    }
    if (this.unfilteredReadMessages.length > 0) {
      const nextMessage = this.unfilteredReadMessages.shift()!;
      this.filterReceivedMessage(nextMessage);
    }
  }

  /** Sends one decoded message through the receive side of the filter stack. */
  private filterReceivedMessage(framedMessage: Buffer) {
    // Once the call has failed, remaining messages are dropped.
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.maybeOutputStatus();
      return;
    }
    this.trace('filterReceivedMessage of length ' + framedMessage.length);
    this.isReadFilterPending = true;
    this.filterStack
      .receiveMessage(Promise.resolve(framedMessage))
      .then(
        this.handleFilteredRead.bind(this),
        this.handleFilterError.bind(this)
      );
  }

  /**
   * Accepts a decoded message: queues it while another message is being
   * filtered, otherwise starts filtering it right away.
   */
  private tryPush(messageBytes: Buffer): void {
    if (this.isReadFilterPending) {
      this.trace(
        'unfilteredReadMessages.push message of length ' +
          (messageBytes && messageBytes.length)
      );
      this.unfilteredReadMessages.push(messageBytes);
    } else {
      this.filterReceivedMessage(messageBytes);
    }
  }

  /**
   * Builds the final status from trailing headers and ends the call.
   *
   * The status code mapped from the HTTP `:status` header takes precedence;
   * `grpc-status` is only consulted while that code is UNKNOWN.
   * `grpc-message` is URI-decoded when possible. Both keys are removed from
   * the metadata attached to the status.
   */
  private handleTrailers(headers: http2.IncomingHttpHeaders) {
    this.streamEndWatchers.forEach(watcher => watcher(true));
    let headersString = '';
    for (const header of Object.keys(headers)) {
      headersString += '\t\t' + header + ': ' + headers[header] + '\n';
    }
    this.trace('Received server trailers:\n' + headersString);
    let metadata: Metadata;
    try {
      metadata = Metadata.fromHttp2Headers(headers);
    } catch (e) {
      // Unparsable trailers are treated as empty metadata.
      metadata = new Metadata();
    }
    const metadataMap = metadata.getMap();
    let code: Status = this.mappedStatusCode;
    if (
      code === Status.UNKNOWN &&
      typeof metadataMap['grpc-status'] === 'string'
    ) {
      const receivedStatus = Number(metadataMap['grpc-status']);
      if (receivedStatus in Status) {
        code = receivedStatus;
        this.trace('received status code ' + receivedStatus + ' from server');
      }
      metadata.remove('grpc-status');
    }
    let details = '';
    if (typeof metadataMap['grpc-message'] === 'string') {
      try {
        details = decodeURI(metadataMap['grpc-message']);
      } catch (e) {
        // Fall back to the raw value when it is not valid percent-encoding.
        details = metadataMap['grpc-message'];
      }
      metadata.remove('grpc-message');
      this.trace(
        'received status details string "' + details + '" from server'
      );
    }
    const status: StatusObject = { code, details, metadata };
    this.endCall(status);
  }

  /** Counts the message as sent and writes it to the attached stream. */
  private writeMessageToStream(message: Buffer, callback: WriteCallback) {
    this.callStatsTracker?.addMessageSent();
    this.http2Stream!.write(message, callback);
  }

  /**
   * Connects this call to an HTTP/2 stream.
   *
   * If the call has already ended the stream is closed with NGHTTP2_CANCEL.
   * Otherwise the stream's events are wired to this call, the buffered write
   * (if any) is flushed, and the write side is closed if `halfClose()` was
   * already called.
   *
   * @param stream Stream that carries the call.
   * @param subchannel Subchannel that owns the stream; it is ref'd here and
   *     unref'd when the status is output.
   * @param extraFilters Filters appended to the filter stack.
   * @param callStatsTracker Receives sent/received message counts.
   * @throws Error if a write is buffered without its callback.
   */
  attachHttp2Stream(
    stream: http2.ClientHttp2Stream,
    subchannel: Subchannel,
    extraFilters: Filter[],
    callStatsTracker: SubchannelCallStatsTracker
  ): void {
    this.filterStack.push(extraFilters);
    if (this.finalStatus !== null) {
      stream.close(NGHTTP2_CANCEL);
    } else {
      this.trace(
        'attachHttp2Stream from subchannel ' + subchannel.getAddress()
      );
      this.http2Stream = stream;
      this.subchannel = subchannel;
      this.callStatsTracker = callStatsTracker;
      subchannel.addDisconnectListener(this.disconnectListener);
      subchannel.callRef();
      stream.on('response', (headers, flags) => {
        let headersString = '';
        for (const header of Object.keys(headers)) {
          headersString += '\t\t' + header + ': ' + headers[header] + '\n';
        }
        this.trace('Received server headers:\n' + headersString);
        // Map selected HTTP status codes to gRPC status codes; anything else
        // leaves the code to be taken from grpc-status in the trailers.
        switch (headers[':status']) {
          case 400:
            this.mappedStatusCode = Status.INTERNAL;
            break;
          case 401:
            this.mappedStatusCode = Status.UNAUTHENTICATED;
            break;
          case 403:
            this.mappedStatusCode = Status.PERMISSION_DENIED;
            break;
          case 404:
            this.mappedStatusCode = Status.UNIMPLEMENTED;
            break;
          case 429:
          case 502:
          case 503:
          case 504:
            this.mappedStatusCode = Status.UNAVAILABLE;
            break;
          default:
            this.mappedStatusCode = Status.UNKNOWN;
        }

        if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
          // Headers that also end the stream are handled as trailers.
          this.handleTrailers(headers);
        } else {
          let metadata: Metadata;
          try {
            metadata = Metadata.fromHttp2Headers(headers);
          } catch (error) {
            this.endCall({
              code: Status.UNKNOWN,
              details: Http2CallStream.getErrorMessage(error),
              metadata: new Metadata(),
            });
            return;
          }
          try {
            const finalMetadata = this.filterStack.receiveMetadata(metadata);
            this.listener?.onReceiveMetadata(finalMetadata);
          } catch (error) {
            this.endCall({
              code: Status.UNKNOWN,
              details: Http2CallStream.getErrorMessage(error),
              metadata: new Metadata(),
            });
          }
        }
      });
      stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
        this.handleTrailers(headers);
      });
      stream.on('data', (data: Buffer) => {
        // Data arriving after the status was output is discarded.
        if (this.statusOutput) {
          return;
        }
        this.trace('receive HTTP/2 data frame of length ' + data.length);
        const messages = this.decoder.write(data);

        for (const message of messages) {
          this.trace('parsed message of length ' + message.length);
          this.callStatsTracker!.addMessageReceived();
          this.tryPush(message);
        }
      });
      stream.on('end', () => {
        this.readsClosed = true;
        this.maybeOutputStatus();
      });
      stream.on('close', () => {
        // Handle the close on the next tick, after other handlers queued for
        // the current tick have run.
        process.nextTick(() => {
          this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
          // An OK status already recorded (from trailers) stands.
          if (this.finalStatus?.code === Status.OK) {
            return;
          }
          let code: Status;
          let details = '';
          switch (stream.rstCode) {
            case http2.constants.NGHTTP2_NO_ERROR:
              // A clean close only becomes an error if no status was
              // recorded at all.
              if (this.finalStatus !== null) {
                return;
              }
              code = Status.INTERNAL;
              details = `Received RST_STREAM with code ${stream.rstCode}`;
              break;
            case http2.constants.NGHTTP2_REFUSED_STREAM:
              code = Status.UNAVAILABLE;
              details = 'Stream refused by server';
              break;
            case http2.constants.NGHTTP2_CANCEL:
              code = Status.CANCELLED;
              details = 'Call cancelled';
              break;
            case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
              code = Status.RESOURCE_EXHAUSTED;
              details = 'Bandwidth exhausted or memory limit exceeded';
              break;
            case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
              code = Status.PERMISSION_DENIED;
              details = 'Protocol not secure enough';
              break;
            case http2.constants.NGHTTP2_INTERNAL_ERROR:
              code = Status.INTERNAL;
              if (this.internalError === null) {
                // No local error was recorded for this stream.
                details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
              } else {
                if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
                  // Connection-level failures are reported as UNAVAILABLE.
                  code = Status.UNAVAILABLE;
                  details = this.internalError.message;
                } else {
                  details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
                }
              }
              break;
            default:
              code = Status.INTERNAL;
              details = `Received RST_STREAM with code ${stream.rstCode}`;
          }
          this.endCall({ code, details, metadata: new Metadata() });
        });
      });
      stream.on('error', (err: SystemError) => {
        // Errors other than ERR_HTTP2_STREAM_ERROR are remembered so the
        // 'close' handler can include them in the final status.
        if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
          this.trace(
            'Node error event: message=' +
              err.message +
              ' code=' +
              err.code +
              ' errno=' +
              getSystemErrorName(err.errno) +
              ' syscall=' +
              err.syscall
          );
          this.internalError = err;
        }
        this.streamEndWatchers.forEach(watcher => watcher(false));
      });
      if (this.pendingWrite) {
        if (!this.pendingWriteCallback) {
          throw new Error('Invalid state in write handling code');
        }
        this.trace(
          'sending data chunk of length ' +
            this.pendingWrite.length +
            ' (deferred)'
        );
        try {
          this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
        } catch (error) {
          this.endCall({
            code: Status.UNAVAILABLE,
            details: `Write failed with error ${Http2CallStream.getErrorMessage(error)}`,
            metadata: new Metadata()
          });
        }
      }
      this.maybeCloseWrites();
    }
  }

  /**
   * Registers the listener and asks the channel to start the call. A status
   * recorded before the listener existed is output now if it is deliverable.
   */
  start(metadata: Metadata, listener: InterceptingListener) {
    this.trace('Sending metadata');
    this.listener = listener;
    this.channel._startCallStream(this, metadata);
    this.maybeOutputStatus();
  }

  /**
   * Closes the attached stream if it is not destroyed yet, with NO_ERROR when
   * the final status is OK and CANCEL otherwise.
   */
  private destroyHttp2Stream() {
    if (this.http2Stream !== null && !this.http2Stream.destroyed) {
      let code: number;
      if (this.finalStatus?.code === Status.OK) {
        code = http2.constants.NGHTTP2_NO_ERROR;
      } else {
        code = http2.constants.NGHTTP2_CANCEL;
      }
      this.trace('close http2 stream with code ' + code);
      this.http2Stream.close(code);
    }
  }

  /** Ends the call with the given status code, details and empty metadata. */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.endCall({ code: status, details, metadata: new Metadata() });
  }

  /**
   * Returns the earliest of the call's own deadline, the parent call's
   * deadline (when deadline propagation is enabled) and the configured
   * deadline.
   */
  getDeadline(): Deadline {
    const deadlineList = [this.options.deadline];
    if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
      deadlineList.push(this.options.parentCall.getDeadline());
    }
    // Always considered: the default is Infinity, which never wins, and a
    // numeric deadline of 0 must not be skipped by a truthiness test.
    deadlineList.push(this.configDeadline);
    return getMinDeadline(deadlineList);
  }

  getCredentials(): CallCredentials {
    return this.credentials;
  }

  /** Composes the given call credentials onto the channel's credentials. */
  setCredentials(credentials: CallCredentials): void {
    this.credentials = this.channelCallCredentials.compose(credentials);
  }

  /** Returns the final status, or null while the call has not ended. */
  getStatus(): StatusObject | null {
    return this.finalStatus;
  }

  /** Returns the subchannel address once attached, else the channel target. */
  getPeer(): string {
    return this.subchannel?.getAddress() ?? this.channel.getTarget();
  }

  getMethod(): string {
    return this.methodName;
  }

  getHost(): string {
    return this.options.host;
  }

  /** Sets the additional deadline considered by `getDeadline()`. */
  setConfigDeadline(configDeadline: Deadline) {
    this.configDeadline = configDeadline;
  }

  /** Registers a callback invoked with the filtered final status. */
  addStatusWatcher(watcher: (status: StatusObject) => void) {
    this.statusWatchers.push(watcher);
  }

  /**
   * Registers a callback invoked with `true` when trailers are received and
   * with `false` when the stream emits an error.
   */
  addStreamEndWatcher(watcher: (success: boolean) => void) {
    this.streamEndWatchers.push(watcher);
  }

  /** Appends filters to this call's filter stack. */
  addFilters(extraFilters: Filter[]) {
    this.filterStack.push(extraFilters);
  }

  getCallNumber() {
    return this.callNumber;
  }

  /**
   * Requests one message. A queued filtered message is pushed immediately;
   * otherwise the stream is resumed so more data can arrive. If the call has
   * already failed, reads are marked closed and the status is output instead.
   */
  startRead() {
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.readsClosed = true;
      this.maybeOutputStatus();
      return;
    }
    this.canPush = true;
    if (this.http2Stream === null) {
      this.pendingRead = true;
    } else {
      if (this.unpushedReadMessages.length > 0) {
        const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
        this.push(nextMessage);
        return;
      }
      this.http2Stream.resume();
    }
  }

  /**
   * Ends the write side of the stream once `halfClose()` was called, no
   * outgoing message is still being filtered, and a stream is attached.
   */
  private maybeCloseWrites() {
    if (
      this.writesClosed &&
      !this.isWriteFilterPending &&
      this.http2Stream !== null
    ) {
      this.trace('calling end() on HTTP/2 stream');
      this.http2Stream.end();
    }
  }

  /**
   * Filters and sends one message. If no stream is attached yet, the filtered
   * message is buffered for `attachHttp2Stream`.
   *
   * A write error cancels the call (INTERNAL for a write after end,
   * UNAVAILABLE otherwise); `context.callback` is then invoked without
   * arguments in either case.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer) {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const cb: WriteCallback = (error?: Error | null) => {
      let code: Status = Status.UNAVAILABLE;
      if ((error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END') {
        code = Status.INTERNAL;
      }
      if (error) {
        this.cancelWithStatus(code, `Write error: ${error.message}`);
      }
      context.callback?.();
    };
    this.isWriteFilterPending = true;
    this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
      this.isWriteFilterPending = false;
      if (this.http2Stream === null) {
        this.trace(
          'deferring writing data chunk of length ' + message.message.length
        );
        this.pendingWrite = message.message;
        this.pendingWriteCallback = cb;
      } else {
        this.trace('sending data chunk of length ' + message.message.length);
        try {
          this.writeMessageToStream(message.message, cb);
        } catch (error) {
          this.endCall({
            code: Status.UNAVAILABLE,
            details: `Write failed with error ${Http2CallStream.getErrorMessage(error)}`,
            metadata: new Metadata()
          });
        }
        this.maybeCloseWrites();
      }
    }, this.handleFilterError.bind(this));
  }

  /** Marks the write side as finished and ends the stream when possible. */
  halfClose() {
    this.trace('end() called');
    this.writesClosed = true;
    this.maybeCloseWrites();
  }
}
|