1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from "./call-credentials";
|
19 | import { LogVerbosity, Status } from "./constants";
|
20 | import { Deadline } from "./deadline";
|
21 | import { Metadata } from "./metadata";
|
22 | import { CallConfig } from "./resolver";
|
23 | import * as logging from './logging';
|
24 | import { Call, InterceptingListener, MessageContext, StatusObject, WriteCallback, WriteObject } from "./call-interface";
|
25 | import { LoadBalancingCall, StatusObjectWithProgress } from "./load-balancing-call";
|
26 | import { InternalChannel } from "./internal-channel";
|
27 |
|
/** Tracer name under which this module's trace messages are logged. */
const TRACER_NAME = 'retrying_call';
|
29 |
|
/**
 * Token bucket that decides whether retry attempts are currently allowed.
 *
 * The bucket starts full (`maxTokens`). Every failed call removes one token
 * and every successful call adds `tokenRatio` tokens; the count is always kept
 * within `[0, maxTokens]`. Retries are allowed only while more than half of
 * the tokens remain.
 */
export class RetryThrottler {
  private tokens: number;

  /**
   * @param maxTokens Capacity of the bucket.
   * @param tokenRatio Number of tokens returned to the bucket per successful call.
   * @param previousRetryThrottler Optional throttler whose fill level should be
   *     carried over, rescaled to the new capacity.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler && previousRetryThrottler.maxTokens > 0) {
      // Keep the same fill fraction as the previous throttler, expressed in
      // terms of the new capacity.
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      // No usable previous state (or a zero capacity that would make the
      // rescale divide by zero): start with a full bucket.
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: refill by `tokenRatio`, capped at `maxTokens`. */
  addCallSucceeded(): void {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: drain one token, never going below zero. */
  addCallFailed(): void {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns `true` while strictly more than half of the tokens remain. */
  canRetryCall(): boolean {
    return this.tokens > this.maxTokens / 2;
  }
}
|
54 |
|
/**
 * Accounts for buffered message bytes, enforcing both a limit shared by all
 * calls and a limit for each individual call (identified by `callId`).
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  /**
   * @param totalLimit Maximum number of bytes that may be allocated across all calls.
   * @param limitPerCall Maximum number of bytes that may be allocated by one call.
   */
  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` bytes for the given call.
   *
   * @returns `true` if the reservation fit within both limits and was
   *     recorded, `false` (with no state change) otherwise.
   */
  allocate(size: number, callId: number): boolean {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (
      this.limitPerCall - currentPerCall < size ||
      this.totalLimit - this.totalAllocated < size
    ) {
      return false;
    }
    this.allocatedPerCall.set(callId, currentPerCall + size);
    this.totalAllocated += size;
    return true;
  }

  /**
   * Releases `size` bytes previously reserved for the given call.
   *
   * @throws Error if more is freed than is allocated, either in total or for
   *     this call. Both checks run before anything is modified, so a rejected
   *     free leaves the tracker unchanged.
   */
  free(size: number, callId: number) {
    if (this.totalAllocated < size) {
      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
    }
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (currentPerCall < size) {
      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
    }
    this.totalAllocated -= size;
    this.allocatedPerCall.set(callId, currentPerCall - size);
  }

  /**
   * Releases everything still reserved for the given call and forgets the call.
   *
   * @throws Error if the call's recorded allocation exceeds the total.
   */
  freeAll(callId: number) {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (this.totalAllocated < currentPerCall) {
      throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
    }
    this.totalAllocated -= currentPerCall;
    this.allocatedPerCall.delete(callId);
  }
}
|
92 |
|
/** Whether an individual attempt is still running or has finished/been discarded. */
type UnderlyingCallState =
  | 'ACTIVE'
  | 'COMPLETED';

/** Bookkeeping kept for each attempt (child call) started by a RetryingCall. */
interface UnderlyingCall {
  /** The child call that carries this attempt. */
  call: LoadBalancingCall;
  /** Current lifecycle state of the attempt. */
  state: UnderlyingCallState;
  /** Index of the next write buffer entry that should be sent on this attempt. */
  nextMessageToSend: number;
}
|
100 |
|
101 |
|
102 |
|
103 |
|
104 |
|
105 |
|
106 |
|
107 |
|
108 |
|
109 |
|
/**
 * Overall state of a RetryingCall:
 * - `RETRY`: the method config has a retry policy and further attempts may be made.
 * - `HEDGING`: the method config has a hedging policy and further attempts may be made.
 * - `TRANSPARENT_ONLY`: no retry or hedging policy applies (never configured,
 *   or disabled while the call was running).
 * - `COMMITTED`: one attempt has been chosen; all other attempts are discarded.
 */
type RetryingCallState =
  | 'RETRY'
  | 'HEDGING'
  | 'TRANSPARENT_ONLY'
  | 'COMMITTED';
|
111 |
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
/**
 * Kind of a write buffer entry:
 * - `MESSAGE`: a message to send.
 * - `HALF_CLOSE`: marks the point at which the client half-closed the call.
 * - `FREED`: placeholder for an index that is not (or no longer) in the buffer.
 */
type WriteBufferEntryType =
  | 'MESSAGE'
  | 'HALF_CLOSE'
  | 'FREED';

/** One slot of the buffer of outgoing writes that can be replayed on new attempts. */
interface WriteBufferEntry {
  /** What this slot represents. */
  entryType: WriteBufferEntryType;
  /**
   * Whether the message bytes were successfully reserved in the buffer
   * tracker. Always `false` for entries that are not messages.
   */
  allocated: boolean;
  /** The message to send. Only set when `entryType` is `MESSAGE`. */
  message?: WriteObject;
  /**
   * Callback to invoke once the message has been written. Only set for
   * messages whose buffer reservation failed.
   */
  callback?: WriteCallback;
}
|
145 |
|
/**
 * Metadata key carrying the number of attempts made before the current one.
 * It is added to the outgoing metadata of every attempt after the first and
 * to the metadata (or, failing that, the status metadata) handed back to the
 * listener.
 */
const PREVIOUS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';

/**
 * A call that transparently runs one or more attempts (child
 * `LoadBalancingCall`s) according to the retry or hedging policy of its
 * method config, buffering outgoing writes so they can be replayed on new
 * attempts, and forwarding the events of exactly one attempt to the listener.
 */
export class RetryingCall implements Call {
  private state: RetryingCallState;
  private listener: InterceptingListener | null = null;
  private initialMetadata: Metadata | null = null;
  /** Every attempt started so far, in creation order. */
  private underlyingCalls: UnderlyingCall[] = [];
  /** Outgoing writes that may still need to be sent on some attempt. */
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * Message index of `writeBuffer[0]`. Message indices keep counting up while
   * already-sent entries are dropped from the front of the buffer.
   */
  private writeBufferOffset = 0;
  /** Whether `startRead` has been called; new attempts then start reading at once. */
  private readStarted = false;
  /** Whether the single policy-free retry of a `REFUSED` attempt was already spent. */
  private transparentRetryUsed = false;
  /** Number of attempts counted against the policy's `maxAttempts`. */
  private attempts = 0;
  private hedgingTimer: NodeJS.Timeout | null = null;
  /** Index into `underlyingCalls` of the committed attempt, once there is one. */
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    if (callConfig.methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const retryPolicy = callConfig.methodConfig.retryPolicy;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec =
        RetryingCall.parseDurationSec(retryPolicy.initialBackoff);
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  /**
   * Converts a duration string made of a number followed by a one-character
   * unit suffix (as in `'0s'`) to that number; the suffix itself is not
   * inspected.
   */
  private static parseDurationSec(duration: string): number {
    return Number(duration.substring(0, duration.length - 1));
  }

  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Delivers the final status to the listener (asynchronously) and releases
   * everything held for the call: the pending hedging timer, the buffer
   * tracker reservation and the write buffer.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
    // No further hedged attempt may be started once the call has ended.
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
      this.hedgingTimer = null;
    }
    this.bufferTracker.freeAll(this.callNumber);
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Build a fresh object from the three status fields so that any extra
      // properties on the incoming object are not passed on to the listener.
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata
      });
    });
  }

  /** Reports the given status to the listener and cancels every attempt with it. */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.reportStatus({code: status, details, metadata: new Metadata()});
    for (const {call} of this.underlyingCalls) {
      call.cancelWithStatus(status, details);
    }
  }

  /** @returns The peer of the committed attempt, or `'unknown'` before commit. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up the buffer entry for a message index. Indices outside the
   * buffer yield a synthetic `FREED` entry, so the result is never undefined.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
  }

  /** @returns The message index the next buffered write will receive. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, drops every buffer entry the committed attempt has
   * already sent and returns its bytes to the buffer tracker. Does nothing
   * before commit, because other attempts may still need the entries.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
      }
    }
    this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index`: switches to `COMMITTED`, cancels every
   * other attempt that is still active, and frees messages that attempt has
   * already sent. No-op if the call is already committed.
   *
   * The chosen attempt may itself already be `COMPLETED` (its final status is
   * being reported); committing it is still required so that the remaining
   * attempts are cancelled and no new ones are started.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index);
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      // Mark as completed first so the status produced by the cancellation is
      // ignored when it comes back.
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has progressed furthest through the
   * write buffer. If no attempt is active, disables further retries/hedging
   * by switching to `TRANSPARENT_ONLY` instead.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (childCall.state === 'ACTIVE' && childCall.nextMessageToSend > mostMessages) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Checks whether `code` appears in `list`, where entries may be numeric
   * status codes or status names (compared case-insensitively).
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    // Status[code] is undefined for a number with no enum name; such a code
    // can then only match a numeric list entry.
    const codeName = Status[code]?.toLowerCase();
    return list.some(value => value === code || value.toString().toLowerCase() === codeName);
  }

  /**
   * Picks a random delay in `[0, nextRetryBackoffSec)` seconds (returned in
   * milliseconds) and grows the backoff for the following retry by the
   * policy's multiplier, capped at the policy's maximum backoff.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffSec = RetryingCall.parseDurationSec(retryPolicy.maxBackoff);
    this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
    return nextBackoffMs;
  }

  /**
   * Decides whether a failed attempt is retried and, if so, starts the new
   * attempt after the appropriate delay.
   *
   * @param pushback Server pushback in milliseconds: `null` if the server
   *     sent none, negative to forbid retrying.
   * @param callback Invoked exactly once with `true` if a new attempt is
   *     started and `false` if the failure is final.
   */
  private maybeRetryCall(pushback: number | null, callback: (retried: boolean) => void) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    // Attempts are capped at 5 regardless of the configured maximum.
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      // An explicit server delay replaces the backoff and resets it.
      retryDelayMs = pushback;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // The caller must always hear back, otherwise the call would never
        // report a status.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** @returns How many attempts are currently in the `ACTIVE` state. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call?.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a failure status that is subject to the retry/hedging policy:
   * either arranges another attempt or commits the failed attempt and
   * reports its status.
   */
  private handleProcessedStatus(status: StatusObject, callIndex: number, pushback: number | null) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (this.isStatusCodeInList(this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [], status.code)) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If no attempt is running after that (none could be started and
            // none was in flight), this failure is the final result.
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (this.isStatusCodeInList(this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes, status.code)) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, (retried) => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the `grpc-retry-pushback-ms` value from status metadata.
   *
   * @returns `null` if the key is absent, the parsed integer if it is
   *     present, or `-1` (which callers treat as "do not retry") if the value
   *     cannot be parsed as an integer.
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt never throws; an unparseable value comes back as NaN.
    const pushbackMs = parseInt(String(mdValue[0]), 10);
    return Number.isNaN(pushbackMs) ? -1 : pushbackMs;
  }

  /**
   * Handles the final status of one attempt, deciding between reporting it,
   * starting a replacement attempt, or deferring to the retry/hedging policy.
   * Statuses of attempts already marked `COMPLETED` are ignored.
   */
  private handleChildStatus(status: StatusObjectWithProgress, callIndex: number) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace('state=' + this.state + ' handling status with progress ' + status.progress + ' from child [' + this.underlyingCalls[callIndex].call.getCallNumber() + '] in state ' + this.underlyingCalls[callIndex].state);
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // Always replaced by a new attempt, without consulting the policy or
        // counting against the attempt limit.
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // Replaced by a new attempt once without consulting the policy; any
        // later occurrence is handled like a processed failure.
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts another hedged attempt (and re-arms the hedging timer) if the call
   * is still hedging and the attempt limit has not been reached.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Cancels any pending hedging timer and, if the call is still hedging with
   * attempts left, schedules the next hedged attempt after the policy's
   * hedging delay (immediately when no delay is configured).
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
    const hedgingDelaySec = RetryingCall.parseDurationSec(hedgingDelayString);
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    // Do not let the timer alone keep the process running.
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new child call, wires its events to this call, and
   * begins replaying buffered writes on it.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
    this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({state: 'ACTIVE', call: child, nextMessageToSend: 0});
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace('Received metadata from child [' + child.getCallNumber() + ']');
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        // Only forwarded if this attempt was not discarded in favor of another.
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace('Received message from child [' + child.getCallNumber() + ']');
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace('Received status from child [' + child.getCallNumber() + ']');
        // If no metadata was delivered, the status metadata is the only place
        // left to record the previous attempt count.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(PREVIOUS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        this.handleChildStatus(status, index);
      }
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /** Starts the call: records listener and metadata and launches the first attempt. */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when an attempt finished writing its current message: fires the
   * message's stored callback (if any), releases messages no longer needed,
   * and moves the attempt on to the next buffer entry.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the attempt's next pending buffer entry, if there is one: a message
   * is written to the child, a half-close is forwarded, and a `FREED` entry
   * (nothing buffered at that index) is left alone.
   */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext({
          callback: () => {
            // A write error is deliberately ignored here.
            this.handleChildWriteCompleted(childIndex);
          }
        }, bufferEntry.message!.message);
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Nothing to send yet.
        break;
    }
  }

  /**
   * Buffers an outgoing message and sends it on every active attempt that is
   * ready for it.
   *
   * If the bytes fit in the buffer tracker, the write callback fires
   * immediately. Otherwise the call commits to the attempt that has sent the
   * most messages, and the callback is deferred until that attempt has
   * written the message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber)
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
          call.call.sendMessageWithContext({
            callback: () => {
              // A write error is deliberately ignored here.
              this.handleChildWriteCompleted(callIndex);
            }
          }, message);
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // Nothing could be committed because no attempt is active.
      if (this.committedCallIndex === null) {
        return;
      }
      const call = this.underlyingCalls[this.committedCallIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext({
          callback: () => {
            // A write error is deliberately ignored here.
            this.handleChildWriteCompleted(this.committedCallIndex!);
          }
        }, message);
      }
    }
  }

  /** Starts reading on every active attempt, and on attempts created later. */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall?.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  /**
   * Buffers a half-close marker and forwards the half-close to every active
   * attempt that has already sent all earlier entries.
   */
  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false
    });
    for (const call of this.underlyingCalls) {
      if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  /** Not supported on this call type; always throws. */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error("Method not implemented.");
  }

  getMethod(): string {
    return this.methodName;
  }

  getHost(): string {
    return this.host;
  }
}
\ | No newline at end of file |