UNPKG

25.5 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelCredentials } from './channel-credentials';
19import { ChannelOptions } from './channel-options';
20import { ResolvingLoadBalancer } from './resolving-load-balancer';
21import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
22import { ChannelControlHelper } from './load-balancer';
23import { UnavailablePicker, Picker, QueuePicker } from './picker';
24import { Metadata } from './metadata';
25import { Status, LogVerbosity, Propagate } from './constants';
26import { FilterStackFactory } from './filter-stack';
27import { CompressionFilterFactory } from './compression-filter';
28import {
29 CallConfig,
30 ConfigSelector,
31 getDefaultAuthority,
32 mapUriDefaultScheme,
33} from './resolver';
34import { trace } from './logging';
35import { SubchannelAddress } from './subchannel-address';
36import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
37import { mapProxyName } from './http_proxy';
38import { GrpcUri, parseUri, uriToString } from './uri-parser';
39import { ServerSurfaceCall } from './server-call';
40
41import { ConnectivityState } from './connectivity-state';
42import {
43 ChannelInfo,
44 ChannelRef,
45 ChannelzCallTracker,
46 ChannelzChildrenTracker,
47 ChannelzTrace,
48 registerChannelzChannel,
49 SubchannelRef,
50 unregisterChannelzRef,
51} from './channelz';
52import { LoadBalancingCall } from './load-balancing-call';
53import { CallCredentials } from './call-credentials';
54import { Call, CallStreamOptions, StatusObject } from './call-interface';
55import { Deadline, deadlineToString } from './deadline';
56import { ResolvingCall } from './resolving-call';
57import { getNextCallNumber } from './call-number';
58import { restrictControlPlaneStatusCode } from './control-plane-status';
59import {
60 MessageBufferTracker,
61 RetryingCall,
62 RetryThrottler,
63} from './retrying-call';
64import {
65 BaseSubchannelWrapper,
66 ConnectivityStateListener,
67 SubchannelInterface,
68} from './subchannel-interface';
69
/**
 * Largest delay value used for timers in this file (2^31 - 1 milliseconds).
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 0x7fffffff;

/** Lower bound, in milliseconds, applied to the configured idle timeout. */
const MIN_IDLE_TIMEOUT_MS = 1000;

/** Idle timeout, in milliseconds, used when none is configured: 30 minutes. */
const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000;
79
/**
 * Bookkeeping record for one pending connectivity-state watch request.
 */
interface ConnectivityStateWatcher {
  /**
   * Called with no argument when the channel state differs from
   * `currentState`, or with an Error when the deadline timer fires first.
   */
  callback: (error?: Error) => void;
  /** The state the watcher is waiting for the channel to leave. */
  currentState: ConnectivityState;
  /** Deadline timer for this watcher, or null if no timer was armed. */
  timer: NodeJS.Timeout | null;
}
85
/** No config selector is available yet and no resolution error is recorded. */
interface NoneConfigResult {
  type: 'NONE';
}

/** The config selector produced a call config. */
interface SuccessConfigResult {
  type: 'SUCCESS';
  config: CallConfig;
}

/** There is no config selector and a resolution error is recorded. */
interface ErrorConfigResult {
  type: 'ERROR';
  error: StatusObject;
}

/**
 * Result of asking the channel for a call config, discriminated by `type`.
 */
type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;
104
/**
 * Retry throttlers shared at module level, keyed by the channel target
 * string.
 */
const RETRY_THROTTLER_MAP = new Map<string, RetryThrottler>();

/** Default total retry buffer size: 16 MB. */
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 16 * 1024 * 1024;
/** Default per-RPC retry buffer size: 1 MB. */
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1024 * 1024;
109
/**
 * Channel-owned wrapper around a subchannel. It forwards the keepalive time
 * reported by the child's connectivity state listener to the owning channel,
 * and counts its own references so it can detach from the child and from the
 * channel once it is no longer referenced.
 */
