UNPKG

24.2 kBPlain TextView Raw
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import { CallCredentials } from './call-credentials';
19import { LogVerbosity, Status } from './constants';
20import { Deadline } from './deadline';
21import { Metadata } from './metadata';
22import { CallConfig } from './resolver';
23import * as logging from './logging';
24import {
25 Call,
26 InterceptingListener,
27 MessageContext,
28 StatusObject,
29 WriteCallback,
30 WriteObject,
31} from './call-interface';
32import {
33 LoadBalancingCall,
34 StatusObjectWithProgress,
35} from './load-balancing-call';
36import { InternalChannel } from './internal-channel';
37
/** Tracer name passed to `logging.trace` for every message emitted by this file. */
const TRACER_NAME = 'retrying_call';
39
/**
 * Token bucket used to throttle retries.
 *
 * The bucket starts full (`maxTokens`). Every failed call removes one token,
 * every successful call adds `tokenRatio` tokens, and the count is always kept
 * within `[0, maxTokens]`. Retries are allowed only while more than half of
 * the tokens remain.
 */
export class RetryThrottler {
  private tokens: number;

  /**
   * @param maxTokens Upper bound for the number of tokens in the bucket.
   * @param tokenRatio Number of tokens added back for each successful call.
   * @param previousRetryThrottler Optional throttler from a previous config
   *   whose current fill level is carried over.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      /* When carrying over tokens from a previous config, rescale them to the
       * new max value */
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: adds `tokenRatio` tokens, capped at `maxTokens`. */
  addCallSucceeded() {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: removes one token, never going below zero. */
  addCallFailed() {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns `true` while strictly more than half of `maxTokens` remain. */
  canRetryCall() {
    return this.tokens > this.maxTokens / 2;
  }
}
70
/**
 * Accounts for buffered message bytes against a global limit and a per-call
 * limit. Calls are identified by a numeric call ID.
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  /**
   * @param totalLimit Maximum number of bytes that may be allocated across
   *   all calls.
   * @param limitPerCall Maximum number of bytes that may be allocated by a
   *   single call.
   */
  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` bytes for `callId`.
   * @returns `true` if the reservation fits within both limits and was
   *   recorded, `false` (with no state change) otherwise.
   */
  allocate(size: number, callId: number): boolean {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (
      this.limitPerCall - currentPerCall < size ||
      this.totalLimit - this.totalAllocated < size
    ) {
      return false;
    }
    this.allocatedPerCall.set(callId, currentPerCall + size);
    this.totalAllocated += size;
    return true;
  }

  /**
   * Releases `size` bytes previously reserved for `callId`.
   * @throws Error if `size` exceeds the total allocation or the allocation
   *   recorded for `callId`. Nothing is modified when this throws.
   */
  free(size: number, callId: number) {
    if (this.totalAllocated < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`
      );
    }
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (currentPerCall < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`
      );
    }
    // Both checks passed: only now mutate, so a rejected free cannot leave the
    // global counter out of sync with the per-call counters.
    this.totalAllocated -= size;
    const remaining = currentPerCall - size;
    if (remaining === 0) {
      // Drop empty entries so the map does not keep one key per finished call.
      this.allocatedPerCall.delete(callId);
    } else {
      this.allocatedPerCall.set(callId, remaining);
    }
  }

  /**
   * Releases everything reserved for `callId` and forgets the call.
   * @throws Error if the call's recorded allocation exceeds the total
   *   allocation.
   */
  freeAll(callId: number) {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (this.totalAllocated < currentPerCall) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`
      );
    }
    this.totalAllocated -= currentPerCall;
    this.allocatedPerCall.delete(callId);
  }
}
117
/** State of a single child call: still running, or finished/discarded. */
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';

