UNPKG

10.6 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from "./call-credentials";
19import { Call, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
20import { SubchannelCall } from "./subchannel-call";
21import { ConnectivityState } from "./connectivity-state";
22import { LogVerbosity, Status } from "./constants";
23import { Deadline, getDeadlineTimeoutString } from "./deadline";
24import { FilterStack, FilterStackFactory } from "./filter-stack";
25import { InternalChannel } from "./internal-channel";
26import { Metadata } from "./metadata";
27import { PickResultType } from "./picker";
28import { CallConfig } from "./resolver";
29import { splitHostPort } from "./uri-parser";
30import * as logging from './logging';
31import { restrictControlPlaneStatusCode } from "./control-plane-status";
32import * as http2 from 'http2';
33
/* Tracer name passed to logging.trace for every trace line emitted by
 * LoadBalancingCall in this file. */
const TRACER_NAME = 'load_balancing_call';
35
/**
 * Progress tag attached to the final status reported by a LoadBalancingCall.
 *
 * As used in this file:
 * - 'NOT_STARTED': creating the call on the picked subchannel threw.
 * - 'DROP': the pick result was of type DROP.
 * - 'REFUSED': the child call ended with an HTTP/2 REFUSED_STREAM reset code.
 * - 'PROCESSED': every other way the call ends.
 */
export type RpcProgress =
  | 'NOT_STARTED'
  | 'DROP'
  | 'REFUSED'
  | 'PROCESSED';
37
/**
 * A StatusObject that additionally records the RpcProgress value associated
 * with the end of the call.
 */
export interface StatusObjectWithProgress extends StatusObject {
  /** Progress tag accompanying the final status. */
  progress: RpcProgress;
}
41
/**
 * Listener accepted by LoadBalancingCall.start. It is an InterceptingListener
 * whose status callback takes a status carrying a `progress` field.
 */
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
  /**
   * Receives the final status of the call.
   * @param status Final status including its RpcProgress tag.
   */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
45
/**
 * A call that asks the channel to pick a subchannel and, once a pick
 * completes, creates a child call on that subchannel and forwards operations
 * to it.
 *
 * Operations requested before the child call exists (read, a message, half
 * close) are remembered and replayed once the child call has been created.
 * The final status is reported to the listener at most once, tagged with an
 * RpcProgress value.
 */
export class LoadBalancingCall implements Call {
  /** Child call on the picked subchannel; null until one has been created. */
  private child: SubchannelCall | null = null;
  /** True if startRead was requested before the child call existed. */
  private readPending = false;
  /** Message passed to sendMessageWithContext before the child call existed. */
  private pendingMessage: { context: MessageContext; message: Buffer } | null = null;
  /** True if halfClose was requested before the child call existed. */
  private pendingHalfClose = false;
  private pendingChildStatus: StatusObject | null = null;
  /** Set once the final status has been output; guards against double delivery. */
  private ended = false;
  /** URL handed to the call credentials as `service_url`. */
  private serviceUrl: string;
  /** Metadata supplied to start; null until start is called. */
  private metadata: Metadata | null = null;
  /** Listener supplied to start; null until start is called. */
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call.
   * @param callConfig Call configuration; supplies `pickInformation` and an
   *     optional `onCommitted` callback.
   * @param methodName Request path, expected as "/{serviceName}/{methodName}".
   * @param host Host passed to the child call and used to build the service URL.
   * @param credentials Call credentials used to generate request metadata.
   * @param deadline Call deadline; `Infinity` means no grpc-timeout is sent.
   * @param callNumber Identifier used as a prefix in trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Delivers the final status exactly once: marks the call as ended, notifies
   * the listener, then invokes the pick result's onCallEnded callback if one
   * was recorded. Subsequent calls are ignored.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress): void {
    if (this.ended) {
      return;
    }
    this.ended = true;
    this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
    const finalStatus = { ...status, progress };
    this.listener?.onReceiveStatus(finalStatus);
    this.onCallEnded?.(finalStatus.code);
  }

  /**
   * Asks the channel for a pick and acts on the result. Does nothing if the
   * call has already ended.
   *
   * @throws Error if called before start (no metadata available yet).
   */
  doPick(): void {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
    const subchannelString = pickResult.subchannel
      ? '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials.generateMetadata({ service_url: this.serviceUrl }).then(
          (credsMetadata) => {
            /* The call may have ended (e.g. been cancelled) while the
             * credentials were being generated. In that case no stream must
             * be created and the pick's callbacks must not be invoked. */
            if (this.ended) {
              this.trace('Credentials metadata generation finished after call ended');
              return;
            }
            const finalMetadata = this.metadata!.clone();
            finalMetadata.merge(credsMetadata);
            if (finalMetadata.get('authorization').length > 1) {
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: '"authorization" metadata cannot have multiple values',
                  metadata: new Metadata()
                },
                'PROCESSED'
              );
              // The call has failed; do not go on to create a child call.
              return;
            }
            if (pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY) {
              this.trace(
                'Picked subchannel ' +
                  subchannelString +
                  ' has state ' +
                  ConnectivityState[pickResult.subchannel!.getConnectivityState()] +
                  ' after getting credentials metadata. Retrying pick'
              );
              this.doPick();
              return;
            }

            if (this.deadline !== Infinity) {
              finalMetadata.set('grpc-timeout', getDeadlineTimeoutString(this.deadline));
            }
            let child: SubchannelCall;
            try {
              child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
                onReceiveMetadata: metadata => {
                  this.trace('Received metadata');
                  this.listener!.onReceiveMetadata(metadata);
                },
                onReceiveMessage: message => {
                  this.trace('Received message');
                  this.listener!.onReceiveMessage(message);
                },
                onReceiveStatus: status => {
                  this.trace('Received status');
                  if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
                    this.outputStatus(status, 'REFUSED');
                  } else {
                    this.outputStatus(status, 'PROCESSED');
                  }
                }
              });
              this.child = child;
            } catch (error) {
              this.trace(
                'Failed to start call on picked subchannel ' +
                  subchannelString +
                  ' with error ' +
                  (error as Error).message
              );
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: 'Failed to start HTTP/2 stream with error ' + (error as Error).message,
                  metadata: new Metadata()
                },
                'NOT_STARTED'
              );
              return;
            }
            this.callConfig.onCommitted?.();
            pickResult.onCallStarted?.();
            this.onCallEnded = pickResult.onCallEnded;
            this.trace('Created child call [' + child.getCallNumber() + ']');
            // Replay operations that were requested before the child existed.
            if (this.readPending) {
              child.startRead();
            }
            if (this.pendingMessage) {
              child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
            }
            if (this.pendingHalfClose) {
              child.halfClose();
            }
          },
          (error: Error & { code: number }) => {
            // We assume the error code isn't 0 (Status.OK)
            const { code, details } = restrictControlPlaneStatusCode(
              typeof error.code === 'number' ? error.code : Status.UNKNOWN,
              `Getting metadata from plugin failed with error: ${error.message}`
            );
            this.outputStatus(
              {
                code: code,
                details: details,
                metadata: new Metadata()
              },
              'PROCESSED'
            );
          }
        );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
        // The status is delivered asynchronously rather than from within doPick.
        setImmediate(() => {
          this.outputStatus({ code, details, metadata: pickResult.status!.metadata }, 'DROP');
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
          setImmediate(() => {
            this.outputStatus({ code, details, metadata: pickResult.status!.metadata }, 'PROCESSED');
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
        break;
    }
  }

  /**
   * Cancels the child call, if any, and ends this call with the given status
   * and empty metadata.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.child?.cancelWithStatus(status, details);
    this.outputStatus({ code: status, details: details, metadata: new Metadata() }, 'PROCESSED');
  }

  /** Returns the child call's peer, or the channel target if there is no child call or it yields no peer. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /** Records the metadata and listener, then performs the first pick. */
  start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends the message on the child call, or stores it as the pending message
   * if the child call does not exist yet.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /** Starts a read on the child call, or records that a read is pending. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /** Half-closes the child call, or records that a half close is pending. */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error("Method not implemented.");
  }

  /** Returns the call number given at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
\No newline at end of file