1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from './call-credentials';
|
19 | import { LogVerbosity, Status } from './constants';
|
20 | import { Deadline } from './deadline';
|
21 | import { Metadata } from './metadata';
|
22 | import { CallConfig } from './resolver';
|
23 | import * as logging from './logging';
|
24 | import {
|
25 | Call,
|
26 | InterceptingListener,
|
27 | MessageContext,
|
28 | StatusObject,
|
29 | WriteCallback,
|
30 | WriteObject,
|
31 | } from './call-interface';
|
32 | import {
|
33 | LoadBalancingCall,
|
34 | StatusObjectWithProgress,
|
35 | } from './load-balancing-call';
|
36 | import { InternalChannel } from './internal-channel';
|
37 |
|
/** Tracer name passed to `logging.trace` for every message emitted by this module. */
const TRACER_NAME = 'retrying_call';
|
39 |
|
/**
 * Token bucket used to throttle retries.
 *
 * The bucket holds between 0 and `maxTokens` tokens. Every failed call removes
 * one token, every successful call adds `tokenRatio` tokens, and retries are
 * permitted only while the bucket is more than half full.
 */
export class RetryThrottler {
  private tokens: number;

  /**
   * @param maxTokens Upper bound for the token count.
   * @param tokenRatio Number of tokens added for each successful call.
   * @param previousRetryThrottler Optional throttler to inherit state from.
   *   Its current fill level is rescaled proportionally to the new
   *   `maxTokens`; without it the bucket starts full.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      // Keep the same fill fraction when the bucket size changes.
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: adds `tokenRatio` tokens, capped at `maxTokens`. */
  addCallSucceeded() {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: removes one token, never dropping below zero. */
  addCallFailed() {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** Returns true while the bucket is strictly more than half full. */
  canRetryCall() {
    return this.tokens > this.maxTokens / 2;
  }
}
|
70 |
|
/**
 * Accounts for buffered message bytes, enforcing both a limit per call and a
 * limit across all calls that share this tracker.
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  /**
   * @param totalLimit Maximum combined size that may be allocated at once.
   * @param limitPerCall Maximum size that a single call may hold at once.
   */
  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` units for the call identified by `callId`.
   *
   * @returns true if the reservation fit within both limits and was recorded,
   *   false (with no state change) otherwise.
   */
  allocate(size: number, callId: number): boolean {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (
      this.limitPerCall - currentPerCall < size ||
      this.totalLimit - this.totalAllocated < size
    ) {
      return false;
    }
    this.allocatedPerCall.set(callId, currentPerCall + size);
    this.totalAllocated += size;
    return true;
  }

  /**
   * Releases `size` units previously allocated for `callId`.
   *
   * @throws Error if more is freed than is allocated in total or for the
   *   call. Both checks run before any counter is modified, so a rejected
   *   free leaves the tracker unchanged.
   */
  free(size: number, callId: number) {
    if (this.totalAllocated < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`
      );
    }
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (currentPerCall < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`
      );
    }
    this.totalAllocated -= size;
    const remaining = currentPerCall - size;
    if (remaining === 0) {
      // An absent entry is read back as 0, so dropping it is equivalent.
      this.allocatedPerCall.delete(callId);
    } else {
      this.allocatedPerCall.set(callId, remaining);
    }
  }

  /**
   * Releases everything currently allocated for `callId` and forgets the call.
   *
   * @throws Error if the call's recorded allocation exceeds the total.
   */
  freeAll(callId: number) {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (this.totalAllocated < currentPerCall) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`
      );
    }
    this.totalAllocated -= currentPerCall;
    this.allocatedPerCall.delete(callId);
  }
}
|
117 |
|
/**
 * Lifecycle of a single child attempt: ACTIVE until its status has been
 * handled or it has been discarded, COMPLETED afterwards.
 */
type UnderlyingCallState =
  | 'ACTIVE'
  | 'COMPLETED';
