1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import {
|
19 | Deadline,
|
20 | Call,
|
21 | Http2CallStream,
|
22 | CallStreamOptions,
|
23 | StatusObject,
|
24 | } from './call-stream';
|
25 | import { ChannelCredentials } from './channel-credentials';
|
26 | import { ChannelOptions } from './channel-options';
|
27 | import { ResolvingLoadBalancer } from './resolving-load-balancer';
|
28 | import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
|
29 | import { ChannelControlHelper } from './load-balancer';
|
30 | import { UnavailablePicker, Picker, PickResultType } from './picker';
|
31 | import { Metadata } from './metadata';
|
32 | import { Status, LogVerbosity, Propagate } from './constants';
|
33 | import { FilterStackFactory } from './filter-stack';
|
34 | import { CallCredentialsFilterFactory } from './call-credentials-filter';
|
35 | import { DeadlineFilterFactory } from './deadline-filter';
|
36 | import { CompressionFilterFactory } from './compression-filter';
|
37 | import {
|
38 | CallConfig,
|
39 | ConfigSelector,
|
40 | getDefaultAuthority,
|
41 | mapUriDefaultScheme,
|
42 | } from './resolver';
|
43 | import { trace, log } from './logging';
|
44 | import { SubchannelAddress } from './subchannel-address';
|
45 | import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
|
46 | import { mapProxyName } from './http_proxy';
|
47 | import { GrpcUri, parseUri, uriToString } from './uri-parser';
|
48 | import { ServerSurfaceCall } from './server-call';
|
49 | import { Filter } from './filter';
|
50 |
|
51 | import { ConnectivityState } from './connectivity-state';
|
52 | import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
|
53 | import { Subchannel } from './subchannel';
|
54 |
|
55 |
|
56 |
|
57 |
|
58 | const MAX_TIMEOUT_TIME = 2147483647;
|
59 |
|
60 | let nextCallNumber = 0;
|
61 |
|
62 | function getNewCallNumber(): number {
|
63 | const callNumber = nextCallNumber;
|
64 | nextCallNumber += 1;
|
65 | if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
|
66 | nextCallNumber = 0;
|
67 | }
|
68 | return callNumber;
|
69 | }
|
70 |
|
71 | const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [
|
72 | Status.OK,
|
73 | Status.INVALID_ARGUMENT,
|
74 | Status.NOT_FOUND,
|
75 | Status.ALREADY_EXISTS,
|
76 | Status.FAILED_PRECONDITION,
|
77 | Status.ABORTED,
|
78 | Status.OUT_OF_RANGE,
|
79 | Status.DATA_LOSS
|
80 | ]
|
81 |
|
82 | function restrictControlPlaneStatusCode(code: Status, details: string): {code: Status, details: string} {
|
83 | if (INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) {
|
84 | return {
|
85 | code: Status.INTERNAL,
|
86 | details: `Invalid status from control plane: ${code} ${Status[code]} ${details}`
|
87 | }
|
88 | } else {
|
89 | return {code, details};
|
90 | }
|
91 | }
|
92 |
|
93 |
|
94 |
|
95 |
|
96 |
|
97 | export interface Channel {
|
98 | |
99 |
|
100 |
|
101 |
|
102 | close(): void;
|
103 | |
104 |
|
105 |
|
106 | getTarget(): string;
|
107 | |
108 |
|
109 |
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
115 | getConnectivityState(tryToConnect: boolean): ConnectivityState;
|
116 | |
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 | watchConnectivityState(
|
127 | currentState: ConnectivityState,
|
128 | deadline: Date | number,
|
129 | callback: (error?: Error) => void
|
130 | ): void;
|
131 | |
132 |
|
133 |
|
134 |
|
135 |
|
136 | getChannelzRef(): ChannelRef;
|
137 | |
138 |
|
139 |
|
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 |
|
147 |
|
148 |
|
149 | createCall(
|
150 | method: string,
|
151 | deadline: Deadline,
|
152 | host: string | null | undefined,
|
153 | parentCall: ServerSurfaceCall | null,
|
154 | propagateFlags: number | null | undefined
|
155 | ): Call;
|
156 | }
|
157 |
|
158 | interface ConnectivityStateWatcher {
|
159 | currentState: ConnectivityState;
|
160 | timer: NodeJS.Timeout | null;
|
161 | callback: (error?: Error) => void;
|
162 | }
|
163 |
|
164 | export class ChannelImplementation implements Channel {
|
165 | private resolvingLoadBalancer: ResolvingLoadBalancer;
|
166 | private subchannelPool: SubchannelPool;
|
167 | private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
168 | private currentPicker: Picker = new UnavailablePicker();
|
169 | |
170 |
|
171 |
|
172 |
|
173 | private configSelectionQueue: Array<{
|
174 | callStream: Http2CallStream;
|
175 | callMetadata: Metadata;
|
176 | }> = [];
|
177 | private pickQueue: Array<{
|
178 | callStream: Http2CallStream;
|
179 | callMetadata: Metadata;
|
180 | callConfig: CallConfig;
|
181 | dynamicFilters: Filter[];
|
182 | }> = [];
|
183 | private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
184 | private defaultAuthority: string;
|
185 | private filterStackFactory: FilterStackFactory;
|
186 | private target: GrpcUri;
|
187 | |
188 |
|
189 |
|
190 |
|
191 |
|
192 |
|
193 |
|
194 | private callRefTimer: NodeJS.Timer;
|
195 | private configSelector: ConfigSelector | null = null;
|
196 | |
197 |
|
198 |
|
199 |
|
200 |
|
201 |
|
202 |
|
203 | private currentResolutionError: StatusObject | null = null;
|
204 |
|
205 |
|
206 | private readonly channelzEnabled: boolean = true;
|
207 | private originalTarget: string;
|
208 | private channelzRef: ChannelRef;
|
209 | private channelzTrace: ChannelzTrace;
|
210 | private callTracker = new ChannelzCallTracker();
|
211 | private childrenTracker = new ChannelzChildrenTracker();
|
212 |
|
213 | constructor(
|
214 | target: string,
|
215 | private readonly credentials: ChannelCredentials,
|
216 | private readonly options: ChannelOptions
|
217 | ) {
|
218 | if (typeof target !== 'string') {
|
219 | throw new TypeError('Channel target must be a string');
|
220 | }
|
221 | if (!(credentials instanceof ChannelCredentials)) {
|
222 | throw new TypeError(
|
223 | 'Channel credentials must be a ChannelCredentials object'
|
224 | );
|
225 | }
|
226 | if (options) {
|
227 | if (typeof options !== 'object') {
|
228 | throw new TypeError('Channel options must be an object');
|
229 | }
|
230 | }
|
231 | this.originalTarget = target;
|
232 | const originalTargetUri = parseUri(target);
|
233 | if (originalTargetUri === null) {
|
234 | throw new Error(`Could not parse target name "${target}"`);
|
235 | }
|
236 | |
237 |
|
238 | const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
|
239 | if (defaultSchemeMapResult === null) {
|
240 | throw new Error(
|
241 | `Could not find a default scheme for target name "${target}"`
|
242 | );
|
243 | }
|
244 |
|
245 | this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
|
246 | this.callRefTimer.unref?.();
|
247 |
|
248 | if (this.options['grpc.enable_channelz'] === 0) {
|
249 | this.channelzEnabled = false;
|
250 | }
|
251 |
|
252 | this.channelzTrace = new ChannelzTrace();
|
253 | this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
|
254 | if (this.channelzEnabled) {
|
255 | this.channelzTrace.addTrace('CT_INFO', 'Channel created');
|
256 | }
|
257 |
|
258 | if (this.options['grpc.default_authority']) {
|
259 | this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
260 | } else {
|
261 | this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
|
262 | }
|
263 | const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
|
264 | this.target = proxyMapResult.target;
|
265 | this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
|
266 |
|
267 | |
268 |
|
269 | this.subchannelPool = getSubchannelPool(
|
270 | (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
|
271 | );
|
272 | const channelControlHelper: ChannelControlHelper = {
|
273 | createSubchannel: (
|
274 | subchannelAddress: SubchannelAddress,
|
275 | subchannelArgs: ChannelOptions
|
276 | ) => {
|
277 | const subchannel = this.subchannelPool.getOrCreateSubchannel(
|
278 | this.target,
|
279 | subchannelAddress,
|
280 | Object.assign({}, this.options, subchannelArgs),
|
281 | this.credentials
|
282 | );
|
283 | if (this.channelzEnabled) {
|
284 | this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
|
285 | }
|
286 | return subchannel;
|
287 | },
|
288 | updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
289 | this.currentPicker = picker;
|
290 | const queueCopy = this.pickQueue.slice();
|
291 | this.pickQueue = [];
|
292 | this.callRefTimerUnref();
|
293 | for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
|
294 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
295 | }
|
296 | this.updateState(connectivityState);
|
297 | },
|
298 | requestReresolution: () => {
|
299 |
|
300 | throw new Error(
|
301 | 'Resolving load balancer should never call requestReresolution'
|
302 | );
|
303 | },
|
304 | addChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
305 | if (this.channelzEnabled) {
|
306 | this.childrenTracker.refChild(child);
|
307 | }
|
308 | },
|
309 | removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
310 | if (this.channelzEnabled) {
|
311 | this.childrenTracker.unrefChild(child);
|
312 | }
|
313 | }
|
314 | };
|
315 | this.resolvingLoadBalancer = new ResolvingLoadBalancer(
|
316 | this.target,
|
317 | channelControlHelper,
|
318 | options,
|
319 | (configSelector) => {
|
320 | if (this.channelzEnabled) {
|
321 | this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
|
322 | }
|
323 | this.configSelector = configSelector;
|
324 | this.currentResolutionError = null;
|
325 | |
326 |
|
327 | process.nextTick(() => {
|
328 | const localQueue = this.configSelectionQueue;
|
329 | this.configSelectionQueue = [];
|
330 | this.callRefTimerUnref();
|
331 | for (const { callStream, callMetadata } of localQueue) {
|
332 | this.tryGetConfig(callStream, callMetadata);
|
333 | }
|
334 | this.configSelectionQueue = [];
|
335 | });
|
336 | },
|
337 | (status) => {
|
338 | if (this.channelzEnabled) {
|
339 | this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
|
340 | }
|
341 | if (this.configSelectionQueue.length > 0) {
|
342 | this.trace('Name resolution failed with calls queued for config selection');
|
343 | }
|
344 | if (this.configSelector === null) {
|
345 | this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata};
|
346 | }
|
347 | const localQueue = this.configSelectionQueue;
|
348 | this.configSelectionQueue = [];
|
349 | this.callRefTimerUnref();
|
350 | for (const { callStream, callMetadata } of localQueue) {
|
351 | if (callMetadata.getOptions().waitForReady) {
|
352 | this.callRefTimerRef();
|
353 | this.configSelectionQueue.push({ callStream, callMetadata });
|
354 | } else {
|
355 | callStream.cancelWithStatus(status.code, status.details);
|
356 | }
|
357 | }
|
358 | }
|
359 | );
|
360 | this.filterStackFactory = new FilterStackFactory([
|
361 | new CallCredentialsFilterFactory(this),
|
362 | new DeadlineFilterFactory(this),
|
363 | new MaxMessageSizeFilterFactory(this.options),
|
364 | new CompressionFilterFactory(this, this.options),
|
365 | ]);
|
366 | this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
|
367 | const error = new Error();
|
368 | trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
|
369 | }
|
370 |
|
371 | private getChannelzInfo(): ChannelInfo {
|
372 | return {
|
373 | target: this.originalTarget,
|
374 | state: this.connectivityState,
|
375 | trace: this.channelzTrace,
|
376 | callTracker: this.callTracker,
|
377 | children: this.childrenTracker.getChildLists()
|
378 | };
|
379 | }
|
380 |
|
381 | private trace(text: string, verbosityOverride?: LogVerbosity) {
|
382 | trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text);
|
383 | }
|
384 |
|
385 | private callRefTimerRef() {
|
386 |
|
387 | if (!this.callRefTimer.hasRef?.()) {
|
388 | this.trace(
|
389 | 'callRefTimer.ref | configSelectionQueue.length=' +
|
390 | this.configSelectionQueue.length +
|
391 | ' pickQueue.length=' +
|
392 | this.pickQueue.length
|
393 | );
|
394 | this.callRefTimer.ref?.();
|
395 | }
|
396 | }
|
397 |
|
398 | private callRefTimerUnref() {
|
399 |
|
400 | if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
|
401 | this.trace(
|
402 | 'callRefTimer.unref | configSelectionQueue.length=' +
|
403 | this.configSelectionQueue.length +
|
404 | ' pickQueue.length=' +
|
405 | this.pickQueue.length
|
406 | );
|
407 | this.callRefTimer.unref?.();
|
408 | }
|
409 | }
|
410 |
|
411 | private pushPick(
|
412 | callStream: Http2CallStream,
|
413 | callMetadata: Metadata,
|
414 | callConfig: CallConfig,
|
415 | dynamicFilters: Filter[]
|
416 | ) {
|
417 | this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
|
418 | this.callRefTimerRef();
|
419 | }
|
420 |
|
421 | |
422 |
|
423 |
|
424 |
|
425 |
|
426 |
|
427 |
|
428 | private tryPick(
|
429 | callStream: Http2CallStream,
|
430 | callMetadata: Metadata,
|
431 | callConfig: CallConfig,
|
432 | dynamicFilters: Filter[]
|
433 | ) {
|
434 | const pickResult = this.currentPicker.pick({
|
435 | metadata: callMetadata,
|
436 | extraPickInfo: callConfig.pickInformation,
|
437 | });
|
438 | const subchannelString = pickResult.subchannel ?
|
439 | '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
|
440 | '' + pickResult.subchannel;
|
441 | this.trace(
|
442 | 'Pick result for call [' +
|
443 | callStream.getCallNumber() +
|
444 | ']: ' +
|
445 | PickResultType[pickResult.pickResultType] +
|
446 | ' subchannel: ' +
|
447 | subchannelString +
|
448 | ' status: ' +
|
449 | pickResult.status?.code +
|
450 | ' ' +
|
451 | pickResult.status?.details
|
452 | );
|
453 | switch (pickResult.pickResultType) {
|
454 | case PickResultType.COMPLETE:
|
455 | if (pickResult.subchannel === null) {
|
456 | callStream.cancelWithStatus(
|
457 | Status.UNAVAILABLE,
|
458 | 'Request dropped by load balancing policy'
|
459 | );
|
460 |
|
461 | } else {
|
462 | |
463 |
|
464 |
|
465 | if (
|
466 | pickResult.subchannel!.getConnectivityState() !==
|
467 | ConnectivityState.READY
|
468 | ) {
|
469 | log(
|
470 | LogVerbosity.ERROR,
|
471 | 'Error: COMPLETE pick result subchannel ' +
|
472 | subchannelString +
|
473 | ' has state ' +
|
474 | ConnectivityState[pickResult.subchannel!.getConnectivityState()]
|
475 | );
|
476 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
477 | break;
|
478 | }
|
479 | |
480 |
|
481 |
|
482 | callStream.filterStack
|
483 | .sendMetadata(Promise.resolve(callMetadata.clone()))
|
484 | .then(
|
485 | (finalMetadata) => {
|
486 | const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
|
487 | if (subchannelState === ConnectivityState.READY) {
|
488 | try {
|
489 | const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
|
490 | pickResult.subchannel?.getRealSubchannel().startCallStream(
|
491 | finalMetadata,
|
492 | callStream,
|
493 | [...dynamicFilters, ...pickExtraFilters]
|
494 | );
|
495 | |
496 |
|
497 | callConfig.onCommitted?.();
|
498 | pickResult.onCallStarted?.();
|
499 | } catch (error) {
|
500 | const errorCode = (error as NodeJS.ErrnoException).code;
|
501 | if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
|
502 | errorCode === 'ERR_HTTP2_INVALID_SESSION'
|
503 | ) {
|
504 | |
505 |
|
506 |
|
507 |
|
508 |
|
509 |
|
510 |
|
511 |
|
512 |
|
513 |
|
514 |
|
515 |
|
516 |
|
517 | this.trace(
|
518 | 'Failed to start call on picked subchannel ' +
|
519 | subchannelString +
|
520 | ' with error ' +
|
521 | (error as Error).message +
|
522 | '. Retrying pick',
|
523 | LogVerbosity.INFO
|
524 | );
|
525 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
526 | } else {
|
527 | this.trace(
|
528 | 'Failed to start call on picked subchanel ' +
|
529 | subchannelString +
|
530 | ' with error ' +
|
531 | (error as Error).message +
|
532 | '. Ending call',
|
533 | LogVerbosity.INFO
|
534 | );
|
535 | callStream.cancelWithStatus(
|
536 | Status.INTERNAL,
|
537 | `Failed to start HTTP/2 stream with error: ${
|
538 | (error as Error).message
|
539 | }`
|
540 | );
|
541 | }
|
542 | }
|
543 | } else {
|
544 | |
545 |
|
546 | this.trace(
|
547 | 'Picked subchannel ' +
|
548 | subchannelString +
|
549 | ' has state ' +
|
550 | ConnectivityState[subchannelState] +
|
551 | ' after metadata filters. Retrying pick',
|
552 | LogVerbosity.INFO
|
553 | );
|
554 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
555 | }
|
556 | },
|
557 | (error: Error & { code: number }) => {
|
558 |
|
559 | const {code, details} = restrictControlPlaneStatusCode(
|
560 | typeof error.code === 'number' ? error.code : Status.UNKNOWN,
|
561 | `Getting metadata from plugin failed with error: ${error.message}`
|
562 | )
|
563 | callStream.cancelWithStatus(code, details);
|
564 | }
|
565 | );
|
566 | }
|
567 | break;
|
568 | case PickResultType.QUEUE:
|
569 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
570 | break;
|
571 | case PickResultType.TRANSIENT_FAILURE:
|
572 | if (callMetadata.getOptions().waitForReady) {
|
573 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
574 | } else {
|
575 | const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
576 | callStream.cancelWithStatus(code, details);
|
577 | }
|
578 | break;
|
579 | case PickResultType.DROP:
|
580 | const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
581 | callStream.cancelWithStatus(code, details);
|
582 | break;
|
583 | default:
|
584 | throw new Error(
|
585 | `Invalid state: unknown pickResultType ${pickResult.pickResultType}`
|
586 | );
|
587 | }
|
588 | }
|
589 |
|
590 | private removeConnectivityStateWatcher(
|
591 | watcherObject: ConnectivityStateWatcher
|
592 | ) {
|
593 | const watcherIndex = this.connectivityStateWatchers.findIndex(
|
594 | (value) => value === watcherObject
|
595 | );
|
596 | if (watcherIndex >= 0) {
|
597 | this.connectivityStateWatchers.splice(watcherIndex, 1);
|
598 | }
|
599 | }
|
600 |
|
601 | private updateState(newState: ConnectivityState): void {
|
602 | trace(
|
603 | LogVerbosity.DEBUG,
|
604 | 'connectivity_state',
|
605 | '(' + this.channelzRef.id + ') ' +
|
606 | uriToString(this.target) +
|
607 | ' ' +
|
608 | ConnectivityState[this.connectivityState] +
|
609 | ' -> ' +
|
610 | ConnectivityState[newState]
|
611 | );
|
612 | if (this.channelzEnabled) {
|
613 | this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
614 | }
|
615 | this.connectivityState = newState;
|
616 | const watchersCopy = this.connectivityStateWatchers.slice();
|
617 | for (const watcherObject of watchersCopy) {
|
618 | if (newState !== watcherObject.currentState) {
|
619 | if (watcherObject.timer) {
|
620 | clearTimeout(watcherObject.timer);
|
621 | }
|
622 | this.removeConnectivityStateWatcher(watcherObject);
|
623 | watcherObject.callback();
|
624 | }
|
625 | }
|
626 | if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
|
627 | this.currentResolutionError = null;
|
628 | }
|
629 | }
|
630 |
|
631 | private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
|
632 | if (stream.getStatus() !== null) {
|
633 | |
634 |
|
635 | return;
|
636 | }
|
637 | if (this.configSelector === null) {
|
638 | |
639 |
|
640 |
|
641 |
|
642 | this.resolvingLoadBalancer.exitIdle();
|
643 | if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
|
644 | stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
|
645 | } else {
|
646 | this.configSelectionQueue.push({
|
647 | callStream: stream,
|
648 | callMetadata: metadata,
|
649 | });
|
650 | this.callRefTimerRef();
|
651 | }
|
652 | } else {
|
653 | const callConfig = this.configSelector(stream.getMethod(), metadata);
|
654 | if (callConfig.status === Status.OK) {
|
655 | if (callConfig.methodConfig.timeout) {
|
656 | const deadline = new Date();
|
657 | deadline.setSeconds(
|
658 | deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
|
659 | );
|
660 | deadline.setMilliseconds(
|
661 | deadline.getMilliseconds() +
|
662 | callConfig.methodConfig.timeout.nanos / 1_000_000
|
663 | );
|
664 | stream.setConfigDeadline(deadline);
|
665 |
|
666 | stream.filterStack.refresh();
|
667 | }
|
668 | if (callConfig.dynamicFilterFactories.length > 0) {
|
669 | |
670 |
|
671 |
|
672 |
|
673 |
|
674 |
|
675 |
|
676 |
|
677 |
|
678 |
|
679 |
|
680 |
|
681 | const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
|
682 | const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
|
683 | dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
|
684 | this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
|
685 | });
|
686 | } else {
|
687 | this.tryPick(stream, metadata, callConfig, []);
|
688 | }
|
689 | } else {
|
690 | const {code, details} = restrictControlPlaneStatusCode(callConfig.status, 'Failed to route call to method ' + stream.getMethod());
|
691 | stream.cancelWithStatus(code, details);
|
692 | }
|
693 | }
|
694 | }
|
695 |
|
696 | _startCallStream(stream: Http2CallStream, metadata: Metadata) {
|
697 | this.tryGetConfig(stream, metadata.clone());
|
698 | }
|
699 |
|
700 | close() {
|
701 | this.resolvingLoadBalancer.destroy();
|
702 | this.updateState(ConnectivityState.SHUTDOWN);
|
703 | clearInterval(this.callRefTimer);
|
704 | if (this.channelzEnabled) {
|
705 | unregisterChannelzRef(this.channelzRef);
|
706 | }
|
707 |
|
708 | this.subchannelPool.unrefUnusedSubchannels();
|
709 | }
|
710 |
|
711 | getTarget() {
|
712 | return uriToString(this.target);
|
713 | }
|
714 |
|
715 | getConnectivityState(tryToConnect: boolean) {
|
716 | const connectivityState = this.connectivityState;
|
717 | if (tryToConnect) {
|
718 | this.resolvingLoadBalancer.exitIdle();
|
719 | }
|
720 | return connectivityState;
|
721 | }
|
722 |
|
723 | watchConnectivityState(
|
724 | currentState: ConnectivityState,
|
725 | deadline: Date | number,
|
726 | callback: (error?: Error) => void
|
727 | ): void {
|
728 | if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
729 | throw new Error('Channel has been shut down');
|
730 | }
|
731 | let timer = null;
|
732 | if (deadline !== Infinity) {
|
733 | const deadlineDate: Date =
|
734 | deadline instanceof Date ? deadline : new Date(deadline);
|
735 | const now = new Date();
|
736 | if (deadline === -Infinity || deadlineDate <= now) {
|
737 | process.nextTick(
|
738 | callback,
|
739 | new Error('Deadline passed without connectivity state change')
|
740 | );
|
741 | return;
|
742 | }
|
743 | timer = setTimeout(() => {
|
744 | this.removeConnectivityStateWatcher(watcherObject);
|
745 | callback(
|
746 | new Error('Deadline passed without connectivity state change')
|
747 | );
|
748 | }, deadlineDate.getTime() - now.getTime());
|
749 | }
|
750 | const watcherObject = {
|
751 | currentState,
|
752 | callback,
|
753 | timer,
|
754 | };
|
755 | this.connectivityStateWatchers.push(watcherObject);
|
756 | }
|
757 |
|
758 | |
759 |
|
760 |
|
761 |
|
762 |
|
763 | getChannelzRef() {
|
764 | return this.channelzRef;
|
765 | }
|
766 |
|
767 | createCall(
|
768 | method: string,
|
769 | deadline: Deadline,
|
770 | host: string | null | undefined,
|
771 | parentCall: ServerSurfaceCall | null,
|
772 | propagateFlags: number | null | undefined
|
773 | ): Call {
|
774 | if (typeof method !== 'string') {
|
775 | throw new TypeError('Channel#createCall: method must be a string');
|
776 | }
|
777 | if (!(typeof deadline === 'number' || deadline instanceof Date)) {
|
778 | throw new TypeError(
|
779 | 'Channel#createCall: deadline must be a number or Date'
|
780 | );
|
781 | }
|
782 | if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
783 | throw new Error('Channel has been shut down');
|
784 | }
|
785 | const callNumber = getNewCallNumber();
|
786 | this.trace(
|
787 | 'createCall [' +
|
788 | callNumber +
|
789 | '] method="' +
|
790 | method +
|
791 | '", deadline=' +
|
792 | deadline
|
793 | );
|
794 | const finalOptions: CallStreamOptions = {
|
795 | deadline: deadline,
|
796 | flags: propagateFlags ?? Propagate.DEFAULTS,
|
797 | host: host ?? this.defaultAuthority,
|
798 | parentCall: parentCall,
|
799 | };
|
800 | const stream: Http2CallStream = new Http2CallStream(
|
801 | method,
|
802 | this,
|
803 | finalOptions,
|
804 | this.filterStackFactory,
|
805 | this.credentials._getCallCredentials(),
|
806 | callNumber
|
807 | );
|
808 | if (this.channelzEnabled) {
|
809 | this.callTracker.addCallStarted();
|
810 | stream.addStatusWatcher(status => {
|
811 | if (status.code === Status.OK) {
|
812 | this.callTracker.addCallSucceeded();
|
813 | } else {
|
814 | this.callTracker.addCallFailed();
|
815 | }
|
816 | });
|
817 | }
|
818 | return stream;
|
819 | }
|
820 | }
|