class ChannelSubchannelWrapper
  extends BaseSubchannelWrapper
  implements SubchannelInterface
{
  private refCount = 0;
  private subchannelStateListener: ConnectivityStateListener;

  constructor(
    childSubchannel: SubchannelInterface,
    private channel: InternalChannel
  ) {
    super(childSubchannel);
    // Only the keepalive time matters here; the other arguments are ignored.
    const forwardKeepaliveTime: ConnectivityStateListener = (
      _subchannel,
      _previousState,
      _newState,
      keepaliveTime
    ) => {
      channel.throttleKeepalive(keepaliveTime);
    };
    this.subchannelStateListener = forwardKeepaliveTime;
    childSubchannel.addConnectivityStateListener(forwardKeepaliveTime);
  }

  /** Adds a reference to both the child subchannel and this wrapper. */
  ref(): void {
    this.child.ref();
    this.refCount++;
  }

  /**
   * Drops a reference from the child and from this wrapper. When the
   * wrapper's own count reaches zero (or below), the listener is removed
   * from the child and the wrapper is removed from the channel.
   */
  unref(): void {
    this.child.unref();
    this.refCount--;
    if (this.refCount > 0) {
      return;
    }
    this.child.removeConnectivityStateListener(this.subchannelStateListener);
    this.channel.removeWrappedSubchannel(this);
  }
}
146
/**
 * Client channel implementation. It owns the resolving load balancer, queues
 * calls that are waiting for a call config or for a pick, tracks the
 * channel's connectivity state and its watchers, and moves the channel to
 * IDLE after a configurable period without call activity.
 */
export class InternalChannel {
  private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
  private readonly subchannelPool: SubchannelPool;
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  private currentPicker: Picker = new UnavailablePicker();
  /**
   * Calls queued up to get a call config. Should only be populated before the
   * first time the resolver returns a result, which includes the ConfigSelector.
   */
  private configSelectionQueue: ResolvingCall[] = [];
  /** Calls waiting for the next picker update before they can be picked. */
  private pickQueue: LoadBalancingCall[] = [];
  private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
  private readonly defaultAuthority: string;
  private readonly filterStackFactory: FilterStackFactory;
  private readonly target: GrpcUri;
  /**
   * This timer does not do anything on its own. Its purpose is to hold the
   * event loop open while there are pending calls for the channel that have
   * not yet been assigned to specific subchannels. It is reffed whenever a
   * call is added to configSelectionQueue or pickQueue, and unreffed when one
   * of those queues is drained.
   */
  private readonly callRefTimer: NodeJS.Timeout;
  private configSelector: ConfigSelector | null = null;
  /**
   * This is the error from the name resolver if it failed most recently. It
   * is only used to end calls that start while there is no config selector
   * and the name resolver is in backoff, so it should be nulled if
   * configSelector becomes set or the channel state becomes anything other
   * than TRANSIENT_FAILURE.
   */
  private currentResolutionError: StatusObject | null = null;
  private readonly retryBufferTracker: MessageBufferTracker;
  /** Largest keepalive time seen so far; applied to every subchannel. */
  private keepaliveTime: number;
  private readonly wrappedSubchannels: Set<ChannelSubchannelWrapper> =
    new Set();

  /** Number of calls created by this channel that have not ended yet. */
  private callCount = 0;
  private idleTimer: NodeJS.Timeout | null = null;
  private readonly idleTimeoutMs: number;
  private lastActivityTimestamp: Date;

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private readonly originalTarget: string;
  private readonly channelzRef: ChannelRef;
  private readonly channelzTrace: ChannelzTrace;
  private readonly callTracker = new ChannelzCallTracker();
  private readonly childrenTracker = new ChannelzChildrenTracker();

  /**
   * Randomly generated ID to be passed to the config selector, for use by
   * ring_hash in xDS. An integer distributed approximately uniformly between
   * 0 and MAX_SAFE_INTEGER.
   */
  private readonly randomChannelId = Math.floor(
    Math.random() * Number.MAX_SAFE_INTEGER
  );