|
119 |
|
/** Bookkeeping for one child attempt of a retrying call. */
interface UnderlyingCall {
  /** Whether this attempt is still in progress. */
  state: UnderlyingCallState;
  /** The child call that carries this attempt. */
  call: LoadBalancingCall;
  /** Index of the next write buffer entry to send on this attempt. */
  nextMessageToSend: number;
}
|
125 |
|
126 |
|
127 |
|
128 |
|
129 |
|
130 |
|
131 |
|
132 |
|
133 |
|
134 |
|
/**
 * Overall state of a retrying call:
 * - RETRY: a retry policy is configured and new attempts may be started.
 * - HEDGING: a hedging policy is configured and new attempts may be started.
 * - TRANSPARENT_ONLY: no configured retries or hedged attempts will be
 *   started; only transparent retries remain possible.
 * - COMMITTED: one attempt has been committed.
 */
type RetryingCallState =
  | 'RETRY'
  | 'HEDGING'
  | 'TRANSPARENT_ONLY'
  | 'COMMITTED';
|
136 |
|
137 |
|
138 |
|
139 |
|
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
/**
 * Kinds of entries in the write buffer:
 * - MESSAGE: a message to send on the child calls.
 * - HALF_CLOSE: child calls reaching this entry send a half-close.
 * - FREED: placeholder for a slot that is no longer held in the buffer.
 */
type WriteBufferEntryType =
  | 'MESSAGE'
  | 'HALF_CLOSE'
  | 'FREED';
|
146 |
|
147 |
|
148 |
|
149 |
|
/** One slot in the buffer of outgoing writes. */
interface WriteBufferEntry {
  /** What this slot represents. */
  entryType: WriteBufferEntryType;
  /** The message to send; set for MESSAGE entries. */
  message?: WriteObject;
  /**
   * Write callback deferred until a child call finishes writing the message;
   * set only when the message could not be allocated in the buffer tracker.
   */
  callback?: WriteCallback;
  /**
   * Result of `bufferTracker.allocate` for this message; always false for
   * non-message entries.
   */
  allocated: boolean;
}
|
170 |
|
/**
 * Metadata key that carries the number of attempts made before the current
 * one. The misspelled identifier is kept because other code refers to it.
 */
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
|
172 |
|
/**
 * A `Call` that runs one or more child load-balancing calls ("attempts") on
 * behalf of a single logical RPC, according to the retry or hedging policy in
 * the call config. Outgoing writes are buffered so they can be replayed on
 * later attempts; once an attempt is committed, the other attempts are
 * cancelled and buffered messages are released as they are sent.
 */
