UNPKG

31.2 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 Deadline,
20 Call,
21 Http2CallStream,
22 CallStreamOptions,
23 StatusObject,
24} from './call-stream';
25import { ChannelCredentials } from './channel-credentials';
26import { ChannelOptions } from './channel-options';
27import { ResolvingLoadBalancer } from './resolving-load-balancer';
28import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
29import { ChannelControlHelper } from './load-balancer';
30import { UnavailablePicker, Picker, PickResultType } from './picker';
31import { Metadata } from './metadata';
32import { Status, LogVerbosity, Propagate } from './constants';
33import { FilterStackFactory } from './filter-stack';
34import { CallCredentialsFilterFactory } from './call-credentials-filter';
35import { DeadlineFilterFactory } from './deadline-filter';
36import { CompressionFilterFactory } from './compression-filter';
37import {
38 CallConfig,
39 ConfigSelector,
40 getDefaultAuthority,
41 mapUriDefaultScheme,
42} from './resolver';
43import { trace, log } from './logging';
44import { SubchannelAddress } from './subchannel-address';
45import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
46import { mapProxyName } from './http_proxy';
47import { GrpcUri, parseUri, uriToString } from './uri-parser';
48import { ServerSurfaceCall } from './server-call';
49import { Filter } from './filter';
50
51import { ConnectivityState } from './connectivity-state';
52import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
53import { Subchannel } from './subchannel';
54
/**
 * Largest delay, in milliseconds, that fits in a signed 32-bit integer
 * (2147483647). Used in this file as the delay of the keep-alive interval
 * timer.
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2 ** 31 - 1;
59
/** The number that will be handed out by the next getNewCallNumber() call. */
let nextCallNumber = 0;

/**
 * Hand out the next call number and advance the module-level counter.
 * The counter restarts at 0 once advancing it would reach
 * Number.MAX_SAFE_INTEGER.
 * @returns The call number that was current before the counter advanced.
 */
function getNewCallNumber(): number {
  const assigned = nextCallNumber;
  const following = assigned + 1;
  nextCallNumber = following >= Number.MAX_SAFE_INTEGER ? 0 : following;
  return assigned;
}
70
/**
 * Status codes that restrictControlPlaneStatusCode refuses to pass through
 * unchanged.
 */
const INAPPROPRIATE_CONTROL_PLANE_CODES: Status[] = [
  Status.OK,
  Status.INVALID_ARGUMENT,
  Status.NOT_FOUND,
  Status.ALREADY_EXISTS,
  Status.FAILED_PRECONDITION,
  Status.ABORTED,
  Status.OUT_OF_RANGE,
  Status.DATA_LOSS,
];

/**
 * Sanitize a status before it is used to end a call.
 * @param code The status code to check
 * @param details The accompanying status details
 * @returns The input pair unchanged, unless `code` is listed in
 *     INAPPROPRIATE_CONTROL_PLANE_CODES; in that case the code becomes
 *     Status.INTERNAL and the details record the original code, its name and
 *     the original details.
 */
function restrictControlPlaneStatusCode(
  code: Status,
  details: string
): { code: Status; details: string } {
  if (!INAPPROPRIATE_CONTROL_PLANE_CODES.includes(code)) {
    return { code, details };
  }
  return {
    code: Status.INTERNAL,
    details: `Invalid status from control plane: ${code} ${Status[code]} ${details}`,
  };
}
92
/**
 * A communication channel to the server identified by a given address.
 */
export interface Channel {
  /**
   * Close the channel. Equivalent in functionality to the existing
   * grpc.Client.prototype.close.
   */
  close(): void;

  /**
   * Return the target that this channel connects to.
   */
  getTarget(): string;

  /**
   * Get the channel's current connectivity state. This method lives here
   * mainly because the existing internal Channel class has it and there is
   * no better place for it.
   * @param tryToConnect If true, an idle channel starts connecting.
   *     Otherwise an idle channel only starts connecting when a call starts.
   */
  getConnectivityState(tryToConnect: boolean): ConnectivityState;

