1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from './call-credentials';
|
19 | import {
|
20 | Call,
|
21 | DeadlineInfoProvider,
|
22 | InterceptingListener,
|
23 | MessageContext,
|
24 | StatusObject,
|
25 | } from './call-interface';
|
26 | import { SubchannelCall } from './subchannel-call';
|
27 | import { ConnectivityState } from './connectivity-state';
|
28 | import { LogVerbosity, Status } from './constants';
|
29 | import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline';
|
30 | import { InternalChannel } from './internal-channel';
|
31 | import { Metadata } from './metadata';
|
32 | import { PickResultType } from './picker';
|
33 | import { CallConfig } from './resolver';
|
34 | import { splitHostPort } from './uri-parser';
|
35 | import * as logging from './logging';
|
36 | import { restrictControlPlaneStatusCode } from './control-plane-status';
|
37 | import * as http2 from 'http2';
|
38 |
|
/** Tracer name passed to `logging.trace` for every message emitted by this module. */
const TRACER_NAME = 'load_balancing_call';
|
40 |
|
/**
 * Progress marker attached to the final status of a load-balancing call.
 *
 * As used by `LoadBalancingCall` in this file:
 * - `'NOT_STARTED'`: creating the call on the picked subchannel threw.
 * - `'DROP'`: the pick result type was `DROP`.
 * - `'REFUSED'`: the child call ended with HTTP/2 reset code `REFUSED_STREAM`.
 * - `'PROCESSED'`: every other way the call ends.
 */
export type RpcProgress =
  | 'NOT_STARTED'
  | 'DROP'
  | 'REFUSED'
  | 'PROCESSED';
|
42 |
|
/** A `StatusObject` that additionally carries an `RpcProgress` marker. */
export interface StatusObjectWithProgress extends StatusObject {
  /** Progress marker set by the code that produced this status. */
  progress: RpcProgress;
}
|
46 |
|
/**
 * An `InterceptingListener` whose `onReceiveStatus` callback is declared to
 * receive the status together with its `RpcProgress` marker.
 */
