UNPKG

30.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 Deadline,
20 Call,
21 Http2CallStream,
22 CallStreamOptions,
23 StatusObject,
24} from './call-stream';
25import { ChannelCredentials } from './channel-credentials';
26import { ChannelOptions } from './channel-options';
27import { ResolvingLoadBalancer } from './resolving-load-balancer';
28import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
29import { ChannelControlHelper } from './load-balancer';
30import { UnavailablePicker, Picker, PickResultType } from './picker';
31import { Metadata } from './metadata';
32import { Status, LogVerbosity, Propagate } from './constants';
33import { FilterStackFactory } from './filter-stack';
34import { CallCredentialsFilterFactory } from './call-credentials-filter';
35import { DeadlineFilterFactory } from './deadline-filter';
36import { CompressionFilterFactory } from './compression-filter';
37import {
38 CallConfig,
39 ConfigSelector,
40 getDefaultAuthority,
41 mapUriDefaultScheme,
42} from './resolver';
43import { trace, log } from './logging';
44import { SubchannelAddress } from './subchannel-address';
45import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
46import { mapProxyName } from './http_proxy';
47import { GrpcUri, parseUri, uriToString } from './uri-parser';
48import { ServerSurfaceCall } from './server-call';
49import { Filter } from './filter';
50
51import { ConnectivityState } from './connectivity-state';
52import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
53import { Subchannel } from './subchannel';
54
/**
 * Delay, in milliseconds, given to the channel's keep-alive interval timer.
 * The value is 2^31 - 1; see
 * https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2 ** 31 - 1;
59
/** Next call number to hand out; module-level, so shared by all channels. */
let nextCallNumber = 0;

/**
 * Returns the current call number and advances the module-level counter.
 * The counter wraps back to 0 once it reaches Number.MAX_SAFE_INTEGER.
 */
function getNewCallNumber(): number {
  const issued = nextCallNumber++;
  if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
    nextCallNumber = 0;
  }
  return issued;
}
70
/**
 * A communication channel to the server identified by a given address.
 */
export interface Channel {
  /**
   * Closes the channel. Provides the same functionality as the existing
   * grpc.Client.prototype.close.
   */
  close(): void;

  /**
   * Returns the target that this channel connects to.
   */
  getTarget(): string;

  /**
   * Returns the channel's current connectivity state. The method lives here
   * mainly because the existing internal Channel class has it and there is
   * no better place for it.
   * @param tryToConnect When true, an idle channel starts connecting.
   *     Otherwise an idle channel only starts connecting when a call starts.
   */
  getConnectivityState(tryToConnect: boolean): ConnectivityState;

  /**
   * Watches for a connectivity state change. Also present mainly because the
   * existing external Channel class has it.
   * @param currentState The state to watch for transitions away from. It
   *     should always be obtained by calling getConnectivityState
   *     immediately beforehand.
   * @param deadline How long to wait for a state change
   * @param callback Called with no error when the state changes, or with an
   *     error if the deadline passes without a state change.
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void;

  /**
   * Returns the channelz reference object for this channel. Requesting the
   * id in this object from the channelz service yields information about
   * this channel.
   */
  getChannelzRef(): ChannelRef;

