/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { CallCredentials } from "./call-credentials";
import { Call, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
import { SubchannelCall } from "./subchannel-call";
import { ConnectivityState } from "./connectivity-state";
import { LogVerbosity, Status } from "./constants";
import { Deadline, getDeadlineTimeoutString } from "./deadline";
import { FilterStack, FilterStackFactory } from "./filter-stack";
import { InternalChannel } from "./internal-channel";
import { Metadata } from "./metadata";
import { PickResultType } from "./picker";
import { CallConfig } from "./resolver";
import { splitHostPort } from "./uri-parser";
import * as logging from './logging';
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import * as http2 from 'http2';

const TRACER_NAME = 'load_balancing_call';

/**
 * Progress tag attached to the final status of a load balancing call. This
 * file uses 'DROP' for dropped picks, 'REFUSED' when the stream was reset
 * with NGHTTP2_REFUSED_STREAM, 'NOT_STARTED' when creating the child call
 * threw, and 'PROCESSED' for everything else.
 */
export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';

/** A call status annotated with the {@link RpcProgress} of the call. */
export interface StatusObjectWithProgress extends StatusObject {
  progress: RpcProgress;
}

/** Listener whose status callback also receives the call's progress tag. */
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
  onReceiveStatus(status: StatusObjectWithProgress): void;
}

/**
 * A call that asks the channel for a pick and, once a pick completes, creates
 * a child call on the picked subchannel. Reads, a message and a half-close
 * requested before the child exists are remembered and replayed on the child
 * as soon as it is created.
 */
export class LoadBalancingCall implements Call {
  private child: SubchannelCall | null = null;
  private readPending = false;
  private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
  private pendingHalfClose = false;
  private pendingChildStatus: StatusObject | null = null;
  private ended = false;
  private serviceUrl: string;
  private metadata: Metadata | null = null;
  private listener: InterceptingListener | null = null;
  private onCallEnded: ((statusCode: Status) => void) | null = null;

  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host : string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Ends the call with the given status. Only the first invocation has any
   * effect: it marks the call as ended, notifies the listener, and invokes
   * the pick's onCallEnded callback if one has been recorded.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (this.ended) {
      return;
    }
    this.ended = true;
    this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
    const finalStatus = {...status, progress};
    this.listener?.onReceiveStatus(finalStatus);
    this.onCallEnded?.(finalStatus.code);
  }

  /**
   * Asks the channel for a pick and acts on the result:
   *  - COMPLETE: generates credentials metadata, then starts the child call.
   *  - DROP: ends the call with the pick status and progress 'DROP'.
   *  - TRANSIENT_FAILURE: queues the call again if the metadata options have
   *    waitForReady set, otherwise ends the call with the pick status.
   *  - QUEUE: queues the call on the channel for a later pick.
   *
   * Does nothing if the call has already ended.
   *
   * @throws Error if called before start() has provided the metadata.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    // Narrowed local so the callbacks below need no non-null assertions.
    const callMetadata = this.metadata;
    this.trace('Pick called');
    const pickResult = this.channel.doPick(callMetadata, this.callConfig.pickInformation);
    const subchannelString = pickResult.subchannel ?
      '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
      '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials.generateMetadata({service_url: this.serviceUrl}).then(
          (credsMetadata) => {
            /* The call may have ended (e.g. through cancelWithStatus) while
             * the credentials metadata was being generated. Its status has
             * already been reported, so no child call may be started for it. */
            if (this.ended) {
              this.trace('Credentials metadata generation finished after call ended');
              return;
            }
            const finalMetadata = callMetadata.clone();
            finalMetadata.merge(credsMetadata);
            if (finalMetadata.get('authorization').length > 1) {
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: '"authorization" metadata cannot have multiple values',
                  metadata: new Metadata()
                },
                'PROCESSED'
              );
              // The call has failed; do not go on to create the child call.
              return;
            }
            if (pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY) {
              this.trace(
                'Picked subchannel ' +
                  subchannelString +
                  ' has state ' +
                  ConnectivityState[pickResult.subchannel!.getConnectivityState()] +
                  ' after getting credentials metadata. Retrying pick'
              );
              this.doPick();
              return;
            }

            if (this.deadline !== Infinity) {
              finalMetadata.set('grpc-timeout', getDeadlineTimeoutString(this.deadline));
            }
            try {
              this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
                onReceiveMetadata: metadata => {
                  this.trace('Received metadata');
                  this.listener!.onReceiveMetadata(metadata);
                },
                onReceiveMessage: message => {
                  this.trace('Received message');
                  this.listener!.onReceiveMessage(message);
                },
                onReceiveStatus: status => {
                  this.trace('Received status');
                  if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
                    this.outputStatus(status, 'REFUSED');
                  } else {
                    this.outputStatus(status, 'PROCESSED');
                  }
                }
              });
            } catch (error) {
              this.trace(
                'Failed to start call on picked subchannel ' +
                  subchannelString +
                  ' with error ' +
                  (error as Error).message
              );
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: 'Failed to start HTTP/2 stream with error ' + (error as Error).message,
                  metadata: new Metadata()
                },
                'NOT_STARTED'
              );
              return;
            }
            this.callConfig.onCommitted?.();
            pickResult.onCallStarted?.();
            this.onCallEnded = pickResult.onCallEnded;
            this.trace('Created child call [' + this.child.getCallNumber() + ']');
            // Replay the operations that were requested before the child existed.
            if (this.readPending) {
              this.child.startRead();
            }
            if (this.pendingMessage) {
              this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
            }
            if (this.pendingHalfClose) {
              this.child.halfClose();
            }
          }, (error: Error & { code: number }) => {
            // We assume the error code isn't 0 (Status.OK)
            const {code, details} = restrictControlPlaneStatusCode(
              typeof error.code === 'number' ? error.code : Status.UNKNOWN,
              `Getting metadata from plugin failed with error: ${error.message}`
            );
            this.outputStatus(
              {
                code: code,
                details: details,
                metadata: new Metadata()
              },
              'PROCESSED'
            );
          }
        );
        break;
      case PickResultType.DROP: {
        const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
        // The status is delivered from a setImmediate callback, not synchronously.
        setImmediate(() => {
          this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'DROP');
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (callMetadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
          setImmediate(() => {
            this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'PROCESSED');
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
        break;
    }
  }

  /**
   * Cancels the child call, if one exists, and ends this call with the given
   * status code and details and empty metadata.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.child?.cancelWithStatus(status, details);
    this.outputStatus({code: status, details: details, metadata: new Metadata()}, 'PROCESSED');
  }

  /** Returns the child call's peer, or the channel target if there is no child yet. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Records the metadata and listener for this call and performs the first pick.
   */
  start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends the message on the child call, or stores it as the pending message
   * (replacing any previously stored one) until the child call is created.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = {context, message};
    }
  }

  /** Starts a read on the child call, or remembers the request until it exists. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /** Half-closes the child call, or remembers the request until it exists. */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error("Method not implemented.");
  }

  /** Returns the call number given to the constructor. */
  getCallNumber(): number {
    return this.callNumber;
  }
}