/* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ import { Deadline, Call, Http2CallStream, CallStreamOptions, StatusObject, } from './call-stream'; import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; import { ResolvingLoadBalancer } from './resolving-load-balancer'; import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; import { UnavailablePicker, Picker, PickResultType } from './picker'; import { Metadata } from './metadata'; import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; import { CallCredentialsFilterFactory } from './call-credentials-filter'; import { DeadlineFilterFactory } from './deadline-filter'; import { CompressionFilterFactory } from './compression-filter'; import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme, } from './resolver'; import { trace, log } from './logging'; import { SubchannelAddress } from './subchannel-address'; import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; import { Filter } from './filter'; import { ConnectivityState } from './connectivity-state'; import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; import { Subchannel } from './subchannel'; /** * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args */ const MAX_TIMEOUT_TIME = 2147483647; let nextCallNumber = 0; function getNewCallNumber(): number { const callNumber = nextCallNumber; nextCallNumber += 1; if (nextCallNumber >= Number.MAX_SAFE_INTEGER) { nextCallNumber = 0; } return callNumber; } const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [ Status.OK, Status.INVALID_ARGUMENT, Status.NOT_FOUND, Status.ALREADY_EXISTS, Status.FAILED_PRECONDITION, Status.ABORTED, Status.OUT_OF_RANGE, Status.DATA_LOSS ] function restrictControlPlaneStatusCode(code: Status, details: string): {code: Status, details: string} { if (INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) { return { code: Status.INTERNAL, details: `Invalid status from control plane: ${code} ${Status[code]} ${details}` } } else { return {code, details}; } } /** * An interface that represents a communication channel to a server specified * by a given address. */ export interface Channel { /** * Close the channel. This has the same functionality as the existing * grpc.Client.prototype.close */ close(): void; /** * Return the target that this channel connects to */ getTarget(): string; /** * Get the channel's current connectivity state. This method is here mainly * because it is in the existing internal Channel class, and there isn't * another good place to put it. * @param tryToConnect If true, the channel will start connecting if it is * idle. Otherwise, idle channels will only start connecting when a * call starts. */ getConnectivityState(tryToConnect: boolean): ConnectivityState; /** * Watch for connectivity state changes. This is also here mainly because * it is in the existing external Channel class. * @param currentState The state to watch for transitions from. This should * always be populated by calling getConnectivityState immediately * before. * @param deadline A deadline for waiting for a state change * @param callback Called with no error when a state change, or with an * error if the deadline passes without a state change. */ watchConnectivityState( currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void ): void; /** * Get the channelz reference object for this channel. A request to the * channelz service for the id in this object will provide information * about this channel. */ getChannelzRef(): ChannelRef; /** * Create a call object. Call is an opaque type that is used by the Client * class. This function is called by the gRPC library when starting a * request. Implementers should return an instance of Call that is returned * from calling createCall on an instance of the provided Channel class. * @param method The full method string to request. * @param deadline The call deadline * @param host A host string override for making the request * @param parentCall A server call to propagate some information from * @param propagateFlags A bitwise combination of elements of grpc.propagate * that indicates what information to propagate from parentCall. */ createCall( method: string, deadline: Deadline, host: string | null | undefined, parentCall: ServerSurfaceCall | null, propagateFlags: number | null | undefined ): Call; } interface ConnectivityStateWatcher { currentState: ConnectivityState; timer: NodeJS.Timeout | null; callback: (error?: Error) => void; } export class ChannelImplementation implements Channel { private resolvingLoadBalancer: ResolvingLoadBalancer; private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; private currentPicker: Picker = new UnavailablePicker(); /** * Calls queued up to get a call config. Should only be populated before the * first time the resolver returns a result, which includes the ConfigSelector. */ private configSelectionQueue: Array<{ callStream: Http2CallStream; callMetadata: Metadata; }> = []; private pickQueue: Array<{ callStream: Http2CallStream; callMetadata: Metadata; callConfig: CallConfig; dynamicFilters: Filter[]; }> = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; private defaultAuthority: string; private filterStackFactory: FilterStackFactory; private target: GrpcUri; /** * This timer does not do anything on its own. Its purpose is to hold the * event loop open while there are any pending calls for the channel that * have not yet been assigned to specific subchannels. In other words, * the invariant is that callRefTimer is reffed if and only if pickQueue * is non-empty. */ private callRefTimer: NodeJS.Timer; private configSelector: ConfigSelector | null = null; /** * This is the error from the name resolver if it failed most recently. It * is only used to end calls that start while there is no config selector * and the name resolver is in backoff, so it should be nulled if * configSelector becomes set or the channel state becomes anything other * than TRANSIENT_FAILURE. */ private currentResolutionError: StatusObject | null = null; // Channelz info private readonly channelzEnabled: boolean = true; private originalTarget: string; private channelzRef: ChannelRef; private channelzTrace: ChannelzTrace; private callTracker = new ChannelzCallTracker(); private childrenTracker = new ChannelzChildrenTracker(); constructor( target: string, private readonly credentials: ChannelCredentials, private readonly options: ChannelOptions ) { if (typeof target !== 'string') { throw new TypeError('Channel target must be a string'); } if (!(credentials instanceof ChannelCredentials)) { throw new TypeError( 'Channel credentials must be a ChannelCredentials object' ); } if (options) { if (typeof options !== 'object') { throw new TypeError('Channel options must be an object'); } } this.originalTarget = target; const originalTargetUri = parseUri(target); if (originalTargetUri === null) { throw new Error(`Could not parse target name "${target}"`); } /* This ensures that the target has a scheme that is registered with the * resolver */ const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); if (defaultSchemeMapResult === null) { throw new Error( `Could not find a default scheme for target name "${target}"` ); } this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); this.callRefTimer.unref?.(); if (this.options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; } this.channelzTrace = new ChannelzTrace(); this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Channel created'); } if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; } else { this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult); } const proxyMapResult = mapProxyName(defaultSchemeMapResult, options); this.target = proxyMapResult.target; this.options = Object.assign({}, this.options, proxyMapResult.extraOptions); /* The global boolean parameter to getSubchannelPool has the inverse meaning to what * the grpc.use_local_subchannel_pool channel option means. */ this.subchannelPool = getSubchannelPool( (options['grpc.use_local_subchannel_pool'] ?? 0) === 0 ); const channelControlHelper: ChannelControlHelper = { createSubchannel: ( subchannelAddress: SubchannelAddress, subchannelArgs: ChannelOptions ) => { const subchannel = this.subchannelPool.getOrCreateSubchannel( this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials ); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); } return subchannel; }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); this.pickQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } this.updateState(connectivityState); }, requestReresolution: () => { // This should never be called. throw new Error( 'Resolving load balancer should never call requestReresolution' ); }, addChannelzChild: (child: ChannelRef | SubchannelRef) => { if (this.channelzEnabled) { this.childrenTracker.refChild(child); } }, removeChannelzChild: (child: ChannelRef | SubchannelRef) => { if (this.channelzEnabled) { this.childrenTracker.unrefChild(child); } } }; this.resolvingLoadBalancer = new ResolvingLoadBalancer( this.target, channelControlHelper, options, (configSelector) => { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); } this.configSelector = configSelector; this.currentResolutionError = null; /* We process the queue asynchronously to ensure that the corresponding * load balancer update has completed. */ process.nextTick(() => { const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata } of localQueue) { this.tryGetConfig(callStream, callMetadata); } this.configSelectionQueue = []; }); }, (status) => { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); } if (this.configSelectionQueue.length > 0) { this.trace('Name resolution failed with calls queued for config selection'); } if (this.configSelector === null) { this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata}; } const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata } of localQueue) { if (callMetadata.getOptions().waitForReady) { this.callRefTimerRef(); this.configSelectionQueue.push({ callStream, callMetadata }); } else { callStream.cancelWithStatus(status.code, status.details); } } } ); this.filterStackFactory = new FilterStackFactory([ new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this), new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this, this.options), ]); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); const error = new Error(); trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1)); } private getChannelzInfo(): ChannelInfo { return { target: this.originalTarget, state: this.connectivityState, trace: this.channelzTrace, callTracker: this.callTracker, children: this.childrenTracker.getChildLists() }; } private trace(text: string, verbosityOverride?: LogVerbosity) { trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text); } private callRefTimerRef() { // If the hasRef function does not exist, always run the code if (!this.callRefTimer.hasRef?.()) { this.trace( 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length ); this.callRefTimer.ref?.(); } } private callRefTimerUnref() { // If the hasRef function does not exist, always run the code if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { this.trace( 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length ); this.callRefTimer.unref?.(); } } private pushPick( callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[] ) { this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); this.callRefTimerRef(); } /** * Check the picker output for the given call and corresponding metadata, * and take any relevant actions. Should not be called while iterating * over pickQueue. * @param callStream * @param callMetadata */ private tryPick( callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[] ) { const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation, }); const subchannelString = pickResult.subchannel ? '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : '' + pickResult.subchannel; this.trace( 'Pick result for call [' + callStream.getCallNumber() + ']: ' + PickResultType[pickResult.pickResultType] + ' subchannel: ' + subchannelString + ' status: ' + pickResult.status?.code + ' ' + pickResult.status?.details ); switch (pickResult.pickResultType) { case PickResultType.COMPLETE: if (pickResult.subchannel === null) { callStream.cancelWithStatus( Status.UNAVAILABLE, 'Request dropped by load balancing policy' ); // End the call with an error } else { /* If the subchannel is not in the READY state, that indicates a bug * somewhere in the load balancer or picker. So, we log an error and * queue the pick to be tried again later. */ if ( pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY ) { log( LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' + subchannelString + ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; } /* We need to clone the callMetadata here because the transparent * retry code in the promise resolution handler use the same * callMetadata object, so it needs to stay unmodified */ callStream.filterStack .sendMetadata(Promise.resolve(callMetadata.clone())) .then( (finalMetadata) => { const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); if (subchannelState === ConnectivityState.READY) { try { const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); pickResult.subchannel?.getRealSubchannel().startCallStream( finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters] ); /* If we reach this point, the call stream has started * successfully */ callConfig.onCommitted?.(); pickResult.onCallStarted?.(); } catch (error) { const errorCode = (error as NodeJS.ErrnoException).code; if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' || errorCode === 'ERR_HTTP2_INVALID_SESSION' ) { /* An error here indicates that something went wrong with * the picked subchannel's http2 stream right before we * tried to start the stream. We are handling a promise * result here, so this is asynchronous with respect to the * original tryPick call, so calling it again is not * recursive. We call tryPick immediately instead of * queueing this pick again because handling the queue is * triggered by state changes, and we want to immediately * check if the state has already changed since the * previous tryPick call. We do this instead of cancelling * the stream because the correct behavior may be * re-queueing instead, based on the logic in the rest of * tryPick */ this.trace( 'Failed to start call on picked subchannel ' + subchannelString + ' with error ' + (error as Error).message + '. Retrying pick', LogVerbosity.INFO ); this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } else { this.trace( 'Failed to start call on picked subchanel ' + subchannelString + ' with error ' + (error as Error).message + '. Ending call', LogVerbosity.INFO ); callStream.cancelWithStatus( Status.INTERNAL, `Failed to start HTTP/2 stream with error: ${ (error as Error).message }` ); } } } else { /* The logic for doing this here is the same as in the catch * block above */ this.trace( 'Picked subchannel ' + subchannelString + ' has state ' + ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick', LogVerbosity.INFO ); this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } }, (error: Error & { code: number }) => { // We assume the error code isn't 0 (Status.OK) const {code, details} = restrictControlPlaneStatusCode( typeof error.code === 'number' ? error.code : Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}` ) callStream.cancelWithStatus(code, details); } ); } break; case PickResultType.QUEUE: this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); } else { const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); callStream.cancelWithStatus(code, details); } break; case PickResultType.DROP: const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details); callStream.cancelWithStatus(code, details); break; default: throw new Error( `Invalid state: unknown pickResultType ${pickResult.pickResultType}` ); } } private removeConnectivityStateWatcher( watcherObject: ConnectivityStateWatcher ) { const watcherIndex = this.connectivityStateWatchers.findIndex( (value) => value === watcherObject ); if (watcherIndex >= 0) { this.connectivityStateWatchers.splice(watcherIndex, 1); } } private updateState(newState: ConnectivityState): void { trace( LogVerbosity.DEBUG, 'connectivity_state', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState] ); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); } this.connectivityState = newState; const watchersCopy = this.connectivityStateWatchers.slice(); for (const watcherObject of watchersCopy) { if (newState !== watcherObject.currentState) { if (watcherObject.timer) { clearTimeout(watcherObject.timer); } this.removeConnectivityStateWatcher(watcherObject); watcherObject.callback(); } } if (newState !== ConnectivityState.TRANSIENT_FAILURE) { this.currentResolutionError = null; } } private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { if (stream.getStatus() !== null) { /* If the stream has a status, it has already finished and we don't need * to take any more actions on it. */ return; } if (this.configSelector === null) { /* This branch will only be taken at the beginning of the channel's life, * before the resolver ever returns a result. So, the * ResolvingLoadBalancer may be idle and if so it needs to be kicked * because it now has a pending request. */ this.resolvingLoadBalancer.exitIdle(); if (this.currentResolutionError && !metadata.getOptions().waitForReady) { stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details); } else { this.configSelectionQueue.push({ callStream: stream, callMetadata: metadata, }); this.callRefTimerRef(); } } else { const callConfig = this.configSelector(stream.getMethod(), metadata); if (callConfig.status === Status.OK) { if (callConfig.methodConfig.timeout) { const deadline = new Date(); deadline.setSeconds( deadline.getSeconds() + callConfig.methodConfig.timeout.seconds ); deadline.setMilliseconds( deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1_000_000 ); stream.setConfigDeadline(deadline); // Refreshing the filters makes the deadline filter pick up the new deadline stream.filterStack.refresh(); } if (callConfig.dynamicFilterFactories.length > 0) { /* These dynamicFilters are the mechanism for implementing gRFC A39: * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md * We run them here instead of with the rest of the filters because * that spec says "the xDS HTTP filters will run in between name * resolution and load balancing". * * We use the filter stack here to simplify the multi-filter async * waterfall logic, but we pass along the underlying list of filters * to avoid having nested filter stacks when combining it with the * original filter stack. We do not pass along the original filter * factory list because these filters may need to persist data * between sending headers and other operations. */ const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories); const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); }); } else { this.tryPick(stream, metadata, callConfig, []); } } else { const {code, details} = restrictControlPlaneStatusCode(callConfig.status, 'Failed to route call to method ' + stream.getMethod()); stream.cancelWithStatus(code, details); } } } _startCallStream(stream: Http2CallStream, metadata: Metadata) { this.tryGetConfig(stream, metadata.clone()); } close() { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); clearInterval(this.callRefTimer); if (this.channelzEnabled) { unregisterChannelzRef(this.channelzRef); } this.subchannelPool.unrefUnusedSubchannels(); } getTarget() { return uriToString(this.target); } getConnectivityState(tryToConnect: boolean) { const connectivityState = this.connectivityState; if (tryToConnect) { this.resolvingLoadBalancer.exitIdle(); } return connectivityState; } watchConnectivityState( currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void ): void { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } let timer = null; if (deadline !== Infinity) { const deadlineDate: Date = deadline instanceof Date ? deadline : new Date(deadline); const now = new Date(); if (deadline === -Infinity || deadlineDate <= now) { process.nextTick( callback, new Error('Deadline passed without connectivity state change') ); return; } timer = setTimeout(() => { this.removeConnectivityStateWatcher(watcherObject); callback( new Error('Deadline passed without connectivity state change') ); }, deadlineDate.getTime() - now.getTime()); } const watcherObject = { currentState, callback, timer, }; this.connectivityStateWatchers.push(watcherObject); } /** * Get the channelz reference object for this channel. The returned value is * garbage if channelz is disabled for this channel. * @returns */ getChannelzRef() { return this.channelzRef; } createCall( method: string, deadline: Deadline, host: string | null | undefined, parentCall: ServerSurfaceCall | null, propagateFlags: number | null | undefined ): Call { if (typeof method !== 'string') { throw new TypeError('Channel#createCall: method must be a string'); } if (!(typeof deadline === 'number' || deadline instanceof Date)) { throw new TypeError( 'Channel#createCall: deadline must be a number or Date' ); } if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } const callNumber = getNewCallNumber(); this.trace( 'createCall [' + callNumber + '] method="' + method + '", deadline=' + deadline ); const finalOptions: CallStreamOptions = { deadline: deadline, flags: propagateFlags ?? Propagate.DEFAULTS, host: host ?? this.defaultAuthority, parentCall: parentCall, }; const stream: Http2CallStream = new Http2CallStream( method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber ); if (this.channelzEnabled) { this.callTracker.addCallStarted(); stream.addStatusWatcher(status => { if (status.code === Status.OK) { this.callTracker.addCallSucceeded(); } else { this.callTracker.addCallFailed(); } }); } return stream; } }