  /**
   * Creates a call object. Call is an opaque type used by the Client class.
   * The gRPC library calls this function when starting a request.
   * Implementers should return a Call instance obtained by calling
   * createCall on an instance of the provided Channel class.
   * @param method The full method string to request.
   * @param deadline The call deadline
   * @param host A host string override for making the request
   * @param parentCall A server call to propagate some information from
   * @param propagateFlags A bitwise combination of elements of
   *     grpc.propagate indicating what to propagate from parentCall.
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call;
}
135
/**
 * Bookkeeping record for one pending watchConnectivityState request.
 */
interface ConnectivityStateWatcher {
  /** State the caller last observed; the watcher fires on any other state. */
  currentState: ConnectivityState;
  /** Deadline timer for this watcher, or null when no timer was set. */
  timer: NodeJS.Timeout | null;
  /** Invoked without arguments on a state change, or with an Error. */
  callback: (error?: Error) => void;
}
141
142export class ChannelImplementation implements Channel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
 * Calls waiting for a call config. Entries are only added while no
 * ConfigSelector has been received from the resolver yet.
 */
private configSelectionQueue: {
  callStream: Http2CallStream;
  callMetadata: Metadata;
}[] = [];
/**
 * Calls that already have a call config and are waiting for the picker to
 * assign them to a subchannel.
 */
private pickQueue: {
  callStream: Http2CallStream;
  callMetadata: Metadata;
  callConfig: CallConfig;
  dynamicFilters: Filter[];
}[] = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
private filterStackFactory: FilterStackFactory;
private target: GrpcUri;
/**
 * This timer does not do anything on its own. Its purpose is to hold the
 * event loop open while the channel has pending calls that have not yet
 * been assigned to specific subchannels. It is ref'd whenever a call is
 * added to configSelectionQueue or pickQueue, and an unref is requested
 * whenever one of those queues is flushed.
 */
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
/**
 * The error from the name resolver if it failed most recently. It is only
 * used to end calls that start while there is no config selector and the
 * name resolver is in backoff, so it is nulled when configSelector becomes
 * set or the channel state becomes anything other than TRANSIENT_FAILURE.
 */
private currentResolutionError: StatusObject | null = null;

// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();
190
/**
 * Validates the arguments, maps the target to a resolvable URI (default
 * scheme, then proxy mapping), registers the channel with channelz, and
 * creates the resolving load balancer and the filter stack factory.
 * @param target The target string this channel connects to
 * @param credentials Credentials used for subchannels and calls
 * @param options Channel options
 * @throws TypeError if an argument has the wrong type
 * @throws Error if the target cannot be parsed or no default scheme applies
 */
constructor(
  target: string,
  private readonly credentials: ChannelCredentials,
  private readonly options: ChannelOptions
) {
  if (typeof target !== 'string') {
    throw new TypeError('Channel target must be a string');
  }
  if (!(credentials instanceof ChannelCredentials)) {
    throw new TypeError(
      'Channel credentials must be a ChannelCredentials object'
    );
  }
  if (options) {
    if (typeof options !== 'object') {
      throw new TypeError('Channel options must be an object');
    }
  }
  this.originalTarget = target;
  const originalTargetUri = parseUri(target);
  if (originalTargetUri === null) {
    throw new Error(`Could not parse target name "${target}"`);
  }
  /* This ensures that the target has a scheme that is registered with the
   * resolver */
  const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
  if (defaultSchemeMapResult === null) {
    throw new Error(
      `Could not find a default scheme for target name "${target}"`
    );
  }

  /* The interval callback is intentionally empty: the timer only exists to
   * be ref'd/unref'd. It starts out unref'd because no calls are pending. */
  this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
  this.callRefTimer.unref?.();

  if (this.options['grpc.enable_channelz'] === 0) {
    this.channelzEnabled = false;
  }

  this.channelzTrace = new ChannelzTrace();
  this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', 'Channel created');
  }

