1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from "./call-credentials";
|
19 | import { Call, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
|
20 | import { SubchannelCall } from "./subchannel-call";
|
21 | import { ConnectivityState } from "./connectivity-state";
|
22 | import { LogVerbosity, Status } from "./constants";
|
23 | import { Deadline, getDeadlineTimeoutString } from "./deadline";
|
24 | import { FilterStack, FilterStackFactory } from "./filter-stack";
|
25 | import { InternalChannel } from "./internal-channel";
|
26 | import { Metadata } from "./metadata";
|
27 | import { PickResultType } from "./picker";
|
28 | import { CallConfig } from "./resolver";
|
29 | import { splitHostPort } from "./uri-parser";
|
30 | import * as logging from './logging';
|
31 | import { restrictControlPlaneStatusCode } from "./control-plane-status";
|
32 | import * as http2 from 'http2';
|
33 |
|
/** Tracer name under which this module's trace output is logged. */
const TRACER_NAME = 'load_balancing_call';

/**
 * How far an RPC attempt got before it ended. As used in this file:
 * - `NOT_STARTED`: creating the call on the picked subchannel threw.
 * - `DROP`: the pick result was of type DROP.
 * - `REFUSED`: the stream ended with an HTTP/2 REFUSED_STREAM reset code.
 * - `PROCESSED`: every other way of ending.
 */
export type RpcProgress =
  | 'NOT_STARTED'
  | 'DROP'
  | 'REFUSED'
  | 'PROCESSED';

/** A call status annotated with the progress the attempt had made. */
export interface StatusObjectWithProgress extends StatusObject {
  progress: RpcProgress;
}

/**
 * Listener variant whose status callback receives the progress-annotated
 * status instead of a plain status object.
 */
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
|
45 |
|
/**
 * A call that asks the channel to pick a subchannel, attaches call
 * credentials metadata, and then forwards all operations to a child call
 * created on the picked subchannel.
 *
 * Operations requested before the child call exists (read, one message,
 * half-close) are remembered and replayed once the child is created.
 */
export class LoadBalancingCall implements Call {
  /** The call on the picked subchannel; null until a pick completes. */
  private child: SubchannelCall | null = null;
  /** True if startRead() was requested before the child call existed. */
  private readPending = false;
  /** Message passed to sendMessageWithContext() before the child call existed. */
  private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
  /** True if halfClose() was requested before the child call existed. */
  private pendingHalfClose = false;
  private pendingChildStatus: StatusObject | null = null;
  /** Set once a final status has been reported; guards against double reporting. */
  private ended = false;
  /** URL handed to the call credentials as `service_url`. */
  private serviceUrl: string;
  /** Metadata supplied to start(); null until start() is called. */
  private metadata: Metadata | null = null;
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call.
   * @param callConfig Per-call configuration (pick information, commit hook).
   * @param methodName Full method path, expected as "/{serviceName}/{methodName}".
   * @param host Host (optionally with port) the call is addressed to.
   * @param credentials Call credentials used to generate extra metadata.
   * @param deadline Call deadline; `Infinity` means no `grpc-timeout` is sent.
   * @param callNumber Identifier included in this call's trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host : string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* The standard path format is "/{serviceName}/{methodName}", so if we split
     * by '/', the first item should be empty and the second should be the
     * service name */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* Currently, call credentials are only allowed on HTTPS connections, so we
     * can assume that the scheme is "https" */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
  }

  /** Emits a debug trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Reports the final status to the listener and to the pick result's
   * onCallEnded callback. Only the first invocation has any effect.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
      const finalStatus = {...status, progress};
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Asks the channel for a pick and acts on the result: starts the child call
   * (COMPLETE), ends the call (DROP, or TRANSIENT_FAILURE without
   * waitForReady), or queues this call for a later pick (QUEUE, or
   * TRANSIENT_FAILURE with waitForReady).
   *
   * @throws Error if called before start() has supplied metadata.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called')
    const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
    const subchannelString = pickResult.subchannel ?
      '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
      '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
      PickResultType[pickResult.pickResultType] +
      ' subchannel: ' +
      subchannelString +
      ' status: ' +
      pickResult.status?.code +
      ' ' +
      pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE: {
        this.credentials.generateMetadata({service_url: this.serviceUrl}).then(
          (credsMetadata) => {
            /* Metadata generation is asynchronous, so the call may have ended
             * (e.g. been cancelled) in the meantime. Creating a child call now
             * would start a stream that nothing would ever cancel. */
            if (this.ended) {
              this.trace('Credentials metadata generation finished after call ended');
              return;
            }
            const finalMetadata = this.metadata!.clone();
            finalMetadata.merge(credsMetadata);
            if (finalMetadata.get('authorization').length > 1) {
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: '"authorization" metadata cannot have multiple values',
                  metadata: new Metadata()
                },
                'PROCESSED'
              );
              /* The call has ended; do not go on to start a child call. */
              return;
            }
            if (pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY) {
              this.trace(
                'Picked subchannel ' +
                subchannelString +
                ' has state ' +
                ConnectivityState[pickResult.subchannel!.getConnectivityState()] +
                ' after getting credentials metadata. Retrying pick'
              );
              this.doPick();
              return;
            }

            if (this.deadline !== Infinity) {
              finalMetadata.set('grpc-timeout', getDeadlineTimeoutString(this.deadline));
            }
            try {
              this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
                onReceiveMetadata: metadata => {
                  this.trace('Received metadata');
                  this.listener!.onReceiveMetadata(metadata);
                },
                onReceiveMessage: message => {
                  this.trace('Received message');
                  this.listener!.onReceiveMessage(message);
                },
                onReceiveStatus: status => {
                  this.trace('Received status');
                  if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
                    this.outputStatus(status, 'REFUSED');
                  } else {
                    this.outputStatus(status, 'PROCESSED');
                  }
                }
              });
            } catch (error) {
              this.trace(
                'Failed to start call on picked subchannel ' +
                subchannelString +
                ' with error ' +
                (error as Error).message
              );
              this.outputStatus(
                {
                  code: Status.INTERNAL,
                  details: 'Failed to start HTTP/2 stream with error ' + (error as Error).message,
                  metadata: new Metadata()
                },
                'NOT_STARTED'
              );
              return;
            }
            this.callConfig.onCommitted?.();
            pickResult.onCallStarted?.();
            this.onCallEnded = pickResult.onCallEnded;
            this.trace('Created child call [' + this.child.getCallNumber() + ']');
            /* Replay the operations that were requested before the child existed. */
            if (this.readPending) {
              this.child.startRead();
            }
            if (this.pendingMessage) {
              this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
            }
            if (this.pendingHalfClose) {
              this.child.halfClose();
            }
          }, (error: Error & { code: number }) => {
            const {code, details} = restrictControlPlaneStatusCode(
              typeof error.code === 'number' ? error.code : Status.UNKNOWN,
              `Getting metadata from plugin failed with error: ${error.message}`
            )
            this.outputStatus(
              {
                code: code,
                details: details,
                metadata: new Metadata()
              },
              'PROCESSED'
            );
          }
        );
        break;
      }
      case PickResultType.DROP: {
        const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
        this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'DROP');
        break;
      }
      case PickResultType.TRANSIENT_FAILURE: {
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
          this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'PROCESSED');
        }
        break;
      }
      case PickResultType.QUEUE: {
        this.channel.queueCallForPick(this);
        break;
      }
    }
  }

  /**
   * Cancels the child call (if one exists) and reports the given status as
   * this call's final status.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.child?.cancelWithStatus(status, details);
    this.outputStatus({code: status, details: details, metadata: new Metadata()}, 'PROCESSED');
  }

  /** Returns the child call's peer, or the channel target if there is no child yet. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /** Stores the metadata and listener, then performs the first pick. */
  start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends the message on the child call, or holds it until the child call is
   * created. Only one held message is kept; a later one replaces it.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = {context, message};
    }
  }

  /** Starts a read on the child call, or records the request until it exists. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /** Half-closes the child call, or records the request until it exists. */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error("Method not implemented.");
  }

  /** Returns the identifier supplied at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
\ | No newline at end of file |