/** Bookkeeping kept for each child call started by a retrying call. */
interface UnderlyingCall {
  /** Whether this child call is still running. */
  state: UnderlyingCallState;
  /** The child call itself. */
  call: LoadBalancingCall;
  /** Index of the next write buffer entry to send on this child call. */
  nextMessageToSend: number;
}
125
/**
 * A retrying call can be in one of these states:
 * RETRY: Retries are configured and new attempts may be sent
 * HEDGING: Hedging is configured and new attempts may be sent
 * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
 * transparent retry attempts may still be sent
 * COMMITTED: One attempt is committed, and no new attempts will be
 * sent
 */
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
136
/**
 * The different types of objects that can be stored in the write buffer, with
 * the following meanings:
 * MESSAGE: This is a message to be sent.
 * HALF_CLOSE: When this entry is reached, the calls should send a half-close.
 * FREED: This slot previously contained a message that has been sent on all
 * child calls and is no longer needed.
 */
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';
146
/**
 * Entry in the buffer of messages to send to the remote end.
 */
interface WriteBufferEntry {
  /** Discriminates what this slot represents. */
  entryType: WriteBufferEntryType;
  /**
   * Message to send.
   * Only populated if entryType is MESSAGE.
   */
  message?: WriteObject;
  /**
   * Callback to call after sending the message.
   * Only populated if entryType is MESSAGE and the call is in the COMMITTED
   * state.
   */
  callback?: WriteCallback;
  /**
   * Indicates whether the message is allocated in the buffer tracker. Ignored
   * if entryType is not MESSAGE. Should be the return value of
   * bufferTracker.allocate.
   */
  allocated: boolean;
}
170
/**
 * Metadata key used to report how many attempts preceded the current one.
 * NOTE(review): the identifier is misspelled ("PREVIONS"); it is kept as-is
 * because other code in this file refers to it by this name.
 */
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
172
/**
 * A call that fans out to one or more child load-balancing calls
 * ("attempts") in order to implement configured retries, hedging, and
 * transparent retries.
 *
 * Outgoing messages are kept in a write buffer so that they can be replayed
 * on new attempts. Once one attempt is committed, all other attempts are
 * cancelled, no new attempts are started, and buffered messages that the
 * committed attempt has already sent are released.
 */
export class RetryingCall implements Call {
  private state: RetryingCallState;
  private listener: InterceptingListener | null = null;
  private initialMetadata: Metadata | null = null;
  private underlyingCalls: UnderlyingCall[] = [];
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * The offset of message indices in the writeBuffer. For example, if
   * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
   * is in writeBuffer[5].
   */
  private writeBufferOffset = 0;
  /**
   * Tracks whether a read has been started, so that we know whether to start
   * reads on new child calls. This only matters for the first read, because
   * once a message comes in the child call becomes committed and there will
   * be no new child calls.
   */
  private readStarted = false;
  /** Whether the single transparent retry allowed for REFUSED attempts was used. */
  private transparentRetryUsed = false;
  /**
   * Number of attempts so far
   */
  private attempts = 0;
  private hedgingTimer: NodeJS.Timeout | null = null;
  /** Index into underlyingCalls of the committed attempt, if any. */
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;