  if (this.options['grpc.default_authority']) {
    this.defaultAuthority = this.options['grpc.default_authority'] as string;
  } else {
    this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
  }
  const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
  this.target = proxyMapResult.target;
  this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);

  /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
   * the grpc.use_local_subchannel_pool channel option means. */
  this.subchannelPool = getSubchannelPool(
    (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
  );
  const channelControlHelper: ChannelControlHelper = {
    createSubchannel: (
      subchannelAddress: SubchannelAddress,
      subchannelArgs: ChannelOptions
    ) => {
      const subchannel = this.subchannelPool.getOrCreateSubchannel(
        this.target,
        subchannelAddress,
        Object.assign({}, this.options, subchannelArgs),
        this.credentials
      );
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
      }
      return subchannel;
    },
    updateState: (connectivityState: ConnectivityState, picker: Picker) => {
      this.currentPicker = picker;
      /* Swap the queue out before retrying so that picks which get queued
       * again land in a fresh array rather than the one being iterated. */
      const queueCopy = this.pickQueue.slice();
      this.pickQueue = [];
      this.callRefTimerUnref();
      for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
        this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
      }
      this.updateState(connectivityState);
    },
    requestReresolution: () => {
      // This should never be called.
      throw new Error(
        'Resolving load balancer should never call requestReresolution'
      );
    },
    addChannelzChild: (child: ChannelRef | SubchannelRef) => {
      if (this.channelzEnabled) {
        this.childrenTracker.refChild(child);
      }
    },
    removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
      if (this.channelzEnabled) {
        this.childrenTracker.unrefChild(child);
      }
    }
  };
  this.resolvingLoadBalancer = new ResolvingLoadBalancer(
    this.target,
    channelControlHelper,
    options,
    // Called when name resolution succeeds and yields a ConfigSelector.
    (configSelector) => {
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
      }
      this.configSelector = configSelector;
      this.currentResolutionError = null;
      /* We process the queue asynchronously to ensure that the corresponding
       * load balancer update has completed. */
      process.nextTick(() => {
        const localQueue = this.configSelectionQueue;
        this.configSelectionQueue = [];
        this.callRefTimerUnref();
        for (const { callStream, callMetadata } of localQueue) {
          this.tryGetConfig(callStream, callMetadata);
        }
        /* The queue is deliberately NOT reset again here: it was already
         * swapped out above, and clearing it a second time would silently
         * drop any call that was queued while the loop ran. */
      });
    },
    // Called when name resolution fails with the given status.
    (status) => {
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
      }
      if (this.configSelectionQueue.length > 0) {
        this.trace('Name resolution failed with calls queued for config selection');
      }
      if (this.configSelector === null) {
        this.currentResolutionError = status;
      }
      const localQueue = this.configSelectionQueue;
      this.configSelectionQueue = [];
      this.callRefTimerUnref();
      for (const { callStream, callMetadata } of localQueue) {
        if (callMetadata.getOptions().waitForReady) {
          // waitForReady calls stay queued until resolution succeeds.
          this.callRefTimerRef();
          this.configSelectionQueue.push({ callStream, callMetadata });
        } else {
          callStream.cancelWithStatus(status.code, status.details);
        }
      }
    }
  );
  this.filterStackFactory = new FilterStackFactory([
    new CallCredentialsFilterFactory(this),
    new DeadlineFilterFactory(this),
    new MaxMessageSizeFilterFactory(this.options),
    new CompressionFilterFactory(this, this.options),
  ]);
  this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
  // Capture a stack trace (minus its first line) to log where the channel was created.
  const error = new Error();
  trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
}
348
/**
 * Assembles the ChannelInfo record for this channel from its current
 * target, connectivity state, trace, call tracker and child lists.
 */
private getChannelzInfo(): ChannelInfo {
  const children = this.childrenTracker.getChildLists();
  const info: ChannelInfo = {
    target: this.originalTarget,
    state: this.connectivityState,
    trace: this.channelzTrace,
    callTracker: this.callTracker,
    children,
  };
  return info;
}
358
/**
 * Emits a 'channel' trace line prefixed with this channel's channelz id
 * and target URI.
 * @param text The message to log
 * @param verbosityOverride Verbosity to log at; defaults to DEBUG
 */
private trace(text: string, verbosityOverride?: LogVerbosity) {
  const verbosity = verbosityOverride ?? LogVerbosity.DEBUG;
  const prefix = `(${this.channelzRef.id}) ${uriToString(this.target)}`;
  trace(verbosity, 'channel', `${prefix} ${text}`);
}
362
/**
 * Refs callRefTimer unless the timer reports that it is already ref'd.
 * Logs the current queue lengths when it does so.
 */
private callRefTimerRef() {
  // If the hasRef function does not exist, always run the code
  const alreadyReffed = this.callRefTimer.hasRef?.();
  if (alreadyReffed) {
    return;
  }
  const queueSizes =
    `configSelectionQueue.length=${this.configSelectionQueue.length}` +
    ` pickQueue.length=${this.pickQueue.length}`;
  this.trace(`callRefTimer.ref | ${queueSizes}`);
  this.callRefTimer.ref?.();
}
375
/**
 * Unrefs callRefTimer so that it no longer holds the event loop open.
 *
 * Callers invoke this right after flushing ONE of the two call queues. The
 * other queue may still contain pending calls at that moment (for example a
 * new resolution result arriving while calls wait in pickQueue). In that
 * case the timer must stay ref'd, so this is a no-op until both queues are
 * empty.
 */
