UNPKG

12.7 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from './call-credentials';
19import {
20 Call,
21 DeadlineInfoProvider,
22 InterceptingListener,
23 MessageContext,
24 StatusObject,
25} from './call-interface';
26import { SubchannelCall } from './subchannel-call';
27import { ConnectivityState } from './connectivity-state';
28import { LogVerbosity, Status } from './constants';
29import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline';
30import { InternalChannel } from './internal-channel';
31import { Metadata } from './metadata';
32import { PickResultType } from './picker';
33import { CallConfig } from './resolver';
34import { splitHostPort } from './uri-parser';
35import * as logging from './logging';
36import { restrictControlPlaneStatusCode } from './control-plane-status';
37import * as http2 from 'http2';
38
39const TRACER_NAME = 'load_balancing_call';
40
41export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';
42
43export interface StatusObjectWithProgress extends StatusObject {
44 progress: RpcProgress;
45}
46
47export interface LoadBalancingCallInterceptingListener
48 extends InterceptingListener {
49 onReceiveStatus(status: StatusObjectWithProgress): void;
50}
51
52export class LoadBalancingCall implements Call, DeadlineInfoProvider {
53 private child: SubchannelCall | null = null;
54 private readPending = false;
55 private pendingMessage: { context: MessageContext; message: Buffer } | null =
56 null;
57 private pendingHalfClose = false;
58 private ended = false;
59 private serviceUrl: string;
60 private metadata: Metadata | null = null;
61 private listener: InterceptingListener | null = null;
62 private onCallEnded: ((statusCode: Status) => void) | null = null;
63 private startTime: Date;
64 private childStartTime: Date | null = null;
65 constructor(
66 private readonly channel: InternalChannel,
67 private readonly callConfig: CallConfig,
68 private readonly methodName: string,
69 private readonly host: string,
70 private readonly credentials: CallCredentials,
71 private readonly deadline: Deadline,
72 private readonly callNumber: number
73 ) {
74 const splitPath: string[] = this.methodName.split('/');
75 let serviceName = '';
76 /* The standard path format is "/{serviceName}/{methodName}", so if we split
77 * by '/', the first item should be empty and the second should be the
78 * service name */
79 if (splitPath.length >= 2) {
80 serviceName = splitPath[1];
81 }
82 const hostname = splitHostPort(this.host)?.host ?? 'localhost';
83 /* Currently, call credentials are only allowed on HTTPS connections, so we
84 * can assume that the scheme is "https" */
85 this.serviceUrl = `https://${hostname}/${serviceName}`;
86 this.startTime = new Date();
87 }
88 getDeadlineInfo(): string[] {
89 const deadlineInfo: string[] = [];
90 if (this.childStartTime) {
91 if (this.childStartTime > this.startTime) {
92 if (this.metadata?.getOptions().waitForReady) {
93 deadlineInfo.push('wait_for_ready');
94 }
95 deadlineInfo.push(`LB pick: ${formatDateDifference(this.startTime, this.childStartTime)}`);
96 }
97 deadlineInfo.push(...this.child!.getDeadlineInfo());
98 return deadlineInfo;
99 } else {
100 if (this.metadata?.getOptions().waitForReady) {
101 deadlineInfo.push('wait_for_ready');
102 }
103 deadlineInfo.push('Waiting for LB pick');
104 }
105 return deadlineInfo;
106 }
107
108 private trace(text: string): void {
109 logging.trace(
110 LogVerbosity.DEBUG,
111 TRACER_NAME,
112 '[' + this.callNumber + '] ' + text
113 );
114 }
115
116 private outputStatus(status: StatusObject, progress: RpcProgress) {
117 if (!this.ended) {
118 this.ended = true;
119 this.trace(
120 'ended with status: code=' +
121 status.code +
122 ' details="' +
123 status.details +
124 '" start time=' +
125 this.startTime.toISOString()
126 );
127 const finalStatus = { ...status, progress };
128 this.listener?.onReceiveStatus(finalStatus);
129 this.onCallEnded?.(finalStatus.code);
130 }
131 }
132
133 doPick() {
134 if (this.ended) {
135 return;
136 }
137 if (!this.metadata) {
138 throw new Error('doPick called before start');
139 }
140 this.trace('Pick called');
141 const finalMetadata = this.metadata.clone();
142 const pickResult = this.channel.doPick(
143 finalMetadata,
144 this.callConfig.pickInformation
145 );
146 const subchannelString = pickResult.subchannel
147 ? '(' +
148 pickResult.subchannel.getChannelzRef().id +
149 ') ' +
150 pickResult.subchannel.getAddress()
151 : '' + pickResult.subchannel;
152 this.trace(
153 'Pick result: ' +
154 PickResultType[pickResult.pickResultType] +
155 ' subchannel: ' +
156 subchannelString +
157 ' status: ' +
158 pickResult.status?.code +
159 ' ' +
160 pickResult.status?.details
161 );
162 switch (pickResult.pickResultType) {
163 case PickResultType.COMPLETE:
164 this.credentials
165 .generateMetadata({ service_url: this.serviceUrl })
166 .then(
167 credsMetadata => {
168 /* If this call was cancelled (e.g. by the deadline) before
169 * metadata generation finished, we shouldn't do anything with
170 * it. */
171 if (this.ended) {
172 this.trace(
173 'Credentials metadata generation finished after call ended'
174 );
175 return;
176 }
177 finalMetadata.merge(credsMetadata);
178 if (finalMetadata.get('authorization').length > 1) {
179 this.outputStatus(
180 {
181 code: Status.INTERNAL,
182 details:
183 '"authorization" metadata cannot have multiple values',
184 metadata: new Metadata(),
185 },
186 'PROCESSED'
187 );
188 }
189 if (
190 pickResult.subchannel!.getConnectivityState() !==
191 ConnectivityState.READY
192 ) {
193 this.trace(
194 'Picked subchannel ' +
195 subchannelString +
196 ' has state ' +
197 ConnectivityState[
198 pickResult.subchannel!.getConnectivityState()
199 ] +
200 ' after getting credentials metadata. Retrying pick'
201 );
202 this.doPick();
203 return;
204 }
205
206 if (this.deadline !== Infinity) {
207 finalMetadata.set(
208 'grpc-timeout',
209 getDeadlineTimeoutString(this.deadline)
210 );
211 }
212 try {
213 this.child = pickResult
214 .subchannel!.getRealSubchannel()
215 .createCall(finalMetadata, this.host, this.methodName, {
216 onReceiveMetadata: metadata => {
217 this.trace('Received metadata');
218 this.listener!.onReceiveMetadata(metadata);
219 },
220 onReceiveMessage: message => {
221 this.trace('Received message');
222 this.listener!.onReceiveMessage(message);
223 },
224 onReceiveStatus: status => {
225 this.trace('Received status');
226 if (
227 status.rstCode ===
228 http2.constants.NGHTTP2_REFUSED_STREAM
229 ) {
230 this.outputStatus(status, 'REFUSED');
231 } else {
232 this.outputStatus(status, 'PROCESSED');
233 }
234 },
235 });
236 this.childStartTime = new Date();
237 } catch (error) {
238 this.trace(
239 'Failed to start call on picked subchannel ' +
240 subchannelString +
241 ' with error ' +
242 (error as Error).message
243 );
244 this.outputStatus(
245 {
246 code: Status.INTERNAL,
247 details:
248 'Failed to start HTTP/2 stream with error ' +
249 (error as Error).message,
250 metadata: new Metadata(),
251 },
252 'NOT_STARTED'
253 );
254 return;
255 }
256 this.callConfig.onCommitted?.();
257 pickResult.onCallStarted?.();
258 this.onCallEnded = pickResult.onCallEnded;
259 this.trace(
260 'Created child call [' + this.child.getCallNumber() + ']'
261 );
262 if (this.readPending) {
263 this.child.startRead();
264 }
265 if (this.pendingMessage) {
266 this.child.sendMessageWithContext(
267 this.pendingMessage.context,
268 this.pendingMessage.message
269 );
270 }
271 if (this.pendingHalfClose) {
272 this.child.halfClose();
273 }
274 },
275 (error: Error & { code: number }) => {
276 // We assume the error code isn't 0 (Status.OK)
277 const { code, details } = restrictControlPlaneStatusCode(
278 typeof error.code === 'number' ? error.code : Status.UNKNOWN,
279 `Getting metadata from plugin failed with error: ${error.message}`
280 );
281 this.outputStatus(
282 {
283 code: code,
284 details: details,
285 metadata: new Metadata(),
286 },
287 'PROCESSED'
288 );
289 }
290 );
291 break;
292 case PickResultType.DROP:
293 const { code, details } = restrictControlPlaneStatusCode(
294 pickResult.status!.code,
295 pickResult.status!.details
296 );
297 setImmediate(() => {
298 this.outputStatus(
299 { code, details, metadata: pickResult.status!.metadata },
300 'DROP'
301 );
302 });
303 break;
304 case PickResultType.TRANSIENT_FAILURE:
305 if (this.metadata.getOptions().waitForReady) {
306 this.channel.queueCallForPick(this);
307 } else {
308 const { code, details } = restrictControlPlaneStatusCode(
309 pickResult.status!.code,
310 pickResult.status!.details
311 );
312 setImmediate(() => {
313 this.outputStatus(
314 { code, details, metadata: pickResult.status!.metadata },
315 'PROCESSED'
316 );
317 });
318 }
319 break;
320 case PickResultType.QUEUE:
321 this.channel.queueCallForPick(this);
322 }
323 }
324
325 cancelWithStatus(status: Status, details: string): void {
326 this.trace(
327 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
328 );
329 this.child?.cancelWithStatus(status, details);
330 this.outputStatus(
331 { code: status, details: details, metadata: new Metadata() },
332 'PROCESSED'
333 );
334 }
335 getPeer(): string {
336 return this.child?.getPeer() ?? this.channel.getTarget();
337 }
338 start(
339 metadata: Metadata,
340 listener: LoadBalancingCallInterceptingListener
341 ): void {
342 this.trace('start called');
343 this.listener = listener;
344 this.metadata = metadata;
345 this.doPick();
346 }
347 sendMessageWithContext(context: MessageContext, message: Buffer): void {
348 this.trace('write() called with message of length ' + message.length);
349 if (this.child) {
350 this.child.sendMessageWithContext(context, message);
351 } else {
352 this.pendingMessage = { context, message };
353 }
354 }
355 startRead(): void {
356 this.trace('startRead called');
357 if (this.child) {
358 this.child.startRead();
359 } else {
360 this.readPending = true;
361 }
362 }
363 halfClose(): void {
364 this.trace('halfClose called');
365 if (this.child) {
366 this.child.halfClose();
367 } else {
368 this.pendingHalfClose = true;
369 }
370 }
371 setCredentials(credentials: CallCredentials): void {
372 throw new Error('Method not implemented.');
373 }
374
375 getCallNumber(): number {
376 return this.callNumber;
377 }
378}
379
\No newline at end of file