1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from './call-credentials';
|
19 | import {
|
20 | Call,
|
21 | InterceptingListener,
|
22 | MessageContext,
|
23 | StatusObject,
|
24 | } from './call-interface';
|
25 | import { SubchannelCall } from './subchannel-call';
|
26 | import { ConnectivityState } from './connectivity-state';
|
27 | import { LogVerbosity, Status } from './constants';
|
28 | import { Deadline, getDeadlineTimeoutString } from './deadline';
|
29 | import { InternalChannel } from './internal-channel';
|
30 | import { Metadata } from './metadata';
|
31 | import { PickResultType } from './picker';
|
32 | import { CallConfig } from './resolver';
|
33 | import { splitHostPort } from './uri-parser';
|
34 | import * as logging from './logging';
|
35 | import { restrictControlPlaneStatusCode } from './control-plane-status';
|
36 | import * as http2 from 'http2';
|
37 |
|
/** Tracer name passed to `logging.trace` for every trace line emitted by this module. */
const TRACER_NAME = 'load_balancing_call';
|
39 |
|
/**
 * Progress tag attached to the final status of a load balancing call.
 *
 * As used by `LoadBalancingCall` in this file:
 * - `NOT_STARTED`: creating the call on the picked subchannel threw.
 * - `DROP`: the pick result was of type `DROP`.
 * - `REFUSED`: the child call ended with HTTP/2 reset code `REFUSED_STREAM`.
 * - `PROCESSED`: every other way the call ends.
 */
export type RpcProgress =
  | 'NOT_STARTED'
  | 'DROP'
  | 'REFUSED'
  | 'PROCESSED';
|
41 |
|
/** A `StatusObject` that additionally carries the `RpcProgress` tag of the call it ended. */
export interface StatusObjectWithProgress extends StatusObject {
  /** Progress tag describing how the call ended. */
  progress: RpcProgress;
}
|
45 |
|
/**
 * Listener type accepted by `LoadBalancingCall.start`. It is an
 * `InterceptingListener` whose status callback receives the status together
 * with its `RpcProgress` tag.
 */