  /**
   * Watch for a connectivity state change. This is also here mainly because
   * the existing external Channel class has it.
   * @param currentState The state to watch for transitions away from. It
   *     should always be obtained by calling getConnectivityState
   *     immediately beforehand.
   * @param deadline How long to wait for a state change
   * @param callback Called with no error when the state changes, or with an
   *     error if the deadline passes without a state change.
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void;

  /**
   * Get the channelz reference object for this channel. Asking the channelz
   * service about the id in this object yields information about this
   * channel.
   */
  getChannelzRef(): ChannelRef;

  /**
   * Create a call object. Call is an opaque type used by the Client class.
   * The gRPC library calls this function when starting a request.
   * Implementers should return a Call instance obtained by calling
   * createCall on an instance of the provided Channel class.
   * @param method The full method string to request
   * @param deadline The call deadline
   * @param host A host string override for making the request
   * @param parentCall A server call to propagate some information from
   * @param propagateFlags A bitwise combination of elements of
   *     grpc.propagate that indicates what information to propagate from
   *     parentCall.
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call;
}
157
/**
 * Bookkeeping record for one pending watchConnectivityState request.
 */
interface ConnectivityStateWatcher {
  /** The state the watcher was registered with; it is notified once the
   * channel state differs from this value. */
  currentState: ConnectivityState;
  /** Deadline timer for this watcher, or null if it has no deadline. */
  timer: NodeJS.Timeout | null;
  /** Invoked without arguments on a state change, or with an Error when the
   * deadline passes first. */
  callback: (error?: Error) => void;
}
163
/**
 * The library's implementation of the Channel interface.
 *
 * A call started on the channel first waits (if necessary) for the name
 * resolver to provide a ConfigSelector, then obtains its CallConfig, and is
 * finally handed to the current load balancing Picker, which either starts it
 * on a subchannel, queues it for a later pick, or ends it with a status.
 */
export class ChannelImplementation implements Channel {
  private resolvingLoadBalancer: ResolvingLoadBalancer;
  private subchannelPool: SubchannelPool;
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  private currentPicker: Picker = new UnavailablePicker();
  /**
   * Calls queued up to get a call config. Should only be populated before the
   * first time the resolver returns a result, which includes the ConfigSelector.
   */
  private configSelectionQueue: Array<{
    callStream: Http2CallStream;
    callMetadata: Metadata;
  }> = [];
  /**
   * Calls that already have a call config and are waiting for the picker to
   * produce a usable result. The queue is re-processed every time the load
   * balancer reports a new picker.
   */
  private pickQueue: Array<{
    callStream: Http2CallStream;
    callMetadata: Metadata;
    callConfig: CallConfig;
    dynamicFilters: Filter[];
  }> = [];
  private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
  private defaultAuthority: string;
  private filterStackFactory: FilterStackFactory;
  private target: GrpcUri;
  /**
   * This timer does not do anything on its own. Its purpose is to hold the
   * event loop open while there are any pending calls for the channel that
   * have not yet been assigned to specific subchannels. In other words,
   * the invariant is that callRefTimer is reffed if and only if pickQueue
   * is non-empty.
   *
   * NOTE: the timer is additionally reffed whenever a call is pushed onto
   * configSelectionQueue (see tryGetConfig and the resolution failure
   * handler in the constructor).
   */
  private callRefTimer: NodeJS.Timer;
  private configSelector: ConfigSelector | null = null;
  /**
   * This is the error from the name resolver if it failed most recently. It
   * is only used to end calls that start while there is no config selector
   * and the name resolver is in backoff, so it should be nulled if
   * configSelector becomes set or the channel state becomes anything other
   * than TRANSIENT_FAILURE.
   */
  private currentResolutionError: StatusObject | null = null;

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private originalTarget: string;
  private channelzRef: ChannelRef;
  private channelzTrace: ChannelzTrace;
  private callTracker = new ChannelzCallTracker();
  private childrenTracker = new ChannelzChildrenTracker();