export interface LoadBalancingCallInterceptingListener
  extends InterceptingListener {
  /**
   * Called with the final status of the call.
   * @param status Final status, including the `progress` field.
   */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
|
51 |
|
/**
 * A call that asks the channel for a load-balancing pick and, once a pick
 * completes, creates a child `SubchannelCall` and forwards operations to it.
 *
 * Operations requested before the child call exists (`startRead`,
 * `sendMessageWithContext`, `halfClose`) are recorded and replayed as soon as
 * the child call is created. The final status is delivered to the listener
 * at most once, via `outputStatus`.
 */
export class LoadBalancingCall implements Call, DeadlineInfoProvider {
  /** The underlying call; `null` until a pick completes and the call is created. */
  private child: SubchannelCall | null = null;
  /** Set when `startRead` was requested before the child call existed. */
  private readPending = false;
  /** Message passed to `sendMessageWithContext` before the child call existed. */
  private pendingMessage: { context: MessageContext; message: Buffer } | null =
    null;
  /** Set when `halfClose` was requested before the child call existed. */
  private pendingHalfClose = false;
  /** Set once the final status has been output; guards against double delivery. */
  private ended = false;
  /** `service_url` value passed to the call credentials' `generateMetadata`. */
  private serviceUrl: string;
  /** Metadata supplied to `start`; `null` until `start` is called. */
  private metadata: Metadata | null = null;
  /** Listener supplied to `start`; `null` until `start` is called. */
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;
  /** Time at which this object was constructed. */
  private startTime: Date;
  /** Time at which the child call was created; `null` until then. */
  private childStartTime: Date | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call for a later pick.
   * @param callConfig Call configuration; supplies `pickInformation` and an optional `onCommitted` hook.
   * @param methodName Method path, expected in the form "/{serviceName}/{methodName}".
   * @param host Host passed to the child call; its hostname part is used for the service URL.
   * @param credentials Call credentials used to generate extra metadata after a completed pick.
   * @param deadline Call deadline; when it is not `Infinity`, a `grpc-timeout` header is set.
   * @param callNumber Identifier used as a prefix in trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
    this.startTime = new Date();
  }

  /**
   * Builds human-readable fragments describing where this call has spent its
   * time so far.
   * @returns Before the child call exists: an optional `'wait_for_ready'`
   *   entry followed by `'Waiting for LB pick'`. Afterwards: an optional
   *   `'wait_for_ready'` and `'LB pick: …'` entry (only when the child was
   *   created strictly after this call), followed by the child's own info.
   */
  getDeadlineInfo(): string[] {
    const deadlineInfo: string[] = [];
    if (this.childStartTime) {
      if (this.childStartTime > this.startTime) {
        if (this.metadata?.getOptions().waitForReady) {
          deadlineInfo.push('wait_for_ready');
        }
        deadlineInfo.push(
          `LB pick: ${formatDateDifference(this.startTime, this.childStartTime)}`
        );
      }
      // childStartTime is only assigned right after child has been assigned.
      deadlineInfo.push(...this.child!.getDeadlineInfo());
      return deadlineInfo;
    } else {
      if (this.metadata?.getOptions().waitForReady) {
        deadlineInfo.push('wait_for_ready');
      }
      deadlineInfo.push('Waiting for LB pick');
    }
    return deadlineInfo;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Ends the call with the given status. Only the first invocation has any
   * effect; later invocations are ignored.
   * @param status Final status to report.
   * @param progress Progress marker attached to the reported status.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace(
        'ended with status: code=' +
          status.code +
          ' details="' +
          status.details +
          '" start time=' +
          this.startTime.toISOString()
      );
      const finalStatus = { ...status, progress };
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Asks the channel for a pick and acts on the result:
   * - `COMPLETE`: generates credentials metadata, then creates the child call
   *   and replays any pending operations.
   * - `DROP`: ends the call asynchronously with progress `'DROP'`.
   * - `TRANSIENT_FAILURE`: queues the call again when `waitForReady` is set,
   *   otherwise ends it asynchronously with progress `'PROCESSED'`.
   * - `QUEUE`: queues the call for a later pick.
   *
   * Does nothing if the call has already ended.
   * @throws Error if called before `start`.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    // Work on a copy so that a retried pick starts from the caller's metadata.
    const finalMetadata = this.metadata.clone();
    const pickResult = this.channel.doPick(
      finalMetadata,
      this.callConfig.pickInformation
    );
    const subchannelString = pickResult.subchannel
      ? '(' +
        pickResult.subchannel.getChannelzRef().id +
        ') ' +
        pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials
          .generateMetadata({ service_url: this.serviceUrl })
          .then(
            credsMetadata => {
              /* The call may have been cancelled or otherwise ended while
               * the credentials metadata was being generated. */
              if (this.ended) {
                this.trace(
                  'Credentials metadata generation finished after call ended'
                );
                return;
              }
              finalMetadata.merge(credsMetadata);
              if (finalMetadata.get('authorization').length > 1) {
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      '"authorization" metadata cannot have multiple values',
                    metadata: new Metadata(),
                  },
                  'PROCESSED'
                );
                /* The call has ended: do not fall through and create a child
                 * call (or report the call as committed/started) for it. */
                return;
              }
              if (
                pickResult.subchannel!.getConnectivityState() !==
                ConnectivityState.READY
              ) {
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[
                      pickResult.subchannel!.getConnectivityState()
                    ] +
                    ' after getting credentials metadata. Retrying pick'
                );
                this.doPick();
                return;
              }

              if (this.deadline !== Infinity) {
                finalMetadata.set(
                  'grpc-timeout',
                  getDeadlineTimeoutString(this.deadline)
                );
              }
              try {
                this.child = pickResult
                  .subchannel!.getRealSubchannel()
                  .createCall(finalMetadata, this.host, this.methodName, {
                    onReceiveMetadata: metadata => {
                      this.trace('Received metadata');
                      this.listener!.onReceiveMetadata(metadata);
                    },
                    onReceiveMessage: message => {
                      this.trace('Received message');
                      this.listener!.onReceiveMessage(message);
                    },
                    onReceiveStatus: status => {
                      this.trace('Received status');
                      if (
                        status.rstCode ===
                        http2.constants.NGHTTP2_REFUSED_STREAM
                      ) {
                        this.outputStatus(status, 'REFUSED');
                      } else {
                        this.outputStatus(status, 'PROCESSED');
                      }
                    },
                  });
                this.childStartTime = new Date();
              } catch (error) {
                this.trace(
                  'Failed to start call on picked subchannel ' +
                    subchannelString +
                    ' with error ' +
                    (error as Error).message
                );
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      'Failed to start HTTP/2 stream with error ' +
                      (error as Error).message,
                    metadata: new Metadata(),
                  },
                  'NOT_STARTED'
                );
                return;
              }
              this.callConfig.onCommitted?.();
              pickResult.onCallStarted?.();
              this.onCallEnded = pickResult.onCallEnded;
              this.trace(
                'Created child call [' + this.child.getCallNumber() + ']'
              );
              // Replay operations that were requested before the child existed.
              if (this.readPending) {
                this.child.startRead();
              }
              if (this.pendingMessage) {
                this.child.sendMessageWithContext(
                  this.pendingMessage.context,
                  this.pendingMessage.message
                );
              }
              if (this.pendingHalfClose) {
                this.child.halfClose();
              }
            },
            (error: Error & { code: number }) => {
              // The rejection may not carry a numeric code; fall back to UNKNOWN.
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              this.outputStatus(
                {
                  code: code,
                  details: details,
                  metadata: new Metadata(),
                },
                'PROCESSED'
              );
            }
          );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        // Deliver the status asynchronously rather than from inside doPick.
        setImmediate(() => {
          this.outputStatus(
            { code, details, metadata: pickResult.status!.metadata },
            'DROP'
          );
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          // Deliver the status asynchronously rather than from inside doPick.
          setImmediate(() => {
            this.outputStatus(
              { code, details, metadata: pickResult.status!.metadata },
              'PROCESSED'
            );
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
        break;
    }
  }

  /**
   * Cancels the child call, if one exists, and ends this call with the given
   * status and empty metadata.
   * @param status Status code to report.
   * @param details Status details to report.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.child?.cancelWithStatus(status, details);
    this.outputStatus(
      { code: status, details: details, metadata: new Metadata() },
      'PROCESSED'
    );
  }

  /**
   * @returns The child call's peer if a child call exists and reports one,
   *   otherwise the channel's target.
   */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Records the metadata and listener for this call and performs the first pick.
   * @param metadata Metadata for the call; cloned for each pick attempt.
   * @param listener Receiver for metadata, messages and the final status.
   */
  start(
    metadata: Metadata,
    listener: LoadBalancingCallInterceptingListener
  ): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends a message on the child call, or stores it to be sent once the child
   * call is created. Only one pending message is kept; a later call before
   * the child exists replaces the stored one.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /**
   * Starts a read on the child call, or records the request so it is issued
   * once the child call is created.
   */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child call, or records the request so it is issued once
   * the child call is created.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** @returns The call number supplied at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
|
379 |
|
\ | No newline at end of file |