private callRefTimerUnref() {
  if (this.configSelectionQueue.length > 0 || this.pickQueue.length > 0) {
    // Calls are still queued: keep holding the event loop open for them.
    return;
  }
  // If the hasRef function does not exist, always run the code
  if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
    this.trace(
      'callRefTimer.unref | configSelectionQueue.length=' +
        this.configSelectionQueue.length +
        ' pickQueue.length=' +
        this.pickQueue.length
    );
    this.callRefTimer.unref?.();
  }
}
388
/**
 * Appends a call to pickQueue and then refs callRefTimer.
 * @param callStream The call being queued
 * @param callMetadata Metadata associated with the call
 * @param callConfig Call config to reuse when the pick is retried
 * @param dynamicFilters Filters to reuse when the pick is retried
 */
private pushPick(
  callStream: Http2CallStream,
  callMetadata: Metadata,
  callConfig: CallConfig,
  dynamicFilters: Filter[]
) {
  const queuedPick = { callStream, callMetadata, callConfig, dynamicFilters };
  this.pickQueue.push(queuedPick);
  this.callRefTimerRef();
}
398
/**
 * Check the picker output for the given call and corresponding metadata,
 * and take any relevant actions. Should not be called while iterating
 * over pickQueue.
 *
 * Depending on the pick result the call is started on the picked
 * subchannel, queued again via pushPick, or cancelled with a status.
 * @param callStream The call to route
 * @param callMetadata Metadata passed to the picker and sent with the call
 * @param callConfig Call config selected for this call
 * @param dynamicFilters Filters passed to the subchannel along with the
 *     pick result's extra filters when the call starts
 * @throws Error if the picker returns an unknown pickResultType
 */
private tryPick(
  callStream: Http2CallStream,
  callMetadata: Metadata,
  callConfig: CallConfig,
  dynamicFilters: Filter[]
) {
  const pickResult = this.currentPicker.pick({
    metadata: callMetadata,
    extraPickInfo: callConfig.pickInformation,
  });
  /* Read the subchannel once so that a single null check narrows it for
   * the rest of this function, including the promise callbacks below. */
  const subchannel = pickResult.subchannel;
  const subchannelString = subchannel ?
    '(' + subchannel.getChannelzRef().id + ') ' + subchannel.getAddress() :
    '' + subchannel;
  this.trace(
    'Pick result for call [' +
      callStream.getCallNumber() +
      ']: ' +
      PickResultType[pickResult.pickResultType] +
      ' subchannel: ' +
      subchannelString +
      ' status: ' +
      pickResult.status?.code +
      ' ' +
      pickResult.status?.details
  );
  switch (pickResult.pickResultType) {
    case PickResultType.COMPLETE:
      if (subchannel === null) {
        // End the call with an error
        callStream.cancelWithStatus(
          Status.UNAVAILABLE,
          'Request dropped by load balancing policy'
        );
      } else {
        /* If the subchannel is not in the READY state, that indicates a bug
         * somewhere in the load balancer or picker. So, we log an error and
         * queue the pick to be tried again later. */
        if (subchannel.getConnectivityState() !== ConnectivityState.READY) {
          log(
            LogVerbosity.ERROR,
            'Error: COMPLETE pick result subchannel ' +
              subchannelString +
              ' has state ' +
              ConnectivityState[subchannel.getConnectivityState()]
          );
          this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
          break;
        }
        /* We need to clone the callMetadata here because the transparent
         * retry code in the promise resolution handler use the same
         * callMetadata object, so it needs to stay unmodified */
        callStream.filterStack
          .sendMetadata(Promise.resolve(callMetadata.clone()))
          .then(
            (finalMetadata) => {
              const subchannelState: ConnectivityState = subchannel.getConnectivityState();
              if (subchannelState === ConnectivityState.READY) {
                try {
                  const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
                  subchannel.getRealSubchannel().startCallStream(
                    finalMetadata,
                    callStream,
                    [...dynamicFilters, ...pickExtraFilters]
                  );
                  /* If we reach this point, the call stream has started
                   * successfully */
                  callConfig.onCommitted?.();
                  pickResult.onCallStarted?.();
                } catch (error) {
                  const errorCode = (error as NodeJS.ErrnoException).code;
                  if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
                    errorCode === 'ERR_HTTP2_INVALID_SESSION'
                  ) {
                    /* An error here indicates that something went wrong with
                     * the picked subchannel's http2 stream right before we
                     * tried to start the stream. We are handling a promise
                     * result here, so this is asynchronous with respect to the
                     * original tryPick call, so calling it again is not
                     * recursive. We call tryPick immediately instead of
                     * queueing this pick again because handling the queue is
                     * triggered by state changes, and we want to immediately
                     * check if the state has already changed since the
                     * previous tryPick call. We do this instead of cancelling
                     * the stream because the correct behavior may be
                     * re-queueing instead, based on the logic in the rest of
                     * tryPick */
                    this.trace(
                      'Failed to start call on picked subchannel ' +
                        subchannelString +
                        ' with error ' +
                        (error as Error).message +
                        '. Retrying pick',
                      LogVerbosity.INFO
                    );
                    this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                  } else {
                    this.trace(
                      'Failed to start call on picked subchannel ' +
                        subchannelString +
                        ' with error ' +
                        (error as Error).message +
                        '. Ending call',
                      LogVerbosity.INFO
                    );
                    callStream.cancelWithStatus(
                      Status.INTERNAL,
                      `Failed to start HTTP/2 stream with error: ${
                        (error as Error).message
                      }`
                    );
                  }
                }
              } else {
                /* The logic for doing this here is the same as in the catch
                 * block above */
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[subchannelState] +
                    ' after metadata filters. Retrying pick',
                  LogVerbosity.INFO
                );
                this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
              }
            },
            (error: Error & { code: number }) => {
              // We assume the error code isn't 0 (Status.OK)
              callStream.cancelWithStatus(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
            }
          );
      }
      break;
    case PickResultType.QUEUE:
      this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
      break;
    case PickResultType.TRANSIENT_FAILURE:
      // waitForReady calls wait for a later picker instead of failing.
      if (callMetadata.getOptions().waitForReady) {
        this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
      } else {
        callStream.cancelWithStatus(
          pickResult.status!.code,
          pickResult.status!.details
        );
      }
      break;
    case PickResultType.DROP:
      callStream.cancelWithStatus(
        pickResult.status!.code,
        pickResult.status!.details
      );
      break;
    default:
      throw new Error(
        `Invalid state: unknown pickResultType ${pickResult.pickResultType}`
      );
  }
}
570
/**
 * Removes the given watcher object from connectivityStateWatchers. Does
 * nothing if the object is not in the list.
 * @param watcherObject The exact watcher instance to remove
 */