export class RetryingCall implements Call {
  private state: RetryingCallState;
  private listener: InterceptingListener | null = null;
  private initialMetadata: Metadata | null = null;
  private underlyingCalls: UnderlyingCall[] = [];
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * Message index of `writeBuffer[0]`. Entries before this index have been
   * dropped from the buffer.
   */
  private writeBufferOffset = 0;
  /** Set once `startRead` has been called, so new attempts start reading too. */
  private readStarted = false;
  /** Set once a REFUSED attempt has been transparently retried. */
  private transparentRetryUsed = false;
  /** Number of attempts counted against the policy's `maxAttempts`. */
  private attempts = 0;
  private hedgingTimer: NodeJS.Timeout | null = null;
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    const retryPolicy = callConfig.methodConfig.retryPolicy;
    if (retryPolicy) {
      this.state = 'RETRY';
      this.initialRetryBackoffSec = RetryingCall.parseDurationSec(
        retryPolicy.initialBackoff
      );
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  /**
   * Converts a duration string with a one-character unit suffix (e.g. `"1.5s"`)
   * to a number by dropping the last character.
   */
  private static parseDurationSec(duration: string): number {
    return Number(duration.substring(0, duration.length - 1));
  }

  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Releases all buffered writes for this call and delivers the final status
   * to the listener on the next tick.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace(
      'ended with status: code=' +
        statusObject.code +
        ' details="' +
        statusObject.details +
        '"'
    );
    this.bufferTracker.freeAll(this.callNumber);
    // Keep message indexes monotonic even though the buffer is emptied.
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Build a fresh object so only code, details and metadata are passed on.
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata,
      });
    });
  }

  /** Reports the given status to the listener and cancels every child call. */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.reportStatus({ code: status, details, metadata: new Metadata() });
    for (const { call } of this.underlyingCalls) {
      call.cancelWithStatus(status, details);
    }
  }

  /** Returns the committed attempt's peer, or `'unknown'` before commitment. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up the buffer entry for a message index. Indexes that are not
   * currently in the buffer yield a non-allocated FREED placeholder.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return (
      this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {
        entryType: 'FREED',
        allocated: false,
      }
    );
  }

  /** Message index that the next buffered write will receive. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, drops every buffered entry that precedes the committed
   * attempt's next message to send, returning allocated bytes to the tracker.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex =
      this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    for (
      let messageIndex = this.writeBufferOffset;
      messageIndex < earliestNeededMessageIndex;
      messageIndex++
    ) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(
          bufferEntry.message!.message.length,
          this.callNumber
        );
      }
    }
    this.writeBuffer = this.writeBuffer.slice(
      earliestNeededMessageIndex - this.writeBufferOffset
    );
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index` and cancels every other active attempt.
   * Does nothing if a call is already committed or the attempt has completed.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    if (this.underlyingCalls[index].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'Committing call [' +
        this.underlyingCalls[index].call.getCallNumber() +
        '] at index ' +
        index
    );
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      // Mark as completed first so the cancellation status is ignored.
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(
        Status.CANCELLED,
        'Discarded in favor of other hedged attempt'
      );
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has progressed furthest through the
   * write buffer. If no attempt is active, switches to TRANSPARENT_ONLY.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (
        childCall.state === 'ACTIVE' &&
        childCall.nextMessageToSend > mostMessages
      ) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      // Nothing to commit to; stop configured retries/hedging instead.
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Checks whether `code` appears in `list`, where entries may be numeric
   * status codes or status names compared case-insensitively.
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    return list.some(
      value =>
        value === code ||
        value.toString().toLowerCase() === Status[code].toLowerCase()
    );
  }

  /**
   * Returns a random delay in milliseconds between 0 and the current backoff,
   * then grows the backoff by the policy's multiplier up to `maxBackoff`.
   * Returns 0 when no retry policy is configured.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffSec = RetryingCall.parseDurationSec(retryPolicy.maxBackoff);
    this.nextRetryBackoffSec = Math.min(
      this.nextRetryBackoffSec * retryPolicy.backoffMultiplier,
      maxBackoffSec
    );
    return nextBackoffMs;
  }

  /**
   * Decides whether to start another retry attempt and reports the decision
   * through `callback`, which is invoked exactly once on every path.
   *
   * @param pushback Server pushback delay in ms, negative to forbid retries,
   *   or null when the server sent none.
   * @param callback Called with true just before a new attempt is started,
   *   or with false when no retry will happen.
   */
  private maybeRetryCall(
    pushback: number | null,
    callback: (retried: boolean) => void
  ) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      retryDelayMs = pushback;
      // An explicit pushback resets the exponential backoff sequence.
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      // The state may have changed while waiting.
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // Without this the caller would never learn the outcome and the
        // final status would never be reported.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** Number of child attempts that are still ACTIVE. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a non-OK status from an attempt, applying the retry or hedging
   * policy according to the current state.
   */
  private handleProcessedStatus(
    status: StatusObject,
    callIndex: number,
    pushback: number | null
  ) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING': {
        const nonFatalStatusCodes =
          this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [];
        if (this.isStatusCodeInList(nonFatalStatusCodes, status.code)) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If no attempt is left running, this status is the final result.
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      }
      case 'RETRY':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes,
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, retried => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the `grpc-retry-pushback-ms` value from status metadata.
   *
   * @returns null when the key is absent, the parsed integer when present,
   *   or -1 when the value cannot be parsed as an integer. Callers treat a
   *   negative value as "do not retry".
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt signals failure by returning NaN, not by throwing.
    const parsed = parseInt(mdValue[0] as string);
    return Number.isNaN(parsed) ? -1 : parsed;
  }

  /**
   * Entry point for a status received from a child attempt. Statuses from
   * attempts already marked COMPLETED are ignored.
   */
  private handleChildStatus(
    status: StatusObjectWithProgress,
    callIndex: number
  ) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'state=' +
        this.state +
        ' handling status with progress ' +
        status.progress +
        ' from child [' +
        this.underlyingCalls[callIndex].call.getCallNumber() +
        '] in state ' +
        this.underlyingCalls[callIndex].state
    );
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // Start a replacement attempt without counting it in `attempts`.
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // Only one uncounted replacement attempt is allowed for REFUSED.
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts another hedged attempt, and re-arms the hedging timer, if the call
   * is still hedging and the attempt limit has not been reached.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING') {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (!hedgingPolicy) {
      return;
    }
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Clears any pending hedging timer and, if further hedged attempts are
   * still possible, schedules the next one after the policy's hedging delay.
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING') {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (!hedgingPolicy) {
      return;
    }
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelaySec = RetryingCall.parseDurationSec(
      hedgingPolicy.hedgingDelay ?? '0s'
    );
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    // Do not let a pending hedging timer keep the process alive.
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new child call, wires its events back into this
   * call, and begins replaying buffered writes on it.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(
      this.callConfig,
      this.methodName,
      this.host,
      this.credentials,
      this.deadline
    );
    this.trace(
      'Created child call [' +
        child.getCallNumber() +
        '] for attempt ' +
        this.attempts
    );
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({
      state: 'ACTIVE',
      call: child,
      nextMessageToSend: 0,
    });
    const previousAttempts = this.attempts - 1;
    // Clone so per-attempt headers do not leak into other attempts.
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(
        PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
        `${previousAttempts}`
      );
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace(
          'Received metadata from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace(
          'Received message from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace(
          'Received status from child [' + child.getCallNumber() + ']'
        );
        // Attach the attempt count to the trailers if no headers carried it.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        this.handleChildStatus(status, index);
      },
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /** Starts the call: records the listener and metadata and begins attempt 1. */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when a child call finishes writing its current message: runs any
   * deferred write callback, trims the buffer, and sends the next entry.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the child's next pending buffer entry, if there is one. Nothing is
   * sent for completed children or when the child has caught up with the
   * buffer (the lookup then yields a FREED placeholder).
   */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext(
          {
            callback: () => {
              // Any write error is ignored here.
              this.handleChildWriteCompleted(childIndex);
            },
          },
          bufferEntry.message!.message
        );
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Nothing to send.
        break;
    }
  }

  /**
   * Buffers an outgoing message and forwards it to every active attempt that
   * is ready for it. If the message does not fit in the buffer tracker, the
   * call commits to the attempt that has sent the most messages and the write
   * callback is deferred until that attempt has written the message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber),
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      // The message is safely buffered, so the write can be acknowledged now.
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (
          call.state === 'ACTIVE' &&
          call.nextMessageToSend === messageIndex
        ) {
          call.call.sendMessageWithContext(
            {
              callback: () => {
                // Any write error is ignored here.
                this.handleChildWriteCompleted(callIndex);
              },
            },
            message
          );
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages may not commit if no attempt is active.
      if (this.committedCallIndex === null) {
        return;
      }
      const call = this.underlyingCalls[this.committedCallIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext(
          {
            callback: () => {
              // Any write error is ignored here.
              this.handleChildWriteCompleted(this.committedCallIndex!);
            },
          },
          message
        );
      }
    }
  }

  /** Starts reading on every active attempt and on attempts created later. */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  /**
   * Buffers a half-close marker and half-closes every active attempt that has
   * already sent everything before it.
   */
  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false,
    });
    for (const call of this.underlyingCalls) {
      if (
        call.state === 'ACTIVE' &&
        call.nextMessageToSend === halfCloseIndex
      ) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  /** Not supported on this call type; always throws. */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  getMethod(): string {
    return this.methodName;
  }

  getHost(): string {
    return this.host;
  }
}
|