  /**
   * @param target Target name; must parse as a URI and map to a scheme that
   *     has a default.
   * @param credentials Channel credentials used for subchannels and calls.
   * @param options Channel options.
   * @throws TypeError if an argument has the wrong type.
   * @throws Error if the target cannot be parsed or has no default scheme.
   */
  constructor(
    target: string,
    private readonly credentials: ChannelCredentials,
    private readonly options: ChannelOptions
  ) {
    if (typeof target !== 'string') {
      throw new TypeError('Channel target must be a string');
    }
    if (!(credentials instanceof ChannelCredentials)) {
      throw new TypeError(
        'Channel credentials must be a ChannelCredentials object'
      );
    }
    if (options) {
      if (typeof options !== 'object') {
        throw new TypeError('Channel options must be an object');
      }
    }
    this.originalTarget = target;
    const originalTargetUri = parseUri(target);
    if (originalTargetUri === null) {
      throw new Error(`Could not parse target name "${target}"`);
    }
    /* This ensures that the target has a scheme that is registered with the
     * resolver */
    const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
    if (defaultSchemeMapResult === null) {
      throw new Error(
        `Could not find a default scheme for target name "${target}"`
      );
    }

    // Created unreffed: the timer only holds the event loop open while calls
    // are queued (see callRefTimerRef/callRefTimerUnref).
    this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
    this.callRefTimer.unref?.();

    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }

    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzChannel(
      target,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Channel created');
    }