export interface LoadBalancingCallInterceptingListener
  extends InterceptingListener {
  /** Called with the final status of the call, including its progress tag. */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
|
50 |
|
/**
 * A `Call` implementation that asks the channel for a pick, obtains call
 * credentials metadata, and then creates a child `SubchannelCall` on the
 * picked subchannel. Operations requested before the child call exists
 * (read, one message, half-close) are remembered and replayed on the child
 * once it has been created.
 */
export class LoadBalancingCall implements Call {
  /** The call on the picked subchannel, or null until one has been created. */
  private child: SubchannelCall | null = null;
  /** True if `startRead` was requested before the child call existed. */
  private readPending = false;
  /** The message passed to `sendMessageWithContext` before the child call existed, if any. */
  private pendingMessage: { context: MessageContext; message: Buffer } | null =
    null;
  /** True if `halfClose` was requested before the child call existed. */
  private pendingHalfClose = false;
  /** Set once a final status has been output; later statuses are ignored. */
  private ended = false;
  /** The `service_url` value handed to the call credentials. */
  private serviceUrl: string;
  /** Metadata supplied to `start`; null until `start` is called. */
  private metadata: Metadata | null = null;
  /** Listener supplied to `start`; null until `start` is called. */
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call for a later pick.
   * @param callConfig Call configuration; supplies `pickInformation` and the optional `onCommitted` hook.
   * @param methodName Method path, expected in the form "/{serviceName}/{methodName}".
   * @param host Host passed to the child call; its host part is also used in the service URL.
   * @param credentials Call credentials used to generate additional metadata.
   * @param deadline Call deadline; when it is not `Infinity` a `grpc-timeout` metadata entry is set.
   * @param callNumber Identifier included in every trace line of this call.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Delivers the final status to the listener and to the pick result's
   * `onCallEnded` callback. Only the first invocation has any effect.
   *
   * @param status Final status of the call.
   * @param progress Progress tag attached to the status given to the listener.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace(
        'ended with status: code=' +
          status.code +
          ' details="' +
          status.details +
          '"'
      );
      const finalStatus = { ...status, progress };
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Asks the channel for a pick and acts on the result. Does nothing if the
   * call has already ended.
   *
   * - COMPLETE: generates credentials metadata, then creates the child call
   *   on the picked subchannel and replays any pending operations.
   * - DROP: ends the call with progress 'DROP'.
   * - TRANSIENT_FAILURE: queues the call for another pick if the metadata
   *   options request `waitForReady`, otherwise ends the call.
   * - QUEUE: queues the call for another pick.
   *
   * @throws Error if called before `start`.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    const pickResult = this.channel.doPick(
      this.metadata,
      this.callConfig.pickInformation
    );
    const subchannelString = pickResult.subchannel
      ? '(' +
        pickResult.subchannel.getChannelzRef().id +
        ') ' +
        pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials
          .generateMetadata({ service_url: this.serviceUrl })
          .then(
            credsMetadata => {
              /* The call may have ended (for example by cancellation) while
               * the credentials metadata was being generated. */
              if (this.ended) {
                this.trace('Credentials metadata generation finished after call ended');
                return;
              }
              const finalMetadata = this.metadata!.clone();
              finalMetadata.merge(credsMetadata);
              if (finalMetadata.get('authorization').length > 1) {
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      '"authorization" metadata cannot have multiple values',
                    metadata: new Metadata(),
                  },
                  'PROCESSED'
                );
                /* The call has just ended with an error. Stop here so that no
                 * stream is created with the rejected metadata and the
                 * onCommitted/onCallStarted hooks are not run for it. */
                return;
              }
              /* The subchannel state is re-checked because it may have
               * changed while the credentials metadata was being generated. */
              if (
                pickResult.subchannel!.getConnectivityState() !==
                ConnectivityState.READY
              ) {
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[
                      pickResult.subchannel!.getConnectivityState()
                    ] +
                    ' after getting credentials metadata. Retrying pick'
                );
                this.doPick();
                return;
              }

              if (this.deadline !== Infinity) {
                finalMetadata.set(
                  'grpc-timeout',
                  getDeadlineTimeoutString(this.deadline)
                );
              }
              try {
                this.child = pickResult
                  .subchannel!.getRealSubchannel()
                  .createCall(finalMetadata, this.host, this.methodName, {
                    onReceiveMetadata: metadata => {
                      this.trace('Received metadata');
                      this.listener!.onReceiveMetadata(metadata);
                    },
                    onReceiveMessage: message => {
                      this.trace('Received message');
                      this.listener!.onReceiveMessage(message);
                    },
                    onReceiveStatus: status => {
                      this.trace('Received status');
                      /* A stream reset with REFUSED_STREAM is reported with
                       * progress 'REFUSED'; everything else is 'PROCESSED'. */
                      if (
                        status.rstCode ===
                        http2.constants.NGHTTP2_REFUSED_STREAM
                      ) {
                        this.outputStatus(status, 'REFUSED');
                      } else {
                        this.outputStatus(status, 'PROCESSED');
                      }
                    },
                  });
              } catch (error) {
                this.trace(
                  'Failed to start call on picked subchannel ' +
                    subchannelString +
                    ' with error ' +
                    (error as Error).message
                );
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      'Failed to start HTTP/2 stream with error ' +
                      (error as Error).message,
                    metadata: new Metadata(),
                  },
                  'NOT_STARTED'
                );
                return;
              }
              this.callConfig.onCommitted?.();
              pickResult.onCallStarted?.();
              this.onCallEnded = pickResult.onCallEnded;
              this.trace(
                'Created child call [' + this.child.getCallNumber() + ']'
              );
              /* Replay the operations that were requested before the child
               * call existed. */
              if (this.readPending) {
                this.child.startRead();
              }
              if (this.pendingMessage) {
                this.child.sendMessageWithContext(
                  this.pendingMessage.context,
                  this.pendingMessage.message
                );
              }
              if (this.pendingHalfClose) {
                this.child.halfClose();
              }
            },
            (error: Error & { code: number }) => {
              // Fall back to UNKNOWN when the rejection carries no numeric code.
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              this.outputStatus(
                {
                  code: code,
                  details: details,
                  metadata: new Metadata(),
                },
                'PROCESSED'
              );
            }
          );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        // Deferred so the status is not delivered synchronously from doPick.
        setImmediate(() => {
          this.outputStatus(
            { code, details, metadata: pickResult.status!.metadata },
            'DROP'
          );
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          // Deferred so the status is not delivered synchronously from doPick.
          setImmediate(() => {
            this.outputStatus(
              { code, details, metadata: pickResult.status!.metadata },
              'PROCESSED'
            );
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
    }
  }

  /**
   * Cancels the child call, if one exists, and ends this call with the given
   * status and empty metadata.
   *
   * @param status Status code to end the call with.
   * @param details Human-readable details for the status.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.child?.cancelWithStatus(status, details);
    this.outputStatus(
      { code: status, details: details, metadata: new Metadata() },
      'PROCESSED'
    );
  }

  /**
   * @returns The child call's peer if a child call exists and reports one,
   * otherwise the channel target.
   */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Records the metadata and listener, then performs the first pick.
   *
   * @param metadata Metadata for the call.
   * @param listener Listener that receives metadata, messages, and the final status.
   */
  start(
    metadata: Metadata,
    listener: LoadBalancingCallInterceptingListener
  ): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends a message on the child call, or stores it to be sent once the
   * child call has been created. Only one message is stored; a later call
   * made before the child exists replaces the stored message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /** Starts a read on the child call, or records the request until the child call exists. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /** Half-closes the child call, or records the request until the child call exists. */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** @returns The call number given to the constructor. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
|
352 |
|
\ | No newline at end of file |