UNPKG

23.1 kBPlain TextView Raw
/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import { CallCredentials } from "./call-credentials";
19import { LogVerbosity, Status } from "./constants";
20import { Deadline } from "./deadline";
21import { Metadata } from "./metadata";
22import { CallConfig } from "./resolver";
23import * as logging from './logging';
24import { Call, InterceptingListener, MessageContext, StatusObject, WriteCallback, WriteObject } from "./call-interface";
25import { LoadBalancingCall, StatusObjectWithProgress } from "./load-balancing-call";
26import { InternalChannel } from "./internal-channel";
27
/** Tracer name under which RetryingCall emits its debug trace lines. */
const TRACER_NAME = 'retrying_call';
29
/**
 * Client-side retry throttling (see gRFC A6): a token bucket that loses one
 * token per failed attempt and regains `tokenRatio` tokens per successful
 * call. Retries are permitted only while more than half of the bucket remains.
 */
export class RetryThrottler {
  private tokens: number;

  /**
   * @param maxTokens Capacity of the bucket. A fresh throttler starts full.
   * @param tokenRatio Tokens credited back for each successful call.
   * @param previousRetryThrottler Throttler from a previous config whose
   *   token count is carried over, rescaled to the new capacity.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      /* When carrying over tokens from a previous config, rescale them to the
       * new max value */
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Credits `tokenRatio` tokens, never exceeding `maxTokens`. */
  addCallSucceeded() {
    // Cap at capacity so a long run of successes cannot bank unlimited retries.
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Debits one token, never going below zero. */
  addCallFailed() {
    // Floor at zero so recovery after a burst of failures is bounded.
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns true while the bucket holds more than half of its capacity. */
  canRetryCall() {
    return this.tokens > this.maxTokens / 2;
  }
}
54
/**
 * Accounts for the bytes of outgoing messages buffered for possible replay,
 * enforcing both a global byte limit and a per-call byte limit.
 */
export class MessageBufferTracker {
  private totalAllocated: number = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  /**
   * @param totalLimit Maximum bytes that may be allocated across all calls.
   * @param limitPerCall Maximum bytes that may be allocated for a single call.
   */
  constructor(
    private readonly totalLimit: number,
    private readonly limitPerCall: number
  ) {}

  /**
   * Reserves `size` bytes for `callId` if both limits allow it.
   * @returns true if the reservation was made; false (with no state change)
   *   if it would exceed either limit.
   */
  allocate(size: number, callId: number): boolean {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (
      this.limitPerCall - currentPerCall < size ||
      this.totalLimit - this.totalAllocated < size
    ) {
      return false;
    }
    this.allocatedPerCall.set(callId, currentPerCall + size);
    this.totalAllocated += size;
    return true;
  }

  /**
   * Releases `size` bytes previously allocated for `callId`.
   * @throws Error if `size` exceeds the total allocation or the call's own
   *   allocation. Both checks run before anything is modified, so a rejected
   *   free leaves the tracker unchanged.
   */
  free(size: number, callId: number) {
    if (this.totalAllocated < size) {
      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
    }
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (currentPerCall < size) {
      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
    }
    this.totalAllocated -= size;
    const remaining = currentPerCall - size;
    if (remaining === 0) {
      // A missing entry already reads as 0; don't keep empty entries around.
      this.allocatedPerCall.delete(callId);
    } else {
      this.allocatedPerCall.set(callId, remaining);
    }
  }

  /**
   * Releases everything allocated for `callId`.
   * @throws Error if the call's allocation exceeds the tracked total.
   */
  freeAll(callId: number) {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (this.totalAllocated < currentPerCall) {
      throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
    }
    this.totalAllocated -= currentPerCall;
    this.allocatedPerCall.delete(callId);
  }
}
92
/** Lifecycle of one child attempt: ACTIVE until it ends or is discarded. */
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';

/** Bookkeeping that RetryingCall keeps for each child attempt it starts. */
interface UnderlyingCall {
  /**
   * Becomes COMPLETED once the attempt has reported a status or has been
   * cancelled in favor of another attempt.
   */
  state: UnderlyingCallState;
  /** The child call carrying this attempt. */
  call: LoadBalancingCall;
  /**
   * Absolute write-buffer index (not relative to the buffer offset) of the
   * next entry this attempt still has to send.
   */
  nextMessageToSend: number;
}
100
/**
 * Overall mode of a RetryingCall:
 * - RETRY: a retry policy is configured and a new attempt may be started
 *   after a failed one.
 * - HEDGING: a hedging policy is configured and additional attempts may be
 *   started alongside existing ones.
 * - TRANSPARENT_ONLY: neither policy applies (or retries/hedging have been
 *   switched off for this call), but transparent retry attempts may still be
 *   started.
 * - COMMITTED: one attempt has been chosen and no new attempts will be
 *   started.
 */
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
111
/**
 * Kinds of entry held in RetryingCall's write buffer:
 * - MESSAGE: an outgoing message.
 * - HALF_CLOSE: an attempt that reaches this entry sends a half-close.
 * - FREED: placeholder for a slot that is no longer held in the buffer, for
 *   example because it has already been sent and released.
 */
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';

/**
 * One entry in the buffer of data to send to the remote end.
 */
interface WriteBufferEntry {
  /** What this entry represents. */
  entryType: WriteBufferEntryType;
  /** The message to send; present only when entryType is MESSAGE. */
  message?: WriteObject;
  /**
   * The caller's write callback, deferred until an attempt has actually sent
   * the message. Only ever set on MESSAGE entries that could not be allocated
   * in the buffer tracker (allocated === false).
   */
  callback?: WriteCallback;
  /**
   * The return value of bufferTracker.allocate for this entry's message.
   * Only meaningful when entryType is MESSAGE.
   */
  allocated: boolean;
}
145
/**
 * Metadata key carrying the number of earlier attempts of this RPC. It is
 * set on the initial metadata sent by later attempts and on the metadata
 * passed back to the listener. The misspelled identifier is kept because
 * other code in this file refers to it by this name.
 */
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
147
/**
 * A Call that carries one RPC over one or more child LoadBalancingCall
 * attempts, applying the method's retry policy, its hedging policy, or (when
 * neither is configured) transparent retries only. Outgoing messages and the
 * half-close are recorded in a write buffer so that later attempts can replay
 * them; buffered bytes are accounted for through `bufferTracker`.
 */
export class RetryingCall implements Call {
  private state: RetryingCallState;
  private listener: InterceptingListener | null = null;
  private initialMetadata: Metadata | null = null;
  /** Every attempt started so far, in creation order. */
  private underlyingCalls: UnderlyingCall[] = [];
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * The offset of message indices in the writeBuffer. For example, if
   * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
   * is in writeBuffer[5].
   */
  private writeBufferOffset = 0;
  /**
   * Tracks whether a read has been started, so that we know whether to start
   * reads on new child calls. This only matters for the first read, because
   * once a message comes in the child call becomes committed and there will
   * be no new child calls.
   */
  private readStarted = false;
  /** Set once the single transparent retry for a REFUSED attempt is used. */
  private transparentRetryUsed: boolean = false;
  /**
   * Number of attempts so far
   */
  private attempts: number = 0;
  private hedgingTimer: NodeJS.Timer | null = null;
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;
  /**
   * Set the first time reportStatus runs. Later reportStatus calls become
   * no-ops, and no further attempts are started once the call has ended.
   */
  private statusReported = false;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    if (callConfig.methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const retryPolicy = callConfig.methodConfig.retryPolicy;
      // Drops the last character (the unit suffix, presumably "s") and reads
      // the remainder as a number of seconds.
      this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(
        retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1)
      );
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  /** The number this call was created with; also its buffer-tracker id. */
  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Delivers the final status to the listener (on the next tick) and releases
   * the write buffer. Only the first call has any effect.
   */
  private reportStatus(statusObject: StatusObject) {
    if (this.statusReported) {
      return;
    }
    this.statusReported = true;
    this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
      this.hedgingTimer = null;
    }
    this.bufferTracker.freeAll(this.callNumber);
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Explicitly construct status object to remove progress field
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata
      });
    });
  }

  /** Ends the call with the given status and cancels every attempt. */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.reportStatus({code: status, details, metadata: new Metadata()});
    for (const underlyingCall of this.underlyingCalls) {
      // Mark the attempt finished before cancelling it, so the status it
      // reports back is ignored by handleChildStatus instead of triggering a
      // retry or a second status report.
      underlyingCall.state = 'COMPLETED';
      underlyingCall.call.cancelWithStatus(status, details);
    }
  }

  /** Peer of the committed attempt, or 'unknown' before one is committed. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up an entry by absolute message index. Indices outside the current
   * buffer yield a synthetic FREED entry.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
  }

  /** Absolute index that the next pushed buffer entry will get. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, drops buffer entries the committed attempt has already
   * passed and returns their bytes to the buffer tracker.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    // Nothing to release. This also covers the case where reportStatus has
    // already advanced writeBufferOffset past the committed attempt; slicing
    // would then keep stale entries and move the offset backwards.
    if (earliestNeededMessageIndex <= this.writeBufferOffset) {
      return;
    }
    for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
      }
    }
    this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index` and cancels every other active attempt.
   * Does nothing if already committed or if that attempt has completed.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    if (this.underlyingCalls[index].state === 'COMPLETED') {
      return;
    }
    this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index);
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has sent the most buffer entries. Used
   * when a message cannot be buffered for replay.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (childCall.state === 'ACTIVE' && childCall.nextMessageToSend > mostMessages) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      /* There are no active calls, disable retries to force the next call that
       * is started to be committed. */
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Whether `code` appears in a policy's status list, whose entries may be
   * numeric codes or case-insensitive status names.
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    return list.some((value => value === code || value.toString().toLowerCase() === Status[code].toLowerCase()));
  }

  /**
   * Returns a randomized delay in [0, current backoff) milliseconds and grows
   * the backoff by the policy multiplier, capped at maxBackoff.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1));
    this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec);
    return nextBackoffMs;
  }

  /**
   * Schedules a retry if the policy, attempt limit, pushback and throttler
   * allow it. `callback` is always invoked exactly once: with true when a new
   * attempt is started, otherwise with false so the caller can report the
   * failed attempt's status.
   *
   * @param pushback Server pushback in ms, null if absent, negative to
   *   forbid retries.
   */
  private maybeRetryCall(pushback: number | null, callback: (retried: boolean) => void) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    // No more than 5 attempts are made, whatever maxAttempts says.
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      // The server asked not to retry: stop retrying for the rest of the call.
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      retryDelayMs = pushback;
      // An explicit pushback restarts the exponential backoff sequence.
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      if (this.state !== 'RETRY' || this.statusReported) {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // The callback must still run, or the failed attempt's status would
        // never be reported and the call would never complete.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** Number of attempts still in the ACTIVE state. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call?.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a non-OK status from an attempt the server processed: retries or
   * hedges according to the current state and policy, or commits to that
   * attempt and reports its status.
   */
  private handleProcessedStatus(status: StatusObject, callIndex: number, pushback: number | null) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (this.isStatusCodeInList(this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [], status.code)) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If after trying to start a call there are no active calls, this was the last one
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (this.isStatusCodeInList(this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes, status.code)) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, (retried) => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the server's retry pushback from trailing metadata.
   * @returns null if absent; the delay in ms if it is a non-negative integer;
   *   -1 (meaning "do not retry") if it is negative or malformed.
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    const pushbackText = mdValue[0].toString().trim();
    // parseInt never throws: it returns NaN for garbage and a partial parse
    // for input like "12abc", so validate the whole string explicitly.
    if (!/^\d+$/.test(pushbackText)) {
      return -1;
    }
    return Number.parseInt(pushbackText, 10);
  }

  /**
   * Handles the final status of one attempt. OK statuses are reported
   * directly; otherwise the attempt's progress decides between a transparent
   * retry, a policy-driven retry/hedge, or reporting the status.
   */
  private handleChildStatus(status: StatusObjectWithProgress, callIndex: number) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace('state=' + this.state + ' handling status with progress ' + status.progress + ' from child [' + this.underlyingCalls[callIndex].call.getCallNumber() + '] in state ' + this.underlyingCalls[callIndex].state);
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // RPC never leaves the client, always safe to retry
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // RPC reaches the server library, but not the server application logic
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts one more hedged attempt (and re-arms the hedging timer) if the
   * call is still hedging and the attempt limit allows it.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING' || this.statusReported) {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Replaces any pending hedging timer with one that fires after the
   * policy's hedgingDelay (default "0s"), if more hedged attempts are
   * allowed. The timer is unref'd.
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING' || this.statusReported) {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
    const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1));
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new child attempt, wires its events back into this
   * call, and begins replaying the write buffer on it. Does nothing once a
   * final status has been reported.
   */
  private startNewAttempt() {
    if (this.statusReported) {
      return;
    }
    const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline);
    this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts);
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({state: 'ACTIVE', call: child, nextMessageToSend: 0});
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace('Received metadata from child [' + child.getCallNumber() + ']');
        // Any response from the server commits the call to this attempt.
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace('Received message from child [' + child.getCallNumber() + ']');
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace('Received status from child [' + child.getCallNumber() + ']');
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`);
        }
        this.handleChildStatus(status, index);
      }
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /** Starts the RPC: records the listener and metadata and starts the first attempt. */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when an attempt finishes writing its current buffer entry: runs
   * any deferred caller callback, releases sent entries, and sends the next
   * entry on that attempt.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /** Sends the attempt's next buffered entry, if there is one. */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    // getBufferEntry never returns undefined; indices past the end of the
    // buffer come back as FREED and are skipped below.
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext({
          callback: (error) => {
            // Ignore error
            this.handleChildWriteCompleted(childIndex);
          }
        }, bufferEntry.message!.message);
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Nothing buffered at this index (yet).
        break;
    }
  }

  /**
   * Buffers an outgoing message and forwards it to every active attempt that
   * is caught up. If the buffer tracker cannot hold it, the call is committed
   * to the most advanced attempt and the caller's callback is deferred until
   * that attempt has sent the message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber)
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
          call.call.sendMessageWithContext({
            callback: (error) => {
              // Ignore error
              this.handleChildWriteCompleted(callIndex);
            }
          }, message);
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages can fail if we are between attempts
      if (this.committedCallIndex === null) {
        return;
      }
      const committedIndex = this.committedCallIndex;
      const call = this.underlyingCalls[committedIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext({
          callback: (error) => {
            // Ignore error
            this.handleChildWriteCompleted(committedIndex);
          }
        }, message);
      }
    }
  }

  /** Starts reading on every active attempt and on any attempt started later. */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall?.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  /** Buffers a half-close and sends it on every attempt that is caught up. */
  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false
    });
    for (const call of this.underlyingCalls) {
      if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  /** Not supported by this call type; always throws. */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error("Method not implemented.");
  }

  getMethod(): string {
    return this.methodName;
  }

  getHost(): string {
    return this.host;
  }
}
\No newline at end of file