private removeConnectivityStateWatcher(
  watcherObject: ConnectivityStateWatcher
) {
  const position = this.connectivityStateWatchers.indexOf(watcherObject);
  if (position < 0) {
    return;
  }
  this.connectivityStateWatchers.splice(position, 1);
}
581
/**
 * Records a new connectivity state: logs the transition, stores the state,
 * notifies every watcher registered for a different state, and clears the
 * stored resolution error unless the new state is TRANSIENT_FAILURE.
 * @param newState The state the channel is moving to
 */
private updateState(newState: ConnectivityState): void {
  const transition =
    ConnectivityState[this.connectivityState] +
    ' -> ' +
    ConnectivityState[newState];
  trace(
    LogVerbosity.DEBUG,
    'connectivity_state',
    `(${this.channelzRef.id}) ${uriToString(this.target)} ${transition}`
  );
  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', transition);
  }
  this.connectivityState = newState;
  // Iterate over a snapshot because notified watchers are removed as we go.
  const snapshot = [...this.connectivityStateWatchers];
  for (const watcher of snapshot) {
    if (watcher.currentState === newState) {
      continue;
    }
    if (watcher.timer) {
      clearTimeout(watcher.timer);
    }
    this.removeConnectivityStateWatcher(watcher);
    watcher.callback();
  }
  if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
    this.currentResolutionError = null;
  }
}
611
/**
 * Obtains the call config for a call and hands the call on to tryPick.
 *
 * While no config selector is available the call is either queued in
 * configSelectionQueue or, if name resolution most recently failed and the
 * call is not waitForReady, cancelled with the resolution error. Otherwise
 * the config selector is invoked; on an OK result the config deadline and
 * dynamic filters are applied before picking, and on any other result the
 * call is cancelled with that status.
 * @param stream The call to configure
 * @param metadata The metadata for the call
 */
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
  if (stream.getStatus() !== null) {
    /* If the stream has a status, it has already finished and we don't need
     * to take any more actions on it. */
    return;
  }
  if (this.configSelector === null) {
    /* This branch will only be taken at the beginning of the channel's life,
     * before the resolver ever returns a result. So, the
     * ResolvingLoadBalancer may be idle and if so it needs to be kicked
     * because it now has a pending request. */
    this.resolvingLoadBalancer.exitIdle();
    if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
      stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
    } else {
      this.configSelectionQueue.push({
        callStream: stream,
        callMetadata: metadata,
      });
      this.callRefTimerRef();
    }
  } else {
    const callConfig = this.configSelector(stream.getMethod(), metadata);
    if (callConfig.status === Status.OK) {
      const timeout = callConfig.methodConfig.timeout;
      if (timeout) {
        /* Compute the deadline from epoch milliseconds. Adding to the
         * local-time seconds/milliseconds fields instead can yield a
         * deadline that is off by the DST shift when the interval crosses
         * a daylight-saving transition. */
        const deadline = new Date(
          Date.now() + timeout.seconds * 1000 + timeout.nanos / 1_000_000
        );
        stream.setConfigDeadline(deadline);
        // Refreshing the filters makes the deadline filter pick up the new deadline
        stream.filterStack.refresh();
      }
      if (callConfig.dynamicFilterFactories.length > 0) {
        /* These dynamicFilters are the mechanism for implementing gRFC A39:
         * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
         * We run them here instead of with the rest of the filters because
         * that spec says "the xDS HTTP filters will run in between name
         * resolution and load balancing".
         *
         * We use the filter stack here to simplify the multi-filter async
         * waterfall logic, but we pass along the underlying list of filters
         * to avoid having nested filter stacks when combining it with the
         * original filter stack. We do not pass along the original filter
         * factory list because these filters may need to persist data
         * between sending headers and other operations. */
        const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
        const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
        dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(
          (filteredMetadata) => {
            this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
          },
          (error: Error & { code: number }) => {
            /* Without this handler a rejecting filter would produce an
             * unhandled rejection and leave the call pending forever.
             * We assume the error code isn't 0 (Status.OK) */
            stream.cancelWithStatus(
              typeof error.code === 'number' ? error.code : Status.UNKNOWN,
              `Dynamic filter metadata processing failed with error: ${error.message}`
            );
          }
        );
      } else {
        this.tryPick(stream, metadata, callConfig, []);
      }
    } else {
      stream.cancelWithStatus(
        callConfig.status,
        'Failed to route call to method ' + stream.getMethod()
      );
    }
  }
}
678
/**
 * Begins routing a call through this channel. A clone of the metadata is
 * passed on, so config selection does not receive the caller's own object.
 * @param stream The call to start
 * @param metadata The call's metadata
 */
