/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { CallCredentials } from './call-credentials';
import { LogVerbosity, Status } from './constants';
import { Deadline } from './deadline';
import { Metadata } from './metadata';
import { CallConfig } from './resolver';
import * as logging from './logging';
import {
  Call,
  InterceptingListener,
  MessageContext,
  StatusObject,
  WriteCallback,
  WriteObject,
} from './call-interface';
import {
  LoadBalancingCall,
  StatusObjectWithProgress,
} from './load-balancing-call';
import { InternalChannel } from './internal-channel';

const TRACER_NAME = 'retrying_call';

/**
 * Converts a duration string made of a number followed by one trailing unit
 * character (for example `'0s'`) into a number by dropping that last
 * character.
 */
function parseDurationSec(duration: string): number {
  return Number(duration.substring(0, duration.length - 1));
}

/**
 * Token bucket used to decide whether retry attempts are currently allowed.
 * Successful calls add `tokenRatio` tokens, failed calls remove one token, and
 * retries are permitted only while more than half of `maxTokens` remain.
 */
export class RetryThrottler {
  private tokens: number;
  /**
   * @param maxTokens Upper bound of the token count.
   * @param tokenRatio Number of tokens added for each successful call.
   * @param previousRetryThrottler Optional throttler whose current token
   * count is carried over, rescaled to the new `maxTokens`.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      /* When carrying over tokens from a previous config, rescale them to the
       * new max value */
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: adds `tokenRatio`, capped at `maxTokens`. */
  addCallSucceeded() {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: removes one token, never going below zero. */
  addCallFailed() {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns `true` while more than half of `maxTokens` are available. */
  canRetryCall() {
    return this.tokens > this.maxTokens / 2;
  }
}

/**
 * Accounts for the number of bytes buffered for retries, enforcing both a
 * limit per call and a limit across all calls sharing this tracker.
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  /** Bytes currently allocated, keyed by call ID. */
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` bytes for the call `callId`.
   * @returns `true` if the reservation fit within both limits and was
   * recorded, `false` (with no state change) otherwise.
   */
  allocate(size: number, callId: number): boolean {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (
      this.limitPerCall - currentPerCall < size ||
      this.totalLimit - this.totalAllocated < size
    ) {
      return false;
    }
    this.allocatedPerCall.set(callId, currentPerCall + size);
    this.totalAllocated += size;
    return true;
  }

  /**
   * Releases `size` bytes previously reserved for the call `callId`.
   * @throws Error if `size` exceeds the total or the per-call allocation. Both
   * checks run before any counter is modified, so a failed call leaves the
   * tracker unchanged.
   */
  free(size: number, callId: number) {
    if (this.totalAllocated < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`
      );
    }
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (currentPerCall < size) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`
      );
    }
    this.totalAllocated -= size;
    this.allocatedPerCall.set(callId, currentPerCall - size);
  }

  /**
   * Releases everything reserved for the call `callId` and forgets the call.
   * @throws Error if the per-call allocation exceeds the total allocation.
   */
  freeAll(callId: number) {
    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
    if (this.totalAllocated < currentPerCall) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`
      );
    }
    this.totalAllocated -= currentPerCall;
    this.allocatedPerCall.delete(callId);
  }
}

type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';

interface UnderlyingCall {
  state: UnderlyingCallState;
  call: LoadBalancingCall;
  /** Index (in message-index space) of the next write buffer entry to send. */
  nextMessageToSend: number;
}

/**
 * A retrying call can be in one of these states:
 * RETRY: Retries are configured and new attempts may be sent
 * HEDGING: Hedging is configured and new attempts may be sent
 * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
 * transparent retry attempts may still be sent
 * COMMITTED: One attempt is committed, and no new attempts will be
 * sent
 */
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';

/**
 * The different types of objects that can be stored in the write buffer, with
 * the following meanings:
 * MESSAGE: This is a message to be sent.
 * HALF_CLOSE: When this entry is reached, the calls should send a half-close.
 * FREED: This slot previously contained a message that has been sent on all
 * child calls and is no longer needed.
 */
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';

/**
 * Entry in the buffer of messages to send to the remote end.
 */
interface WriteBufferEntry {
  entryType: WriteBufferEntryType;
  /**
   * Message to send.
   * Only populated if entryType is MESSAGE.
   */
  message?: WriteObject;
  /**
   * Callback to call after sending the message.
   * Only populated if entryType is MESSAGE and the call is in the COMMITTED
   * state.
   */
  callback?: WriteCallback;
  /**
   * Indicates whether the message is allocated in the buffer tracker. Ignored
   * if entryType is not MESSAGE. Should be the return value of
   * bufferTracker.allocate.
   */
  allocated: boolean;
}

const PREVIOUS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';