    if (this.options['grpc.default_authority']) {
      this.defaultAuthority = this.options['grpc.default_authority'] as string;
    } else {
      this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
    }
    const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
    this.target = proxyMapResult.target;
    this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);

    /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
     * the grpc.use_local_subchannel_pool channel option means. */
    this.subchannelPool = getSubchannelPool(
      (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
    );
    this.retryBufferTracker = new MessageBufferTracker(
      options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
      options['grpc.per_rpc_retry_buffer_size'] ??
        DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
    );
    this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
    this.idleTimeoutMs = Math.max(
      options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS,
      MIN_IDLE_TIMEOUT_MS
    );
    const channelControlHelper: ChannelControlHelper = {
      createSubchannel: (
        subchannelAddress: SubchannelAddress,
        subchannelArgs: ChannelOptions
      ) => {
        const subchannel = this.subchannelPool.getOrCreateSubchannel(
          this.target,
          subchannelAddress,
          Object.assign({}, this.options, subchannelArgs),
          this.credentials
        );
        subchannel.throttleKeepalive(this.keepaliveTime);
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace(
            'CT_INFO',
            'Created subchannel or used existing subchannel',
            subchannel.getChannelzRef()
          );
        }
        const wrappedSubchannel = new ChannelSubchannelWrapper(
          subchannel,
          this
        );
        this.wrappedSubchannels.add(wrappedSubchannel);
        return wrappedSubchannel;
      },
      updateState: (connectivityState: ConnectivityState, picker: Picker) => {
        this.currentPicker = picker;
        // Swap the queue out before re-picking so that calls the new picker
        // queues again land in a fresh pickQueue.
        const queueCopy = this.pickQueue.slice();
        this.pickQueue = [];
        if (queueCopy.length > 0) {
          this.callRefTimerUnref();
        }
        for (const call of queueCopy) {
          call.doPick();
        }
        this.updateState(connectivityState);
      },
      requestReresolution: () => {
        // This should never be called.
        throw new Error(
          'Resolving load balancer should never call requestReresolution'
        );
      },
      addChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.refChild(child);
        }
      },
      removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.unrefChild(child);
        }
      },
    };
    this.resolvingLoadBalancer = new ResolvingLoadBalancer(
      this.target,
      channelControlHelper,
      options,
      (serviceConfig, configSelector) => {
        if (serviceConfig.retryThrottling) {
          RETRY_THROTTLER_MAP.set(
            this.getTarget(),
            new RetryThrottler(
              serviceConfig.retryThrottling.maxTokens,
              serviceConfig.retryThrottling.tokenRatio,
              RETRY_THROTTLER_MAP.get(this.getTarget())
            )
          );
        } else {
          RETRY_THROTTLER_MAP.delete(this.getTarget());
        }
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace(
            'CT_INFO',
            'Address resolution succeeded'
          );
        }
        this.configSelector = configSelector;
        this.currentResolutionError = null;
        /* We process the queue asynchronously to ensure that the corresponding
         * load balancer update has completed. */
        process.nextTick(() => {
          const localQueue = this.configSelectionQueue;
          this.configSelectionQueue = [];
          if (localQueue.length > 0) {
            this.callRefTimerUnref();
          }
          for (const call of localQueue) {
            call.getConfig();
          }
        });
      },
      status => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace(
            'CT_WARNING',
            'Address resolution failed with code ' +
              status.code +
              ' and details "' +
              status.details +
              '"'
          );
        }
        if (this.configSelectionQueue.length > 0) {
          this.trace(
            'Name resolution failed with calls queued for config selection'
          );
        }
        if (this.configSelector === null) {
          this.currentResolutionError = {
            ...restrictControlPlaneStatusCode(status.code, status.details),
            metadata: status.metadata,
          };
        }
        const localQueue = this.configSelectionQueue;
        this.configSelectionQueue = [];
        if (localQueue.length > 0) {
          this.callRefTimerUnref();
        }
        for (const call of localQueue) {
          call.reportResolverError(status);
        }
      }
    );
    this.filterStackFactory = new FilterStackFactory([
      new MaxMessageSizeFilterFactory(this.options),
      new CompressionFilterFactory(this, this.options),
    ]);
    this.trace(
      'Channel constructed with options ' +
        JSON.stringify(options, undefined, 2)
    );
    // The Error is created only to capture the construction stack trace; its
    // first line (the message line) is stripped before logging.
    const error = new Error();
    trace(
      LogVerbosity.DEBUG,
      'channel_stacktrace',
      '(' +
        this.channelzRef.id +
        ') ' +
        'Channel constructed \n' +
        error.stack?.substring(error.stack.indexOf('\n') + 1)
    );
    this.lastActivityTimestamp = new Date();
  }

  /** Builds the snapshot handed to channelz for this channel. */
  private getChannelzInfo(): ChannelInfo {
    return {
      target: this.originalTarget,
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
    };
  }

  /**
   * Logs `text` under the 'channel' tracer, prefixed with the channelz ID and
   * the target URI.
   * @param verbosityOverride Log verbosity; defaults to DEBUG.
   */
  private trace(text: string, verbosityOverride?: LogVerbosity) {
    trace(
      verbosityOverride ?? LogVerbosity.DEBUG,
      'channel',
      '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text
    );
  }

  /** Refs callRefTimer (if not already reffed) so it holds the event loop open. */
  private callRefTimerRef() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef?.()) {
      this.trace(
        'callRefTimer.ref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.ref?.();
    }
  }

  /** Unrefs callRefTimer (if currently reffed) so it no longer holds the event loop open. */
  private callRefTimerUnref() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
      this.trace(
        'callRefTimer.unref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.unref?.();
    }
  }

  /** Removes `watcherObject` from the watcher list if it is still present. */
  private removeConnectivityStateWatcher(
    watcherObject: ConnectivityStateWatcher
  ) {
    const watcherIndex = this.connectivityStateWatchers.indexOf(watcherObject);
    if (watcherIndex >= 0) {
      this.connectivityStateWatchers.splice(watcherIndex, 1);
    }
  }

  /**
   * Records the new connectivity state, then notifies and removes every
   * watcher whose `currentState` differs from it.
   */
  private updateState(newState: ConnectivityState): void {
    trace(
      LogVerbosity.DEBUG,
      'connectivity_state',
      '(' +
        this.channelzRef.id +
        ') ' +
        uriToString(this.target) +
        ' ' +
        ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace(
        'CT_INFO',
        'Connectivity state change to ' + ConnectivityState[newState]
      );
    }
    this.connectivityState = newState;
    // Iterate over a copy: watchers are removed from the list as they fire.
    const watchersCopy = this.connectivityStateWatchers.slice();
    for (const watcherObject of watchersCopy) {
      if (newState !== watcherObject.currentState) {
        if (watcherObject.timer) {
          clearTimeout(watcherObject.timer);
        }
        this.removeConnectivityStateWatcher(watcherObject);
        watcherObject.callback();
      }
    }
    if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
      this.currentResolutionError = null;
    }
  }

  /**
   * Raises the channel's keepalive time to `newKeepaliveTime` if it is larger
   * than the current value, and propagates it to every wrapped subchannel.
   */
  throttleKeepalive(newKeepaliveTime: number) {
    if (newKeepaliveTime > this.keepaliveTime) {
      this.keepaliveTime = newKeepaliveTime;
      for (const wrappedSubchannel of this.wrappedSubchannels) {
        wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
      }
    }
  }

  /** Stops tracking a subchannel wrapper that is no longer referenced. */
  removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
    this.wrappedSubchannels.delete(wrappedSubchannel);
  }

  /** Runs the current picker with the given metadata and extra pick info. */
  doPick(metadata: Metadata, extraPickInfo: { [key: string]: string }) {
    return this.currentPicker.pick({
      metadata: metadata,
      extraPickInfo: extraPickInfo,
    });
  }

  /** Queues a call to be re-picked on the next picker update. */
  queueCallForPick(call: LoadBalancingCall) {
    this.pickQueue.push(call);
    this.callRefTimerRef();
  }

  /**
   * Returns the call config for `method`, the stored resolution error, or
   * 'NONE' when neither is available yet. Unless the channel has been shut
   * down, this also asks the resolving load balancer to exit idle.
   */
  getConfig(method: string, metadata: Metadata): GetConfigResult {
    // close() destroys the resolving load balancer; do not ask it to exit
    // idle once the channel is shut down.
    if (this.connectivityState !== ConnectivityState.SHUTDOWN) {
      this.resolvingLoadBalancer.exitIdle();
    }
    if (this.configSelector) {
      return {
        type: 'SUCCESS',
        config: this.configSelector(method, metadata, this.randomChannelId),
      };
    } else {
      if (this.currentResolutionError) {
        return {
          type: 'ERROR',
          error: this.currentResolutionError,
        };
      } else {
        return {
          type: 'NONE',
        };
      }
    }
  }

  /** Queues a call until the resolver reports a result or an error. */
  queueCallForConfig(call: ResolvingCall) {
    this.configSelectionQueue.push(call);
    this.callRefTimerRef();
  }

  /**
   * Moves the channel to IDLE: destroys the resolving load balancer, installs
   * a QueuePicker, and cancels the idle timer.
   */
  private enterIdle() {
    this.resolvingLoadBalancer.destroy();
    this.updateState(ConnectivityState.IDLE);
    this.currentPicker = new QueuePicker(this.resolvingLoadBalancer);
    if (this.idleTimer) {
      clearTimeout(this.idleTimer);
      this.idleTimer = null;
    }
  }

  /**
   * Arms the idle timer. When it fires, the channel enters idle if there are
   * no active calls and at least idleTimeoutMs has passed since the last
   * activity; otherwise the timer is re-armed.
   * @param timeoutMs Requested delay in milliseconds. Values above
   *     MAX_TIMEOUT_TIME are clamped, and the callback's own re-check makes
   *     up the remaining time.
   */
  private startIdleTimeout(timeoutMs: number) {
    this.idleTimer = setTimeout(() => {
      if (this.callCount > 0) {
        /* If there is currently a call, the channel will not go idle for a
         * period of at least idleTimeoutMs, so check again after that time.
         */
        this.startIdleTimeout(this.idleTimeoutMs);
        return;
      }
      const now = new Date();
      const timeSinceLastActivity =
        now.valueOf() - this.lastActivityTimestamp.valueOf();
      if (timeSinceLastActivity >= this.idleTimeoutMs) {
        this.trace(
          'Idle timer triggered after ' +
            this.idleTimeoutMs +
            'ms of inactivity'
        );
        this.enterIdle();
      } else {
        /* Whenever the timer fires with the latest activity being too recent,
         * set the timer again for the time when the time since the last
         * activity is equal to the timeout. This should result in the timer
         * firing no more than once every idleTimeoutMs/2 on average. */
        this.startIdleTimeout(this.idleTimeoutMs - timeSinceLastActivity);
      }
      /* Node.js replaces a delay larger than MAX_TIMEOUT_TIME with 1ms, which
       * would make this callback re-arm itself every millisecond when a very
       * large idle timeout is configured. */
    }, Math.min(timeoutMs, MAX_TIMEOUT_TIME));
    this.idleTimer.unref?.();
  }

  /** Starts the idle timer unless the channel is shut down or a timer is already pending. */
  private maybeStartIdleTimer() {
    if (
      this.connectivityState !== ConnectivityState.SHUTDOWN &&
      !this.idleTimer
    ) {
      this.startIdleTimeout(this.idleTimeoutMs);
    }
  }

  /** Bookkeeping for a newly created call. */
  private onCallStart() {
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
    }
    this.callCount += 1;
  }

  /** Bookkeeping for a finished call; also counts as channel activity. */
  private onCallEnd(status: StatusObject) {
    if (this.channelzEnabled) {
      if (status.code === Status.OK) {
        this.callTracker.addCallSucceeded();
      } else {
        this.callTracker.addCallFailed();
      }
    }
    this.callCount -= 1;
    this.lastActivityTimestamp = new Date();
    this.maybeStartIdleTimer();
  }

  /** Creates a LoadBalancingCall with a fresh call number. */
  createLoadBalancingCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): LoadBalancingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createLoadBalancingCall [' + callNumber + '] method="' + method + '"'
    );
    return new LoadBalancingCall(
      this,
      callConfig,
      method,
      host,
      credentials,
      deadline,
      callNumber
    );
  }

  /**
   * Creates a RetryingCall with a fresh call number, sharing this channel's
   * retry buffer tracker and the retry throttler registered for its target.
   */
  createRetryingCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): RetryingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createRetryingCall [' + callNumber + '] method="' + method + '"'
    );
    return new RetryingCall(
      this,
      callConfig,
      method,
      host,
      credentials,
      deadline,
      callNumber,
      this.retryBufferTracker,
      RETRY_THROTTLER_MAP.get(this.getTarget())
    );
  }

  /**
   * Creates the call layer beneath a ResolvingCall: a LoadBalancingCall when
   * the 'grpc.enable_retries' option is 0, otherwise a RetryingCall.
   */
  createInnerCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): LoadBalancingCall | RetryingCall {
    // Create a RetryingCall if retries are enabled
    if (this.options['grpc.enable_retries'] === 0) {
      return this.createLoadBalancingCall(
        callConfig,
        method,
        host,
        credentials,
        deadline
      );
    } else {
      return this.createRetryingCall(
        callConfig,
        method,
        host,
        credentials,
        deadline
      );
    }
  }

  /**
   * Creates a ResolvingCall and registers it with the channel's call
   * accounting.
   * @param host Authority for the call; defaults to the channel's default
   *     authority when null or undefined.
   * @param propagateFlags Propagation flags; defaults to Propagate.DEFAULTS
   *     when null or undefined.
   */
  createResolvingCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): ResolvingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createResolvingCall [' +
        callNumber +
        '] method="' +
        method +
        '", deadline=' +
        deadlineToString(deadline)
    );
    const finalOptions: CallStreamOptions = {
      deadline: deadline,
      flags: propagateFlags ?? Propagate.DEFAULTS,
      host: host ?? this.defaultAuthority,
      parentCall: parentCall,
    };

    const call = new ResolvingCall(
      this,
      method,
      finalOptions,
      this.filterStackFactory.clone(),
      this.credentials._getCallCredentials(),
      callNumber
    );

    this.onCallStart();
    call.addStatusWatcher(status => {
      this.onCallEnd(status);
    });
    return call;
  }

  /**
   * Shuts the channel down: destroys the resolving load balancer, moves to
   * SHUTDOWN (which notifies pending watchers), cancels the channel's timers,
   * unregisters from channelz, and releases unused pooled subchannels.
   */
  close() {
    this.resolvingLoadBalancer.destroy();
    this.updateState(ConnectivityState.SHUTDOWN);
    clearInterval(this.callRefTimer);
    if (this.idleTimer) {
      clearTimeout(this.idleTimer);
      // Drop the stale handle, as enterIdle() does.
      this.idleTimer = null;
    }
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }

    this.subchannelPool.unrefUnusedSubchannels();
  }

  /** Returns the channel's target URI as a string. */
  getTarget() {
    return uriToString(this.target);
  }

  /**
   * Returns the connectivity state as it was when the method was called.
   * @param tryToConnect If true and the channel is not shut down, asks the
   *     resolving load balancer to exit idle and counts as channel activity.
   */
  getConnectivityState(tryToConnect: boolean) {
    const connectivityState = this.connectivityState;
    // close() destroys the resolving load balancer; do not ask it to exit
    // idle once the channel is shut down.
    if (tryToConnect && connectivityState !== ConnectivityState.SHUTDOWN) {
      this.resolvingLoadBalancer.exitIdle();
      this.lastActivityTimestamp = new Date();
      this.maybeStartIdleTimer();
    }
    return connectivityState;
  }

  /**
   * Registers a one-shot watcher that is called with no argument once the
   * channel's state differs from `currentState` at a state update, or with an
   * Error if `deadline` passes first.
   * @param deadline Absolute deadline as a Date or a millisecond timestamp;
   *     Infinity means no deadline.
   * @throws Error if the channel has been shut down.
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void {
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const watcherObject: ConnectivityStateWatcher = {
      currentState,
      callback,
      timer: null,
    };
    if (deadline !== Infinity) {
      const deadlineDate: Date =
        deadline instanceof Date ? deadline : new Date(deadline);
      const now = new Date();
      if (deadline === -Infinity || deadlineDate <= now) {
        process.nextTick(
          callback,
          new Error('Deadline passed without connectivity state change')
        );
        return;
      }
      /* Node.js replaces a delay larger than MAX_TIMEOUT_TIME with 1ms, which
       * would fail a far-future deadline almost immediately. Clamp the delay
       * and re-arm until the real deadline is reached. The live handle is
       * kept in watcherObject.timer so that a state change cancels whichever
       * timer is currently pending. */
      const armDeadlineTimer = (delayMs: number): void => {
        watcherObject.timer = setTimeout(() => {
          if (delayMs > MAX_TIMEOUT_TIME) {
            armDeadlineTimer(deadlineDate.getTime() - Date.now());
            return;
          }
          this.removeConnectivityStateWatcher(watcherObject);
          callback(
            new Error('Deadline passed without connectivity state change')
          );
        }, Math.min(delayMs, MAX_TIMEOUT_TIME));
      };
      armDeadlineTimer(deadlineDate.getTime() - now.getTime());
    }
    this.connectivityStateWatchers.push(watcherObject);
  }

  /**
   * Get the channelz reference object for this channel. The returned value is
   * garbage if channelz is disabled for this channel.
   * @returns
   */
  getChannelzRef() {
    return this.channelzRef;
  }

  /**
   * Validates the arguments and creates a new call on this channel.
   * @throws TypeError if `method` is not a string or `deadline` is neither a
   *     number nor a Date.
   * @throws Error if the channel has been shut down.
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call {
    if (typeof method !== 'string') {
      throw new TypeError('Channel#createCall: method must be a string');
    }
    if (!(typeof deadline === 'number' || deadline instanceof Date)) {
      throw new TypeError(
        'Channel#createCall: deadline must be a number or Date'
      );
    }
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    return this.createResolvingCall(
      method,
      deadline,
      host,
      parentCall,
      propagateFlags
    );
  }
}