_startCallStream(stream: Http2CallStream, metadata: Metadata) {
  const metadataCopy = metadata.clone();
  this.tryGetConfig(stream, metadataCopy);
}
682
/**
 * Shuts the channel down: destroys the resolving load balancer, moves the
 * channel to SHUTDOWN, clears the call ref timer, unregisters the channel
 * from channelz when enabled, and asks the subchannel pool to unref
 * subchannels that are no longer used.
 */
close() {
  const { resolvingLoadBalancer, subchannelPool } = this;
  resolvingLoadBalancer.destroy();
  this.updateState(ConnectivityState.SHUTDOWN);
  clearInterval(this.callRefTimer);
  if (this.channelzEnabled) {
    unregisterChannelzRef(this.channelzRef);
  }

  subchannelPool.unrefUnusedSubchannels();
}
693
/**
 * Returns the channel's target URI rendered as a string.
 */
getTarget() {
  const targetString = uriToString(this.target);
  return targetString;
}
697
/**
 * Returns the connectivity state as it was when this method was called.
 * @param tryToConnect When true, the resolving load balancer is also asked
 *     to exit idle; the returned value is read before that request.
 */
getConnectivityState(tryToConnect: boolean) {
  const stateAtCall = this.connectivityState;
  if (!tryToConnect) {
    return stateAtCall;
  }
  this.resolvingLoadBalancer.exitIdle();
  return stateAtCall;
}
705
/**
 * Registers a one-shot watcher that fires when the channel's connectivity
 * state becomes different from currentState, or when the deadline passes.
 * @param currentState The state the caller last observed
 * @param deadline Absolute deadline as a Date or epoch milliseconds.
 *     Infinity means no deadline.
 * @param callback Called with no argument on a state change, or with an
 *     Error if the deadline passes first. A deadline that is already in the
 *     past reports the error on the next tick and registers no watcher.
 * @throws Error if the channel has been shut down
 */