/**
 * A `Call` that fans out to one or more `LoadBalancingCall` attempts according
 * to the retry or hedging policy in the call config, buffering outgoing
 * messages so that they can be replayed on new attempts until one attempt is
 * committed.
 */
export class RetryingCall implements Call {
  private state: RetryingCallState;
  private listener: InterceptingListener | null = null;
  private initialMetadata: Metadata | null = null;
  private underlyingCalls: UnderlyingCall[] = [];
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * The offset of message indices in the writeBuffer. For example, if
   * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
   * is in writeBuffer[5].
   */
  private writeBufferOffset = 0;
  /**
   * Tracks whether a read has been started, so that we know whether to start
   * reads on new child calls. This only matters for the first read, because
   * once a message comes in the child call becomes committed and there will
   * be no new child calls.
   */
  private readStarted = false;
  private transparentRetryUsed = false;
  /**
   * Number of attempts so far
   */
  private attempts = 0;
  private hedgingTimer: NodeJS.Timeout | null = null;
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    // A retry policy takes precedence over a hedging policy when choosing the
    // initial state.
    if (callConfig.methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const retryPolicy = callConfig.methodConfig.retryPolicy;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec = parseDurationSec(
        retryPolicy.initialBackoff
      );
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Releases all buffered messages for this call and delivers the status to
   * the listener on the next tick.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace(
      'ended with status: code=' +
        statusObject.code +
        ' details="' +
        statusObject.details +
        '"'
    );
    this.bufferTracker.freeAll(this.callNumber);
    // Keep message indices monotonic: the emptied buffer starts where the old
    // one ended.
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Explicitly construct status object to remove progress field
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata,
      });
    });
  }

  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.reportStatus({ code: status, details, metadata: new Metadata() });
    for (const { call } of this.underlyingCalls) {
      call.cancelWithStatus(status, details);
    }
  }

  /** @returns The committed attempt's peer, or `'unknown'` if none is committed. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up the write buffer entry for a message index, returning a
   * placeholder `FREED` entry when the index is outside the buffer.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return (
      this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {
        entryType: 'FREED',
        allocated: false,
      }
    );
  }

  /** @returns The message index the next pushed buffer entry will receive. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, drops every buffered entry that precedes the committed
   * attempt's `nextMessageToSend`, returning tracked bytes to the buffer
   * tracker. Does nothing in any other state.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex =
      this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    for (
      let messageIndex = this.writeBufferOffset;
      messageIndex < earliestNeededMessageIndex;
      messageIndex++
    ) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(
          bufferEntry.message!.message.length,
          this.callNumber
        );
      }
    }
    this.writeBuffer = this.writeBuffer.slice(
      earliestNeededMessageIndex - this.writeBufferOffset
    );
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index`: switches to the COMMITTED state,
   * cancels every other attempt that is still active, and frees messages that
   * are no longer needed.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    // NOTE(review): handleChildStatus marks the child COMPLETED before calling
    // commitCall, so on those paths this guard turns the commit into a no-op
    // and sibling attempts are not cancelled. Confirm this is intended.
    if (this.underlyingCalls[index].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'Committing call [' +
        this.underlyingCalls[index].call.getCallNumber() +
        '] at index ' +
        index
    );
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(
        Status.CANCELLED,
        'Discarded in favor of other hedged attempt'
      );
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has progressed furthest through the
   * write buffer. If no attempt is active, falls back to TRANSPARENT_ONLY.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (
        childCall.state === 'ACTIVE' &&
        childCall.nextMessageToSend > mostMessages
      ) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      /* There are no active calls, disable retries to force the next call that
       * is started to be committed. */
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Checks whether `code` appears in `list`, where entries may be numeric
   * status codes or case-insensitive status names.
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    // A peer may send a code that is not a member of the Status enum, in which
    // case the reverse lookup is undefined and only numeric equality can match.
    const codeName = Status[code]?.toLowerCase();
    return list.some(
      value => value === code || value.toString().toLowerCase() === codeName
    );
  }

  /**
   * Picks a random delay in `[0, nextRetryBackoffSec)` (in milliseconds) and
   * then grows `nextRetryBackoffSec` by the policy multiplier, capped at the
   * policy's max backoff. Returns 0 when no retry policy is configured.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    const maxBackoffSec = parseDurationSec(retryPolicy.maxBackoff);
    this.nextRetryBackoffSec = Math.min(
      this.nextRetryBackoffSec * retryPolicy.backoffMultiplier,
      maxBackoffSec
    );
    return nextBackoffMs;
  }

  /**
   * Decides whether to start another retry attempt.
   * @param pushback Server pushback in milliseconds, `null` if none was sent,
   * or a negative number meaning "do not retry".
   * @param callback Invoked exactly once with `true` if a new attempt is being
   * started, or `false` if the caller must report the final status itself.
   */
  private maybeRetryCall(
    pushback: number | null,
    callback: (retried: boolean) => void
  ) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      retryDelayMs = pushback;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // Without this branch the caller never learns that no retry happened
        // and the final status would never be reported.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** @returns The number of underlying attempts currently in the ACTIVE state. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a non-OK status from an attempt that the server processed,
   * applying the retry or hedging policy that matches the current state.
   */
  private handleProcessedStatus(
    status: StatusObject,
    callIndex: number,
    pushback: number | null
  ) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ??
              [],
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If after trying to start a call there are no active calls, this was the last one
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes,
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, retried => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the `grpc-retry-pushback-ms` metadata entry.
   * @returns `null` if the entry is absent, `-1` if its first value cannot be
   * parsed as an integer, and the parsed integer otherwise.
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt reports failure by returning NaN, not by throwing. Map NaN to
    // a negative value so that callers treat it as "do not retry".
    const pushbackMs = parseInt(mdValue[0] as string, 10);
    return Number.isNaN(pushbackMs) ? -1 : pushbackMs;
  }

  /**
   * Handles the final status of the attempt at `callIndex`, either reporting
   * it or starting a new attempt depending on the status, its progress, and
   * the current state.
   */
  private handleChildStatus(
    status: StatusObjectWithProgress,
    callIndex: number
  ) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'state=' +
        this.state +
        ' handling status with progress ' +
        status.progress +
        ' from child [' +
        this.underlyingCalls[callIndex].call.getCallNumber() +
        '] in state ' +
        this.underlyingCalls[callIndex].state
    );
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // RPC never leaves the client, always safe to retry
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // RPC reaches the server library, but not the server application logic
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts one more hedged attempt (and re-arms the hedging timer) if the call
   * is still hedging and the attempt limit has not been reached.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Cancels any pending hedging timer and, if further hedged attempts are
   * still allowed, schedules the next one after the policy's hedging delay.
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
    const hedgingDelaySec = parseDurationSec(hedgingDelayString);
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new underlying attempt, wiring its events back into
   * this call and replaying buffered writes and the pending read onto it.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(
      this.callConfig,
      this.methodName,
      this.host,
      this.credentials,
      this.deadline
    );
    this.trace(
      'Created child call [' +
        child.getCallNumber() +
        '] for attempt ' +
        this.attempts
    );
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({
      state: 'ACTIVE',
      call: child,
      nextMessageToSend: 0,
    });
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(
        PREVIOUS_RPC_ATTEMPTS_METADATA_KEY,
        `${previousAttempts}`
      );
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace(
          'Received metadata from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(
            PREVIOUS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace(
          'Received message from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace(
          'Received status from child [' + child.getCallNumber() + ']'
        );
        // If no headers were delivered, surface the attempt count through the
        // trailers instead.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(
            PREVIOUS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        this.handleChildStatus(status, index);
      },
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Runs after a child finished writing its current buffer entry: fires the
   * entry's callback (if any), frees what is no longer needed, and advances
   * the child to its next entry.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the child's next pending write buffer entry, if there is one and the
   * child is still active.
   */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    // getBufferEntry always returns an entry; indices past the end of the
    // buffer come back as FREED and fall through to the no-op case.
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext(
          {
            callback: error => {
              // Ignore error
              this.handleChildWriteCompleted(childIndex);
            },
          },
          bufferEntry.message!.message
        );
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Nothing to send
        break;
    }
  }

  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber),
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      // The message fits in the retry buffer, so the write can be acknowledged
      // immediately and forwarded to every attempt that is ready for it.
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (
          call.state === 'ACTIVE' &&
          call.nextMessageToSend === messageIndex
        ) {
          call.call.sendMessageWithContext(
            {
              callback: error => {
                // Ignore error
                this.handleChildWriteCompleted(callIndex);
              },
            },
            message
          );
        }
      }
    } else {
      // The message does not fit in the retry buffer, so stop retrying and
      // commit to a single attempt.
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages leaves no committed call when there is
      // currently no active attempt
      if (this.committedCallIndex === null) {
        return;
      }
      const call = this.underlyingCalls[this.committedCallIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext(
          {
            callback: error => {
              // Ignore error
              this.handleChildWriteCompleted(this.committedCallIndex!);
            },
          },
          message
        );
      }
    }
  }

  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false,
    });
    // Attempts that already sent everything before the half-close can be
    // half-closed now; the others reach it through sendNextChildMessage.
    for (const call of this.underlyingCalls) {
      if (
        call.state === 'ACTIVE' &&
        call.nextMessageToSend === halfCloseIndex
      ) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  setCredentials(newCredentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  getMethod(): string {
    return this.methodName;
  }

  getHost(): string {
    return this.host;
  }
}