  /**
   * @param target The target name to connect to. It must be parseable by
   *     parseUri and map to a scheme known to the resolver.
   * @param credentials The channel credentials used for subchannels and to
   *     derive call credentials.
   * @param options Channel options. Options read directly in this class are
   *     'grpc.enable_channelz', 'grpc.default_authority' and
   *     'grpc.use_local_subchannel_pool'; the whole object is also forwarded
   *     to the proxy mapper, the load balancer, the filters and subchannels.
   * @throws TypeError if an argument has the wrong type
   * @throws Error if the target cannot be parsed or has no usable scheme
   */
  constructor(
    target: string,
    private readonly credentials: ChannelCredentials,
    private readonly options: ChannelOptions
  ) {
    if (typeof target !== 'string') {
      throw new TypeError('Channel target must be a string');
    }
    if (!(credentials instanceof ChannelCredentials)) {
      throw new TypeError(
        'Channel credentials must be a ChannelCredentials object'
      );
    }
    if (options) {
      if (typeof options !== 'object') {
        throw new TypeError('Channel options must be an object');
      }
    }
    this.originalTarget = target;
    const originalTargetUri = parseUri(target);
    if (originalTargetUri === null) {
      throw new Error(`Could not parse target name "${target}"`);
    }
    /* This ensures that the target has a scheme that is registered with the
     * resolver */
    const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
    if (defaultSchemeMapResult === null) {
      throw new Error(
        `Could not find a default scheme for target name "${target}"`
      );
    }

    /* The interval callback is intentionally empty: the timer exists only so
     * that it can be reffed/unreffed to keep the event loop alive. It starts
     * out unreffed because no calls are pending yet. */
    this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
    this.callRefTimer.unref?.();

    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }

    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzChannel(
      target,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Channel created');
    }

    if (this.options['grpc.default_authority']) {
      this.defaultAuthority = this.options['grpc.default_authority'] as string;
    } else {
      this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
    }
    const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
    this.target = proxyMapResult.target;
    this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);

