UNPKG

11.8 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from './call-credentials';
19import {
20 Call,
21 InterceptingListener,
22 MessageContext,
23 StatusObject,
24} from './call-interface';
25import { SubchannelCall } from './subchannel-call';
26import { ConnectivityState } from './connectivity-state';
27import { LogVerbosity, Status } from './constants';
28import { Deadline, getDeadlineTimeoutString } from './deadline';
29import { InternalChannel } from './internal-channel';
30import { Metadata } from './metadata';
31import { PickResultType } from './picker';
32import { CallConfig } from './resolver';
33import { splitHostPort } from './uri-parser';
34import * as logging from './logging';
35import { restrictControlPlaneStatusCode } from './control-plane-status';
36import * as http2 from 'http2';
37
38const TRACER_NAME = 'load_balancing_call';
39
40export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';
41
42export interface StatusObjectWithProgress extends StatusObject {
43 progress: RpcProgress;
44}
45
46export interface LoadBalancingCallInterceptingListener
47 extends InterceptingListener {
48 onReceiveStatus(status: StatusObjectWithProgress): void;
49}
50
51export class LoadBalancingCall implements Call {
52 private child: SubchannelCall | null = null;
53 private readPending = false;
54 private pendingMessage: { context: MessageContext; message: Buffer } | null =
55 null;
56 private pendingHalfClose = false;
57 private ended = false;
58 private serviceUrl: string;
59 private metadata: Metadata | null = null;
60 private listener: InterceptingListener | null = null;
61 private onCallEnded: ((statusCode: Status) => void) | null = null;
62 constructor(
63 private readonly channel: InternalChannel,
64 private readonly callConfig: CallConfig,
65 private readonly methodName: string,
66 private readonly host: string,
67 private readonly credentials: CallCredentials,
68 private readonly deadline: Deadline,
69 private readonly callNumber: number
70 ) {
71 const splitPath: string[] = this.methodName.split('/');
72 let serviceName = '';
73 /* The standard path format is "/{serviceName}/{methodName}", so if we split
74 * by '/', the first item should be empty and the second should be the
75 * service name */
76 if (splitPath.length >= 2) {
77 serviceName = splitPath[1];
78 }
79 const hostname = splitHostPort(this.host)?.host ?? 'localhost';
80 /* Currently, call credentials are only allowed on HTTPS connections, so we
81 * can assume that the scheme is "https" */
82 this.serviceUrl = `https://${hostname}/${serviceName}`;
83 }
84
85 private trace(text: string): void {
86 logging.trace(
87 LogVerbosity.DEBUG,
88 TRACER_NAME,
89 '[' + this.callNumber + '] ' + text
90 );
91 }
92
93 private outputStatus(status: StatusObject, progress: RpcProgress) {
94 if (!this.ended) {
95 this.ended = true;
96 this.trace(
97 'ended with status: code=' +
98 status.code +
99 ' details="' +
100 status.details +
101 '"'
102 );
103 const finalStatus = { ...status, progress };
104 this.listener?.onReceiveStatus(finalStatus);
105 this.onCallEnded?.(finalStatus.code);
106 }
107 }
108
109 doPick() {
110 if (this.ended) {
111 return;
112 }
113 if (!this.metadata) {
114 throw new Error('doPick called before start');
115 }
116 this.trace('Pick called');
117 const pickResult = this.channel.doPick(
118 this.metadata,
119 this.callConfig.pickInformation
120 );
121 const subchannelString = pickResult.subchannel
122 ? '(' +
123 pickResult.subchannel.getChannelzRef().id +
124 ') ' +
125 pickResult.subchannel.getAddress()
126 : '' + pickResult.subchannel;
127 this.trace(
128 'Pick result: ' +
129 PickResultType[pickResult.pickResultType] +
130 ' subchannel: ' +
131 subchannelString +
132 ' status: ' +
133 pickResult.status?.code +
134 ' ' +
135 pickResult.status?.details
136 );
137 switch (pickResult.pickResultType) {
138 case PickResultType.COMPLETE:
139 this.credentials
140 .generateMetadata({ service_url: this.serviceUrl })
141 .then(
142 credsMetadata => {
143 /* If this call was cancelled (e.g. by the deadline) before
144 * metadata generation finished, we shouldn't do anything with
145 * it. */
146 if (this.ended) {
147 this.trace('Credentials metadata generation finished after call ended');
148 return;
149 }
150 const finalMetadata = this.metadata!.clone();
151 finalMetadata.merge(credsMetadata);
152 if (finalMetadata.get('authorization').length > 1) {
153 this.outputStatus(
154 {
155 code: Status.INTERNAL,
156 details:
157 '"authorization" metadata cannot have multiple values',
158 metadata: new Metadata(),
159 },
160 'PROCESSED'
161 );
162 }
163 if (
164 pickResult.subchannel!.getConnectivityState() !==
165 ConnectivityState.READY
166 ) {
167 this.trace(
168 'Picked subchannel ' +
169 subchannelString +
170 ' has state ' +
171 ConnectivityState[
172 pickResult.subchannel!.getConnectivityState()
173 ] +
174 ' after getting credentials metadata. Retrying pick'
175 );
176 this.doPick();
177 return;
178 }
179
180 if (this.deadline !== Infinity) {
181 finalMetadata.set(
182 'grpc-timeout',
183 getDeadlineTimeoutString(this.deadline)
184 );
185 }
186 try {
187 this.child = pickResult
188 .subchannel!.getRealSubchannel()
189 .createCall(finalMetadata, this.host, this.methodName, {
190 onReceiveMetadata: metadata => {
191 this.trace('Received metadata');
192 this.listener!.onReceiveMetadata(metadata);
193 },
194 onReceiveMessage: message => {
195 this.trace('Received message');
196 this.listener!.onReceiveMessage(message);
197 },
198 onReceiveStatus: status => {
199 this.trace('Received status');
200 if (
201 status.rstCode ===
202 http2.constants.NGHTTP2_REFUSED_STREAM
203 ) {
204 this.outputStatus(status, 'REFUSED');
205 } else {
206 this.outputStatus(status, 'PROCESSED');
207 }
208 },
209 });
210 } catch (error) {
211 this.trace(
212 'Failed to start call on picked subchannel ' +
213 subchannelString +
214 ' with error ' +
215 (error as Error).message
216 );
217 this.outputStatus(
218 {
219 code: Status.INTERNAL,
220 details:
221 'Failed to start HTTP/2 stream with error ' +
222 (error as Error).message,
223 metadata: new Metadata(),
224 },
225 'NOT_STARTED'
226 );
227 return;
228 }
229 this.callConfig.onCommitted?.();
230 pickResult.onCallStarted?.();
231 this.onCallEnded = pickResult.onCallEnded;
232 this.trace(
233 'Created child call [' + this.child.getCallNumber() + ']'
234 );
235 if (this.readPending) {
236 this.child.startRead();
237 }
238 if (this.pendingMessage) {
239 this.child.sendMessageWithContext(
240 this.pendingMessage.context,
241 this.pendingMessage.message
242 );
243 }
244 if (this.pendingHalfClose) {
245 this.child.halfClose();
246 }
247 },
248 (error: Error & { code: number }) => {
249 // We assume the error code isn't 0 (Status.OK)
250 const { code, details } = restrictControlPlaneStatusCode(
251 typeof error.code === 'number' ? error.code : Status.UNKNOWN,
252 `Getting metadata from plugin failed with error: ${error.message}`
253 );
254 this.outputStatus(
255 {
256 code: code,
257 details: details,
258 metadata: new Metadata(),
259 },
260 'PROCESSED'
261 );
262 }
263 );
264 break;
265 case PickResultType.DROP:
266 const { code, details } = restrictControlPlaneStatusCode(
267 pickResult.status!.code,
268 pickResult.status!.details
269 );
270 setImmediate(() => {
271 this.outputStatus(
272 { code, details, metadata: pickResult.status!.metadata },
273 'DROP'
274 );
275 });
276 break;
277 case PickResultType.TRANSIENT_FAILURE:
278 if (this.metadata.getOptions().waitForReady) {
279 this.channel.queueCallForPick(this);
280 } else {
281 const { code, details } = restrictControlPlaneStatusCode(
282 pickResult.status!.code,
283 pickResult.status!.details
284 );
285 setImmediate(() => {
286 this.outputStatus(
287 { code, details, metadata: pickResult.status!.metadata },
288 'PROCESSED'
289 );
290 });
291 }
292 break;
293 case PickResultType.QUEUE:
294 this.channel.queueCallForPick(this);
295 }
296 }
297
298 cancelWithStatus(status: Status, details: string): void {
299 this.trace(
300 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
301 );
302 this.child?.cancelWithStatus(status, details);
303 this.outputStatus(
304 { code: status, details: details, metadata: new Metadata() },
305 'PROCESSED'
306 );
307 }
308 getPeer(): string {
309 return this.child?.getPeer() ?? this.channel.getTarget();
310 }
311 start(
312 metadata: Metadata,
313 listener: LoadBalancingCallInterceptingListener
314 ): void {
315 this.trace('start called');
316 this.listener = listener;
317 this.metadata = metadata;
318 this.doPick();
319 }
320 sendMessageWithContext(context: MessageContext, message: Buffer): void {
321 this.trace('write() called with message of length ' + message.length);
322 if (this.child) {
323 this.child.sendMessageWithContext(context, message);
324 } else {
325 this.pendingMessage = { context, message };
326 }
327 }
328 startRead(): void {
329 this.trace('startRead called');
330 if (this.child) {
331 this.child.startRead();
332 } else {
333 this.readPending = true;
334 }
335 }
336 halfClose(): void {
337 this.trace('halfClose called');
338 if (this.child) {
339 this.child.halfClose();
340 } else {
341 this.pendingHalfClose = true;
342 }
343 }
344 setCredentials(credentials: CallCredentials): void {
345 throw new Error('Method not implemented.');
346 }
347
348 getCallNumber(): number {
349 return this.callNumber;
350 }
351}
352
\No newline at end of file