watchConnectivityState(
  currentState: ConnectivityState,
  deadline: Date | number,
  callback: (error?: Error) => void
): void {
  if (this.connectivityState === ConnectivityState.SHUTDOWN) {
    throw new Error('Channel has been shut down');
  }
  const watcherObject: ConnectivityStateWatcher = {
    currentState,
    callback,
    timer: null,
  };
  if (deadline !== Infinity) {
    const deadlineDate: Date =
      deadline instanceof Date ? deadline : new Date(deadline);
    const now = new Date();
    if (deadline === -Infinity || deadlineDate <= now) {
      process.nextTick(
        callback,
        new Error('Deadline passed without connectivity state change')
      );
      return;
    }
    const deadlineTime = deadlineDate.getTime();
    /* Node treats a timer delay above MAX_TIMEOUT_TIME as 1 ms, which would
     * fail a far-future deadline almost immediately. Wait in chunks of at
     * most MAX_TIMEOUT_TIME instead, always storing the live timer on the
     * watcher so that updateState can still clear it. */
    const armTimer = (): void => {
      const remaining = deadlineTime - Date.now();
      if (remaining > MAX_TIMEOUT_TIME) {
        watcherObject.timer = setTimeout(armTimer, MAX_TIMEOUT_TIME);
        return;
      }
      watcherObject.timer = setTimeout(() => {
        this.removeConnectivityStateWatcher(watcherObject);
        callback(
          new Error('Deadline passed without connectivity state change')
        );
      }, remaining);
    };
    armTimer();
  }
  this.connectivityStateWatchers.push(watcherObject);
}
740
/**
 * Returns the channelz reference object for this channel. The value is
 * garbage if channelz is disabled for this channel.
 * @returns The reference stored when the channel was registered
 */
getChannelzRef() {
  const { channelzRef } = this;
  return channelzRef;
}
749
/**
 * Creates a new call stream on this channel.
 * @param method The full method string to request
 * @param deadline The call deadline, as a number or a Date
 * @param host Host override; the channel's default authority is used when
 *     this is null or undefined
 * @param parentCall A server call to propagate information from
 * @param propagateFlags Propagation flags; Propagate.DEFAULTS is used when
 *     this is null or undefined
 * @returns The newly created call
 * @throws TypeError if method is not a string or deadline is neither a
 *     number nor a Date
 * @throws Error if the channel has been shut down
 */
createCall(
  method: string,
  deadline: Deadline,
  host: string | null | undefined,
  parentCall: ServerSurfaceCall | null,
  propagateFlags: number | null | undefined
): Call {
  if (typeof method !== 'string') {
    throw new TypeError('Channel#createCall: method must be a string');
  }
  const deadlineIsValid =
    typeof deadline === 'number' || deadline instanceof Date;
  if (!deadlineIsValid) {
    throw new TypeError(
      'Channel#createCall: deadline must be a number or Date'
    );
  }
  if (this.connectivityState === ConnectivityState.SHUTDOWN) {
    throw new Error('Channel has been shut down');
  }
  const callNumber = getNewCallNumber();
  this.trace(
    `createCall [${callNumber}] method="${method}", deadline=${deadline}`
  );
  const finalOptions: CallStreamOptions = {
    deadline,
    flags: propagateFlags ?? Propagate.DEFAULTS,
    host: host ?? this.defaultAuthority,
    parentCall,
  };
  const stream: Http2CallStream = new Http2CallStream(
    method,
    this,
    finalOptions,
    this.filterStackFactory,
    this.credentials._getCallCredentials(),
    callNumber
  );
  if (!this.channelzEnabled) {
    return stream;
  }
  // Channelz bookkeeping: count the call now and its outcome once known.
  this.callTracker.addCallStarted();
  stream.addStatusWatcher((status) => {
    if (status.code === Status.OK) {
      this.callTracker.addCallSucceeded();
      return;
    }
    this.callTracker.addCallFailed();
  });
  return stream;
}
803}