  /**
   * Chooses the initial state from the method config: RETRY if a retry
   * policy is present, otherwise HEDGING if a hedging policy is present,
   * otherwise TRANSPARENT_ONLY.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    if (callConfig.methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const retryPolicy = callConfig.methodConfig.retryPolicy;
      // The backoff is a string whose last character is dropped before
      // parsing (presumably a unit suffix such as "s" -- TODO confirm).
      this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(
        retryPolicy.initialBackoff.substring(
          0,
          retryPolicy.initialBackoff.length - 1
        )
      );
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  /** @returns The call number this call was constructed with. */
  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Releases all buffered messages for this call and delivers the final
   * status to the listener on the next tick.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace(
      'ended with status: code=' +
        statusObject.code +
        ' details="' +
        statusObject.details +
        '"'
    );
    this.bufferTracker.freeAll(this.callNumber);
    // Keep message indices stable while dropping every buffered entry.
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Explicitly construct status object to remove progress field
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata,
      });
    });
  }

  /**
   * Reports the given status to the listener and cancels every child call
   * with the same status.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.reportStatus({ code: status, details, metadata: new Metadata() });
    for (const { call } of this.underlyingCalls) {
      call.cancelWithStatus(status, details);
    }
  }

  /** @returns The committed attempt's peer, or 'unknown' before commit. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up the write buffer entry for an absolute message index. Indices
   * that do not map to a stored entry yield a FREED placeholder.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return (
      this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {
        entryType: 'FREED',
        allocated: false,
      }
    );
  }

  /** @returns The absolute index the next pushed buffer entry will get. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, frees and drops every buffered entry that the committed
   * attempt has already moved past. Does nothing before commit, because
   * other attempts may still need to replay those entries.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex =
      this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    for (
      let messageIndex = this.writeBufferOffset;
      messageIndex < earliestNeededMessageIndex;
      messageIndex++
    ) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(
          bufferEntry.message!.message.length,
          this.callNumber
        );
      }
    }
    this.writeBuffer = this.writeBuffer.slice(
      earliestNeededMessageIndex - this.writeBufferOffset
    );
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index`: switches to COMMITTED, cancels every
   * other attempt that is still active, and releases already-sent messages.
   * No-op if already committed or if that attempt is already COMPLETED.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    if (this.underlyingCalls[index].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'Committing call [' +
        this.underlyingCalls[index].call.getCallNumber() +
        '] at index ' +
        index
    );
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(
        Status.CANCELLED,
        'Discarded in favor of other hedged attempt'
      );
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has progressed furthest through the
   * write buffer. If no attempt is active, switches to TRANSPARENT_ONLY
   * instead.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (
        childCall.state === 'ACTIVE' &&
        childCall.nextMessageToSend > mostMessages
      ) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      /* There are no active calls, disable retries to force the next call that
       * is started to be committed. */
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Checks whether `code` appears in `list`, where list items may be numeric
   * status codes or status names (compared case-insensitively).
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    return list.some(
      value =>
        value === code ||
        value.toString().toLowerCase() === Status[code].toLowerCase()
    );
  }

  /**
   * @returns A random delay in milliseconds between 0 and the current
   *   backoff, or 0 when no retry policy is configured. Also grows the
   *   backoff for the next retry by `backoffMultiplier`, capped at
   *   `maxBackoff`.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffSec = Number(
      retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1)
    );
    this.nextRetryBackoffSec = Math.min(
      this.nextRetryBackoffSec * retryPolicy.backoffMultiplier,
      maxBackoffSec
    );
    return nextBackoffMs;
  }

  /**
   * Decides whether to start a retry attempt after a retryable failure.
   *
   * `callback` is always invoked exactly once: synchronously with `false`
   * when no retry will be made, or from the delay timer with `true` (a new
   * attempt is then started) or `false` (state changed in the meantime, or
   * the retry throttler denied the retry).
   *
   * @param pushback Server pushback in milliseconds, `null` if the server
   *   sent none, or a negative number meaning "do not retry".
   */
  private maybeRetryCall(
    pushback: number | null,
    callback: (retried: boolean) => void
  ) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      retryDelayMs = pushback;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // Without this the caller would wait forever for a decision and the
        // call would never report its status.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** @returns The number of child calls that are still ACTIVE. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a non-OK status from an attempt, according to the current state:
   * commit and report it, start another hedged attempt, or try a retry.
   */
  private handleProcessedStatus(
    status: StatusObject,
    callIndex: number,
    pushback: number | null
  ) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ??
              [],
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            // Negative pushback: stop hedging and surface this status.
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If after trying to start a call there are no active calls, this was the last one
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes,
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, retried => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the `grpc-retry-pushback-ms` value from trailing metadata.
   * @returns `null` if the key is absent, the parsed integer if present, or
   *   `-1` if the value cannot be parsed, so that callers treat it like a
   *   negative pushback and do not retry.
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt never throws; it signals failure by returning NaN.
    const parsed = parseInt(mdValue[0] as string, 10);
    return Number.isNaN(parsed) ? -1 : parsed;
  }

  /**
   * Handles the final status of the attempt at `callIndex`, choosing between
   * reporting it, a transparent retry, or the retry/hedging policy based on
   * how far the attempt progressed.
   */
  private handleChildStatus(
    status: StatusObjectWithProgress,
    callIndex: number
  ) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'state=' +
        this.state +
        ' handling status with progress ' +
        status.progress +
        ' from child [' +
        this.underlyingCalls[callIndex].call.getCallNumber() +
        '] in state ' +
        this.underlyingCalls[callIndex].state
    );
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // RPC never leaves the client, always safe to retry
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // RPC reaches the server library, but not the server application logic
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts another hedged attempt (and re-arms the hedging timer) if the
   * call is still hedging and the attempt limit has not been reached.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Clears any pending hedging timer and, if the call is still hedging and
   * below the attempt limit, schedules the next hedged attempt after the
   * configured hedging delay (default '0s').
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
    const hedgingDelaySec = Number(
      hedgingDelayString.substring(0, hedgingDelayString.length - 1)
    );
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    // Do not let a pending hedging timer keep the process alive.
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new child call, replays the write buffer on it from
   * the beginning, and starts reading on it if a read was already requested.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(
      this.callConfig,
      this.methodName,
      this.host,
      this.credentials,
      this.deadline
    );
    this.trace(
      'Created child call [' +
        child.getCallNumber() +
        '] for attempt ' +
        this.attempts
    );
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({
      state: 'ACTIVE',
      call: child,
      nextMessageToSend: 0,
    });
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(
        PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
        `${previousAttempts}`
      );
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace(
          'Received metadata from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace(
          'Received message from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace(
          'Received status from child [' + child.getCallNumber() + ']'
        );
        // If no headers were delivered, attach the attempt count to the
        // trailers instead.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        this.handleChildStatus(status, index);
      },
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /**
   * Starts the call: stores the listener and initial metadata, starts the
   * first attempt, and arms the hedging timer if hedging is configured.
   */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when a child call finishes writing its current message: fires the
   * buffered write callback (if any), advances that child, and sends its next
   * buffered entry.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the child's next pending write buffer entry, if there is one: a
   * message is written (continuing with the following entry once the write
   * completes) and a half-close entry half-closes the child.
   */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    // getBufferEntry always returns an entry; an index past the end of the
    // buffer yields a FREED placeholder, for which there is nothing to send.
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext(
          {
            callback: () => {
              // Write errors are ignored here
              this.handleChildWriteCompleted(childIndex);
            },
          },
          bufferEntry.message!.message
        );
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Should not be possible
        break;
    }
  }

  /**
   * Buffers an outgoing message and forwards it to the child calls that are
   * ready for it.
   *
   * If the message fits in the buffer tracker, the write callback is invoked
   * immediately and the message is sent on every active attempt that has
   * caught up. Otherwise the call commits to the attempt that has sent the
   * most messages and the write callback is deferred until that attempt has
   * written the message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber),
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (
          call.state === 'ACTIVE' &&
          call.nextMessageToSend === messageIndex
        ) {
          call.call.sendMessageWithContext(
            {
              callback: () => {
                // Write errors are ignored here
                this.handleChildWriteCompleted(callIndex);
              },
            },
            message
          );
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages does not commit when there is currently
      // no active attempt (for example, between retry attempts)
      if (this.committedCallIndex === null) {
        return;
      }
      const call = this.underlyingCalls[this.committedCallIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext(
          {
            callback: () => {
              // Write errors are ignored here
              this.handleChildWriteCompleted(this.committedCallIndex!);
            },
          },
          message
        );
      }
    }
  }

  /** Requests a read on every active attempt and on attempts started later. */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  /**
   * Buffers a half-close marker and half-closes every active attempt that
   * has already sent all preceding entries.
   */
  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false,
    });
    for (const call of this.underlyingCalls) {
      if (
        call.state === 'ACTIVE' &&
        call.nextMessageToSend === halfCloseIndex
      ) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  /**
   * Not supported on this call type.
   * @throws Error always.
   */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** @returns The method name this call was constructed with. */
  getMethod(): string {
    return this.methodName;
  }

  /** @returns The host this call was constructed with. */
  getHost(): string {
    return this.host;
  }
}