    /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
     * the grpc.use_local_subchannel_pool channel option means. */
    this.subchannelPool = getSubchannelPool(
      (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
    );
    const channelControlHelper: ChannelControlHelper = {
      createSubchannel: (
        subchannelAddress: SubchannelAddress,
        subchannelArgs: ChannelOptions
      ) => {
        const subchannel = this.subchannelPool.getOrCreateSubchannel(
          this.target,
          subchannelAddress,
          Object.assign({}, this.options, subchannelArgs),
          this.credentials
        );
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace(
            'CT_INFO',
            'Created subchannel or used existing subchannel',
            subchannel.getChannelzRef()
          );
        }
        return subchannel;
      },
      updateState: (connectivityState: ConnectivityState, picker: Picker) => {
        this.currentPicker = picker;
        /* Work on a copy and reset the real queue first, because tryPick may
         * push calls back onto pickQueue. */
        const queueCopy = this.pickQueue.slice();
        this.pickQueue = [];
        this.callRefTimerUnref();
        for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
          this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
        }
        this.updateState(connectivityState);
      },
      requestReresolution: () => {
        // This should never be called.
        throw new Error(
          'Resolving load balancer should never call requestReresolution'
        );
      },
      addChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.refChild(child);
        }
      },
      removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.unrefChild(child);
        }
      },
    };
    this.resolvingLoadBalancer = new ResolvingLoadBalancer(
      this.target,
      channelControlHelper,
      options,
      (configSelector) => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
        }
        this.configSelector = configSelector;
        this.currentResolutionError = null;
        /* We process the queue asynchronously to ensure that the corresponding
         * load balancer update has completed. */
        process.nextTick(() => {
          const localQueue = this.configSelectionQueue;
          this.configSelectionQueue = [];
          this.callRefTimerUnref();
          /* configSelector is set at this point, so tryGetConfig never pushes
           * back onto configSelectionQueue while this loop runs. */
          for (const { callStream, callMetadata } of localQueue) {
            this.tryGetConfig(callStream, callMetadata);
          }
        });
      },
      (status) => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace(
            'CT_WARNING',
            'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'
          );
        }
        if (this.configSelectionQueue.length > 0) {
          this.trace('Name resolution failed with calls queued for config selection');
        }
        /* The same sanitized status is used both for calls that are already
         * queued and for calls that start later while the resolver is still
         * failing, so that both kinds of call end with the same code. */
        const restrictedStatus = restrictControlPlaneStatusCode(
          status.code,
          status.details
        );
        if (this.configSelector === null) {
          this.currentResolutionError = {
            ...restrictedStatus,
            metadata: status.metadata,
          };
        }
        const localQueue = this.configSelectionQueue;
        this.configSelectionQueue = [];
        this.callRefTimerUnref();
        for (const { callStream, callMetadata } of localQueue) {
          if (callMetadata.getOptions().waitForReady) {
            // waitForReady calls stay queued until resolution succeeds
            this.callRefTimerRef();
            this.configSelectionQueue.push({ callStream, callMetadata });
          } else {
            callStream.cancelWithStatus(
              restrictedStatus.code,
              restrictedStatus.details
            );
          }
        }
      }
    );
    this.filterStackFactory = new FilterStackFactory([
      new CallCredentialsFilterFactory(this),
      new DeadlineFilterFactory(this),
      new MaxMessageSizeFilterFactory(this.options),
      new CompressionFilterFactory(this, this.options),
    ]);
    this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
    // Log the construction site; the first stack line (the message) is dropped
    const error = new Error();
    trace(
      LogVerbosity.DEBUG,
      'channel_stacktrace',
      '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1)
    );
  }

  /**
   * Build the snapshot handed to channelz when it asks about this channel.
   */
  private getChannelzInfo(): ChannelInfo {
    return {
      target: this.originalTarget,
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
    };
  }

  /**
   * Emit a 'channel' tracer line prefixed with the channelz id and target.
   * @param text The message to log
   * @param verbosityOverride Log level to use instead of DEBUG
   */
  private trace(text: string, verbosityOverride?: LogVerbosity) {
    trace(
      verbosityOverride ?? LogVerbosity.DEBUG,
      'channel',
      '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text
    );
  }

  /**
   * Ref callRefTimer so that it holds the event loop open, unless it is
   * already known to be reffed.
   */
  private callRefTimerRef() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef?.()) {
      this.trace(
        'callRefTimer.ref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.ref?.();
    }
  }

  /**
   * Unref callRefTimer so that it no longer holds the event loop open,
   * unless it is already known to be unreffed.
   */
  private callRefTimerUnref() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
      this.trace(
        'callRefTimer.unref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.unref?.();
    }
  }

  /**
   * Queue a call for a later pick attempt and keep the event loop alive
   * while it waits.
   */
  private pushPick(
    callStream: Http2CallStream,
    callMetadata: Metadata,
    callConfig: CallConfig,
    dynamicFilters: Filter[]
  ) {
    this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
    this.callRefTimerRef();
  }

  /**
   * Check the picker output for the given call and corresponding metadata,
   * and take any relevant actions. Should not be called while iterating
   * over pickQueue.
   * @param callStream The call to pick a subchannel for
   * @param callMetadata The metadata to send with the call
   * @param callConfig The call config selected for the call
   * @param dynamicFilters Filters created from the call config's dynamic
   *     filter factories; passed to the subchannel when the call starts
   */
  private tryPick(
    callStream: Http2CallStream,
    callMetadata: Metadata,
    callConfig: CallConfig,
    dynamicFilters: Filter[]
  ) {
    const pickResult = this.currentPicker.pick({
      metadata: callMetadata,
      extraPickInfo: callConfig.pickInformation,
    });
    const subchannelString = pickResult.subchannel ?
      '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
      '' + pickResult.subchannel;
    this.trace(
      'Pick result for call [' +
        callStream.getCallNumber() +
        ']: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE: {
        const subchannel = pickResult.subchannel;
        if (subchannel === null) {
          // End the call with an error
          callStream.cancelWithStatus(
            Status.UNAVAILABLE,
            'Request dropped by load balancing policy'
          );
          break;
        }
        /* If the subchannel is not in the READY state, that indicates a bug
         * somewhere in the load balancer or picker. So, we log an error and
         * queue the pick to be tried again later. */
        if (subchannel.getConnectivityState() !== ConnectivityState.READY) {
          log(
            LogVerbosity.ERROR,
            'Error: COMPLETE pick result subchannel ' +
              subchannelString +
              ' has state ' +
              ConnectivityState[subchannel.getConnectivityState()]
          );
          this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
          break;
        }
        /* We need to clone the callMetadata here because the transparent
         * retry code in the promise resolution handler use the same
         * callMetadata object, so it needs to stay unmodified */
        callStream.filterStack
          .sendMetadata(Promise.resolve(callMetadata.clone()))
          .then(
            (finalMetadata) => {
              const subchannelState: ConnectivityState = subchannel.getConnectivityState();
              if (subchannelState === ConnectivityState.READY) {
                try {
                  const pickExtraFilters = pickResult.extraFilterFactories.map(
                    (factory) => factory.createFilter(callStream)
                  );
                  subchannel.getRealSubchannel().startCallStream(
                    finalMetadata,
                    callStream,
                    [...dynamicFilters, ...pickExtraFilters]
                  );
                  /* If we reach this point, the call stream has started
                   * successfully */
                  callConfig.onCommitted?.();
                  pickResult.onCallStarted?.();
                } catch (error) {
                  const errorCode = (error as NodeJS.ErrnoException).code;
                  if (
                    errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
                    errorCode === 'ERR_HTTP2_INVALID_SESSION'
                  ) {
                    /* An error here indicates that something went wrong with
                     * the picked subchannel's http2 stream right before we
                     * tried to start the stream. We are handling a promise
                     * result here, so this is asynchronous with respect to the
                     * original tryPick call, so calling it again is not
                     * recursive. We call tryPick immediately instead of
                     * queueing this pick again because handling the queue is
                     * triggered by state changes, and we want to immediately
                     * check if the state has already changed since the
                     * previous tryPick call. We do this instead of cancelling
                     * the stream because the correct behavior may be
                     * re-queueing instead, based on the logic in the rest of
                     * tryPick */
                    this.trace(
                      'Failed to start call on picked subchannel ' +
                        subchannelString +
                        ' with error ' +
                        (error as Error).message +
                        '. Retrying pick',
                      LogVerbosity.INFO
                    );
                    this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                  } else {
                    this.trace(
                      'Failed to start call on picked subchanel ' +
                        subchannelString +
                        ' with error ' +
                        (error as Error).message +
                        '. Ending call',
                      LogVerbosity.INFO
                    );
                    callStream.cancelWithStatus(
                      Status.INTERNAL,
                      `Failed to start HTTP/2 stream with error: ${
                        (error as Error).message
                      }`
                    );
                  }
                }
              } else {
                /* The logic for doing this here is the same as in the catch
                 * block above */
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[subchannelState] +
                    ' after metadata filters. Retrying pick',
                  LogVerbosity.INFO
                );
                this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
              }
            },
            (error: Error & { code: number }) => {
              // We assume the error code isn't 0 (Status.OK)
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              callStream.cancelWithStatus(code, details);
            }
          );
        break;
      }
      case PickResultType.QUEUE:
        this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
        break;
      case PickResultType.TRANSIENT_FAILURE:
        if (callMetadata.getOptions().waitForReady) {
          this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          callStream.cancelWithStatus(code, details);
        }
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        callStream.cancelWithStatus(code, details);
        break;
      }
      default:
        throw new Error(
          `Invalid state: unknown pickResultType ${pickResult.pickResultType}`
        );
    }
  }

  /**
   * Remove a watcher from connectivityStateWatchers. Does nothing if the
   * watcher is not (or no longer) registered.
   */
  private removeConnectivityStateWatcher(
    watcherObject: ConnectivityStateWatcher
  ) {
    const watcherIndex = this.connectivityStateWatchers.findIndex(
      (value) => value === watcherObject
    );
    if (watcherIndex >= 0) {
      this.connectivityStateWatchers.splice(watcherIndex, 1);
    }
  }

  /**
   * Record a new connectivity state, trace the transition, and notify every
   * watcher that was registered with a state different from the new one.
   * @param newState The state the channel is now in
   */
  private updateState(newState: ConnectivityState): void {
    trace(
      LogVerbosity.DEBUG,
      'connectivity_state',
      '(' + this.channelzRef.id + ') ' +
        uriToString(this.target) +
        ' ' +
        ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace(
        'CT_INFO',
        ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]
      );
    }
    this.connectivityState = newState;
    // Iterate over a copy because notified watchers are removed from the list
    const watchersCopy = this.connectivityStateWatchers.slice();
    for (const watcherObject of watchersCopy) {
      if (newState !== watcherObject.currentState) {
        if (watcherObject.timer) {
          clearTimeout(watcherObject.timer);
        }
        this.removeConnectivityStateWatcher(watcherObject);
        watcherObject.callback();
      }
    }
    if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
      this.currentResolutionError = null;
    }
  }

  /**
   * Obtain the call config for a call and move it on to the pick stage, or
   * queue it if the resolver has not produced a ConfigSelector yet.
   * @param stream The call to process
   * @param metadata The metadata to send with the call
   */
  private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
    if (stream.getStatus() !== null) {
      /* If the stream has a status, it has already finished and we don't need
       * to take any more actions on it. */
      return;
    }
    if (this.configSelector === null) {
      /* This branch will only be taken at the beginning of the channel's life,
       * before the resolver ever returns a result. So, the
       * ResolvingLoadBalancer may be idle and if so it needs to be kicked
       * because it now has a pending request. */
      this.resolvingLoadBalancer.exitIdle();
      if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
        stream.cancelWithStatus(
          this.currentResolutionError.code,
          this.currentResolutionError.details
        );
      } else {
        this.configSelectionQueue.push({
          callStream: stream,
          callMetadata: metadata,
        });
        this.callRefTimerRef();
      }
    } else {
      const callConfig = this.configSelector(stream.getMethod(), metadata);
      if (callConfig.status === Status.OK) {
        if (callConfig.methodConfig.timeout) {
          /* The Date setters normalize overflowing values, so adding to the
           * current seconds/milliseconds moves the deadline forward by the
           * configured timeout. */
          const deadline = new Date();
          deadline.setSeconds(
            deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
          );
          deadline.setMilliseconds(
            deadline.getMilliseconds() +
              callConfig.methodConfig.timeout.nanos / 1_000_000
          );
          stream.setConfigDeadline(deadline);
          // Refreshing the filters makes the deadline filter pick up the new deadline
          stream.filterStack.refresh();
        }
        if (callConfig.dynamicFilterFactories.length > 0) {
          /* These dynamicFilters are the mechanism for implementing gRFC A39:
           * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
           * We run them here instead of with the rest of the filters because
           * that spec says "the xDS HTTP filters will run in between name
           * resolution and load balancing".
           *
           * We use the filter stack here to simplify the multi-filter async
           * waterfall logic, but we pass along the underlying list of filters
           * to avoid having nested filter stacks when combining it with the
           * original filter stack. We do not pass along the original filter
           * factory list because these filters may need to persist data
           * between sending headers and other operations. */
          const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
          const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
          dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(
            (filteredMetadata) => {
              this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
            },
            (error: (Partial<StatusObject> & Partial<Error>) | undefined) => {
              /* Without this handler a rejecting dynamic filter would leave
               * the call pending forever and surface as an unhandled
               * rejection.
               * NOTE(review): the shape of the rejection value is not visible
               * from this file; both a status-like object (code/details) and
               * a plain Error (message) are handled - confirm against the
               * filter implementations. */
              stream.cancelWithStatus(
                typeof error?.code === 'number' ? error.code : Status.UNKNOWN,
                error?.details ??
                  `Dynamic filter metadata processing failed with error: ${error?.message}`
              );
            }
          );
        } else {
          this.tryPick(stream, metadata, callConfig, []);
        }
      } else {
        const { code, details } = restrictControlPlaneStatusCode(
          callConfig.status,
          'Failed to route call to method ' + stream.getMethod()
        );
        stream.cancelWithStatus(code, details);
      }
    }
  }

  /**
   * Begin processing a call on this channel. The metadata is cloned, so
   * later processing does not modify the caller's object.
   * @param stream The call to start
   * @param metadata The initial metadata for the call
   */
  _startCallStream(stream: Http2CallStream, metadata: Metadata) {
    this.tryGetConfig(stream, metadata.clone());
  }

  /**
   * Shut the channel down: destroy the load balancer, move to SHUTDOWN
   * (notifying watchers), stop the keep-alive timer and unregister from
   * channelz when it is enabled.
   */
  close() {
    this.resolvingLoadBalancer.destroy();
    this.updateState(ConnectivityState.SHUTDOWN);
    clearInterval(this.callRefTimer);
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }

    this.subchannelPool.unrefUnusedSubchannels();
  }

  /**
   * Return the target of this channel as a string, after default scheme and
   * proxy mapping have been applied.
   */
  getTarget() {
    return uriToString(this.target);
  }

  /**
   * Return the connectivity state as it was when this method was called.
   * @param tryToConnect If true, also ask the load balancer to leave the
   *     idle state.
   */
  getConnectivityState(tryToConnect: boolean) {
    const connectivityState = this.connectivityState;
    if (tryToConnect) {
      this.resolvingLoadBalancer.exitIdle();
    }
    return connectivityState;
  }

  /**
   * Register a one-shot watcher for a connectivity state change.
   * @param currentState The state to watch for transitions away from
   * @param deadline When to give up waiting. Infinity means no deadline; a
   *     deadline that is -Infinity or not in the future fails the watch on
   *     the next tick.
   * @param callback Called with no argument on a state change, or with an
   *     Error when the deadline passes first
   * @throws Error if the channel has already been shut down
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void {
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const watcherObject: ConnectivityStateWatcher = {
      currentState,
      callback,
      timer: null,
    };
    if (deadline !== Infinity) {
      const deadlineDate: Date =
        deadline instanceof Date ? deadline : new Date(deadline);
      const now = new Date();
      if (deadline === -Infinity || deadlineDate <= now) {
        process.nextTick(
          callback,
          new Error('Deadline passed without connectivity state change')
        );
        return;
      }
      /* setTimeout cannot represent delays above MAX_TIMEOUT_TIME: Node
       * would silently replace such a delay with 1ms and the watcher would
       * fail almost immediately. Longer waits are therefore split into a
       * chain of timers of at most MAX_TIMEOUT_TIME each. For shorter waits
       * this arms exactly one timer with the full delay. */
      let remainingDelay = deadlineDate.getTime() - now.getTime();
      const armTimer = () => {
        const delay = Math.min(remainingDelay, MAX_TIMEOUT_TIME);
        remainingDelay -= delay;
        /* updateState reads watcherObject.timer when it clears the timer, so
         * the field must always refer to the most recently armed timer. */
        watcherObject.timer = setTimeout(() => {
          if (remainingDelay > 0) {
            armTimer();
            return;
          }
          this.removeConnectivityStateWatcher(watcherObject);
          callback(
            new Error('Deadline passed without connectivity state change')
          );
        }, delay);
      };
      armTimer();
    }
    this.connectivityStateWatchers.push(watcherObject);
  }

  /**
   * Get the channelz reference object for this channel. The returned value is
   * garbage if channelz is disabled for this channel.
   * @returns The reference obtained when the channel registered with channelz
   */
  getChannelzRef() {
    return this.channelzRef;
  }

  /**
   * Create a call on this channel. The call does not start until the
   * returned object is started by its user.
   * @param method The full method string to request
   * @param deadline The call deadline, as a number or a Date
   * @param host Authority override; the channel's default authority is used
   *     when this is null or undefined
   * @param parentCall A server call to propagate some information from
   * @param propagateFlags What to propagate from parentCall;
   *     Propagate.DEFAULTS is used when this is null or undefined
   * @throws TypeError if method or deadline has the wrong type
   * @throws Error if the channel has already been shut down
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call {
    if (typeof method !== 'string') {
      throw new TypeError('Channel#createCall: method must be a string');
    }
    if (!(typeof deadline === 'number' || deadline instanceof Date)) {
      throw new TypeError(
        'Channel#createCall: deadline must be a number or Date'
      );
    }
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const callNumber = getNewCallNumber();
    this.trace(
      'createCall [' +
        callNumber +
        '] method="' +
        method +
        '", deadline=' +
        deadline
    );
    const finalOptions: CallStreamOptions = {
      deadline: deadline,
      flags: propagateFlags ?? Propagate.DEFAULTS,
      host: host ?? this.defaultAuthority,
      parentCall: parentCall,
    };
    const stream: Http2CallStream = new Http2CallStream(
      method,
      this,
      finalOptions,
      this.filterStackFactory,
      this.credentials._getCallCredentials(),
      callNumber
    );
    if (this.channelzEnabled) {
      // Feed the channelz call counters from the call's final status
      this.callTracker.addCallStarted();
      stream.addStatusWatcher((status) => {
        if (status.code === Status.OK) {
          this.callTracker.addCallSucceeded();
        } else {
          this.callTracker.addCallFailed();
        }
      });
    }
    return stream;
  }
}