1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import {
|
19 | Deadline,
|
20 | Call,
|
21 | Http2CallStream,
|
22 | CallStreamOptions,
|
23 | StatusObject,
|
24 | } from './call-stream';
|
25 | import { ChannelCredentials } from './channel-credentials';
|
26 | import { ChannelOptions } from './channel-options';
|
27 | import { ResolvingLoadBalancer } from './resolving-load-balancer';
|
28 | import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
|
29 | import { ChannelControlHelper } from './load-balancer';
|
30 | import { UnavailablePicker, Picker, PickResultType } from './picker';
|
31 | import { Metadata } from './metadata';
|
32 | import { Status, LogVerbosity, Propagate } from './constants';
|
33 | import { FilterStackFactory } from './filter-stack';
|
34 | import { CallCredentialsFilterFactory } from './call-credentials-filter';
|
35 | import { DeadlineFilterFactory } from './deadline-filter';
|
36 | import { CompressionFilterFactory } from './compression-filter';
|
37 | import {
|
38 | CallConfig,
|
39 | ConfigSelector,
|
40 | getDefaultAuthority,
|
41 | mapUriDefaultScheme,
|
42 | } from './resolver';
|
43 | import { trace, log } from './logging';
|
44 | import { SubchannelAddress } from './subchannel-address';
|
45 | import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
|
46 | import { mapProxyName } from './http_proxy';
|
47 | import { GrpcUri, parseUri, uriToString } from './uri-parser';
|
48 | import { ServerSurfaceCall } from './server-call';
|
49 | import { Filter } from './filter';
|
50 |
|
51 | import { ConnectivityState } from './connectivity-state';
|
52 | import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
|
53 | import { Subchannel } from './subchannel';
|
54 |
|
55 |
|
56 |
|
57 |
|
58 | const MAX_TIMEOUT_TIME = 2147483647;
|
59 |
|
60 | let nextCallNumber = 0;
|
61 |
|
62 | function getNewCallNumber(): number {
|
63 | const callNumber = nextCallNumber;
|
64 | nextCallNumber += 1;
|
65 | if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
|
66 | nextCallNumber = 0;
|
67 | }
|
68 | return callNumber;
|
69 | }
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 | export interface Channel {
|
76 | |
77 |
|
78 |
|
79 |
|
80 | close(): void;
|
81 | |
82 |
|
83 |
|
84 | getTarget(): string;
|
85 | |
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
91 |
|
92 |
|
93 | getConnectivityState(tryToConnect: boolean): ConnectivityState;
|
94 | |
95 |
|
96 |
|
97 |
|
98 |
|
99 |
|
100 |
|
101 |
|
102 |
|
103 |
|
104 | watchConnectivityState(
|
105 | currentState: ConnectivityState,
|
106 | deadline: Date | number,
|
107 | callback: (error?: Error) => void
|
108 | ): void;
|
109 | |
110 |
|
111 |
|
112 |
|
113 |
|
114 | getChannelzRef(): ChannelRef;
|
115 | |
116 |
|
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 | createCall(
|
128 | method: string,
|
129 | deadline: Deadline,
|
130 | host: string | null | undefined,
|
131 | parentCall: ServerSurfaceCall | null,
|
132 | propagateFlags: number | null | undefined
|
133 | ): Call;
|
134 | }
|
135 |
|
136 | interface ConnectivityStateWatcher {
|
137 | currentState: ConnectivityState;
|
138 | timer: NodeJS.Timeout | null;
|
139 | callback: (error?: Error) => void;
|
140 | }
|
141 |
|
142 | export class ChannelImplementation implements Channel {
|
143 | private resolvingLoadBalancer: ResolvingLoadBalancer;
|
144 | private subchannelPool: SubchannelPool;
|
145 | private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
146 | private currentPicker: Picker = new UnavailablePicker();
|
147 | |
148 |
|
149 |
|
150 |
|
151 | private configSelectionQueue: Array<{
|
152 | callStream: Http2CallStream;
|
153 | callMetadata: Metadata;
|
154 | }> = [];
|
155 | private pickQueue: Array<{
|
156 | callStream: Http2CallStream;
|
157 | callMetadata: Metadata;
|
158 | callConfig: CallConfig;
|
159 | dynamicFilters: Filter[];
|
160 | }> = [];
|
161 | private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
162 | private defaultAuthority: string;
|
163 | private filterStackFactory: FilterStackFactory;
|
164 | private target: GrpcUri;
|
165 | |
166 |
|
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 | private callRefTimer: NodeJS.Timer;
|
173 | private configSelector: ConfigSelector | null = null;
|
174 | |
175 |
|
176 |
|
177 |
|
178 |
|
179 |
|
180 |
|
181 | private currentResolutionError: StatusObject | null = null;
|
182 |
|
183 |
|
184 | private readonly channelzEnabled: boolean = true;
|
185 | private originalTarget: string;
|
186 | private channelzRef: ChannelRef;
|
187 | private channelzTrace: ChannelzTrace;
|
188 | private callTracker = new ChannelzCallTracker();
|
189 | private childrenTracker = new ChannelzChildrenTracker();
|
190 |
|
191 | constructor(
|
192 | target: string,
|
193 | private readonly credentials: ChannelCredentials,
|
194 | private readonly options: ChannelOptions
|
195 | ) {
|
196 | if (typeof target !== 'string') {
|
197 | throw new TypeError('Channel target must be a string');
|
198 | }
|
199 | if (!(credentials instanceof ChannelCredentials)) {
|
200 | throw new TypeError(
|
201 | 'Channel credentials must be a ChannelCredentials object'
|
202 | );
|
203 | }
|
204 | if (options) {
|
205 | if (typeof options !== 'object') {
|
206 | throw new TypeError('Channel options must be an object');
|
207 | }
|
208 | }
|
209 | this.originalTarget = target;
|
210 | const originalTargetUri = parseUri(target);
|
211 | if (originalTargetUri === null) {
|
212 | throw new Error(`Could not parse target name "${target}"`);
|
213 | }
|
214 | |
215 |
|
216 | const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
|
217 | if (defaultSchemeMapResult === null) {
|
218 | throw new Error(
|
219 | `Could not find a default scheme for target name "${target}"`
|
220 | );
|
221 | }
|
222 |
|
223 | this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
|
224 | this.callRefTimer.unref?.();
|
225 |
|
226 | if (this.options['grpc.enable_channelz'] === 0) {
|
227 | this.channelzEnabled = false;
|
228 | }
|
229 |
|
230 | this.channelzTrace = new ChannelzTrace();
|
231 | this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
|
232 | if (this.channelzEnabled) {
|
233 | this.channelzTrace.addTrace('CT_INFO', 'Channel created');
|
234 | }
|
235 |
|
236 | if (this.options['grpc.default_authority']) {
|
237 | this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
238 | } else {
|
239 | this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
|
240 | }
|
241 | const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
|
242 | this.target = proxyMapResult.target;
|
243 | this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
|
244 |
|
245 | |
246 |
|
247 | this.subchannelPool = getSubchannelPool(
|
248 | (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
|
249 | );
|
250 | const channelControlHelper: ChannelControlHelper = {
|
251 | createSubchannel: (
|
252 | subchannelAddress: SubchannelAddress,
|
253 | subchannelArgs: ChannelOptions
|
254 | ) => {
|
255 | const subchannel = this.subchannelPool.getOrCreateSubchannel(
|
256 | this.target,
|
257 | subchannelAddress,
|
258 | Object.assign({}, this.options, subchannelArgs),
|
259 | this.credentials
|
260 | );
|
261 | if (this.channelzEnabled) {
|
262 | this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
|
263 | }
|
264 | return subchannel;
|
265 | },
|
266 | updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
267 | this.currentPicker = picker;
|
268 | const queueCopy = this.pickQueue.slice();
|
269 | this.pickQueue = [];
|
270 | this.callRefTimerUnref();
|
271 | for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
|
272 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
273 | }
|
274 | this.updateState(connectivityState);
|
275 | },
|
276 | requestReresolution: () => {
|
277 |
|
278 | throw new Error(
|
279 | 'Resolving load balancer should never call requestReresolution'
|
280 | );
|
281 | },
|
282 | addChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
283 | if (this.channelzEnabled) {
|
284 | this.childrenTracker.refChild(child);
|
285 | }
|
286 | },
|
287 | removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
288 | if (this.channelzEnabled) {
|
289 | this.childrenTracker.unrefChild(child);
|
290 | }
|
291 | }
|
292 | };
|
293 | this.resolvingLoadBalancer = new ResolvingLoadBalancer(
|
294 | this.target,
|
295 | channelControlHelper,
|
296 | options,
|
297 | (configSelector) => {
|
298 | if (this.channelzEnabled) {
|
299 | this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
|
300 | }
|
301 | this.configSelector = configSelector;
|
302 | this.currentResolutionError = null;
|
303 | |
304 |
|
305 | process.nextTick(() => {
|
306 | const localQueue = this.configSelectionQueue;
|
307 | this.configSelectionQueue = [];
|
308 | this.callRefTimerUnref();
|
309 | for (const { callStream, callMetadata } of localQueue) {
|
310 | this.tryGetConfig(callStream, callMetadata);
|
311 | }
|
312 | this.configSelectionQueue = [];
|
313 | });
|
314 | },
|
315 | (status) => {
|
316 | if (this.channelzEnabled) {
|
317 | this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
|
318 | }
|
319 | if (this.configSelectionQueue.length > 0) {
|
320 | this.trace('Name resolution failed with calls queued for config selection');
|
321 | }
|
322 | if (this.configSelector === null) {
|
323 | this.currentResolutionError = status;
|
324 | }
|
325 | const localQueue = this.configSelectionQueue;
|
326 | this.configSelectionQueue = [];
|
327 | this.callRefTimerUnref();
|
328 | for (const { callStream, callMetadata } of localQueue) {
|
329 | if (callMetadata.getOptions().waitForReady) {
|
330 | this.callRefTimerRef();
|
331 | this.configSelectionQueue.push({ callStream, callMetadata });
|
332 | } else {
|
333 | callStream.cancelWithStatus(status.code, status.details);
|
334 | }
|
335 | }
|
336 | }
|
337 | );
|
338 | this.filterStackFactory = new FilterStackFactory([
|
339 | new CallCredentialsFilterFactory(this),
|
340 | new DeadlineFilterFactory(this),
|
341 | new MaxMessageSizeFilterFactory(this.options),
|
342 | new CompressionFilterFactory(this, this.options),
|
343 | ]);
|
344 | this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
|
345 | const error = new Error();
|
346 | trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
|
347 | }
|
348 |
|
349 | private getChannelzInfo(): ChannelInfo {
|
350 | return {
|
351 | target: this.originalTarget,
|
352 | state: this.connectivityState,
|
353 | trace: this.channelzTrace,
|
354 | callTracker: this.callTracker,
|
355 | children: this.childrenTracker.getChildLists()
|
356 | };
|
357 | }
|
358 |
|
359 | private trace(text: string, verbosityOverride?: LogVerbosity) {
|
360 | trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text);
|
361 | }
|
362 |
|
363 | private callRefTimerRef() {
|
364 |
|
365 | if (!this.callRefTimer.hasRef?.()) {
|
366 | this.trace(
|
367 | 'callRefTimer.ref | configSelectionQueue.length=' +
|
368 | this.configSelectionQueue.length +
|
369 | ' pickQueue.length=' +
|
370 | this.pickQueue.length
|
371 | );
|
372 | this.callRefTimer.ref?.();
|
373 | }
|
374 | }
|
375 |
|
376 | private callRefTimerUnref() {
|
377 |
|
378 | if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
|
379 | this.trace(
|
380 | 'callRefTimer.unref | configSelectionQueue.length=' +
|
381 | this.configSelectionQueue.length +
|
382 | ' pickQueue.length=' +
|
383 | this.pickQueue.length
|
384 | );
|
385 | this.callRefTimer.unref?.();
|
386 | }
|
387 | }
|
388 |
|
389 | private pushPick(
|
390 | callStream: Http2CallStream,
|
391 | callMetadata: Metadata,
|
392 | callConfig: CallConfig,
|
393 | dynamicFilters: Filter[]
|
394 | ) {
|
395 | this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
|
396 | this.callRefTimerRef();
|
397 | }
|
398 |
|
399 | |
400 |
|
401 |
|
402 |
|
403 |
|
404 |
|
405 |
|
406 | private tryPick(
|
407 | callStream: Http2CallStream,
|
408 | callMetadata: Metadata,
|
409 | callConfig: CallConfig,
|
410 | dynamicFilters: Filter[]
|
411 | ) {
|
412 | const pickResult = this.currentPicker.pick({
|
413 | metadata: callMetadata,
|
414 | extraPickInfo: callConfig.pickInformation,
|
415 | });
|
416 | const subchannelString = pickResult.subchannel ?
|
417 | '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
|
418 | '' + pickResult.subchannel;
|
419 | this.trace(
|
420 | 'Pick result for call [' +
|
421 | callStream.getCallNumber() +
|
422 | ']: ' +
|
423 | PickResultType[pickResult.pickResultType] +
|
424 | ' subchannel: ' +
|
425 | subchannelString +
|
426 | ' status: ' +
|
427 | pickResult.status?.code +
|
428 | ' ' +
|
429 | pickResult.status?.details
|
430 | );
|
431 | switch (pickResult.pickResultType) {
|
432 | case PickResultType.COMPLETE:
|
433 | if (pickResult.subchannel === null) {
|
434 | callStream.cancelWithStatus(
|
435 | Status.UNAVAILABLE,
|
436 | 'Request dropped by load balancing policy'
|
437 | );
|
438 |
|
439 | } else {
|
440 | |
441 |
|
442 |
|
443 | if (
|
444 | pickResult.subchannel!.getConnectivityState() !==
|
445 | ConnectivityState.READY
|
446 | ) {
|
447 | log(
|
448 | LogVerbosity.ERROR,
|
449 | 'Error: COMPLETE pick result subchannel ' +
|
450 | subchannelString +
|
451 | ' has state ' +
|
452 | ConnectivityState[pickResult.subchannel!.getConnectivityState()]
|
453 | );
|
454 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
455 | break;
|
456 | }
|
457 | |
458 |
|
459 |
|
460 | callStream.filterStack
|
461 | .sendMetadata(Promise.resolve(callMetadata.clone()))
|
462 | .then(
|
463 | (finalMetadata) => {
|
464 | const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
|
465 | if (subchannelState === ConnectivityState.READY) {
|
466 | try {
|
467 | const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
|
468 | pickResult.subchannel?.getRealSubchannel().startCallStream(
|
469 | finalMetadata,
|
470 | callStream,
|
471 | [...dynamicFilters, ...pickExtraFilters]
|
472 | );
|
473 | |
474 |
|
475 | callConfig.onCommitted?.();
|
476 | pickResult.onCallStarted?.();
|
477 | } catch (error) {
|
478 | const errorCode = (error as NodeJS.ErrnoException).code;
|
479 | if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
|
480 | errorCode === 'ERR_HTTP2_INVALID_SESSION'
|
481 | ) {
|
482 | |
483 |
|
484 |
|
485 |
|
486 |
|
487 |
|
488 |
|
489 |
|
490 |
|
491 |
|
492 |
|
493 |
|
494 |
|
495 | this.trace(
|
496 | 'Failed to start call on picked subchannel ' +
|
497 | subchannelString +
|
498 | ' with error ' +
|
499 | (error as Error).message +
|
500 | '. Retrying pick',
|
501 | LogVerbosity.INFO
|
502 | );
|
503 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
504 | } else {
|
505 | this.trace(
|
506 | 'Failed to start call on picked subchanel ' +
|
507 | subchannelString +
|
508 | ' with error ' +
|
509 | (error as Error).message +
|
510 | '. Ending call',
|
511 | LogVerbosity.INFO
|
512 | );
|
513 | callStream.cancelWithStatus(
|
514 | Status.INTERNAL,
|
515 | `Failed to start HTTP/2 stream with error: ${
|
516 | (error as Error).message
|
517 | }`
|
518 | );
|
519 | }
|
520 | }
|
521 | } else {
|
522 | |
523 |
|
524 | this.trace(
|
525 | 'Picked subchannel ' +
|
526 | subchannelString +
|
527 | ' has state ' +
|
528 | ConnectivityState[subchannelState] +
|
529 | ' after metadata filters. Retrying pick',
|
530 | LogVerbosity.INFO
|
531 | );
|
532 | this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
|
533 | }
|
534 | },
|
535 | (error: Error & { code: number }) => {
|
536 |
|
537 | callStream.cancelWithStatus(
|
538 | typeof error.code === 'number' ? error.code : Status.UNKNOWN,
|
539 | `Getting metadata from plugin failed with error: ${error.message}`
|
540 | );
|
541 | }
|
542 | );
|
543 | }
|
544 | break;
|
545 | case PickResultType.QUEUE:
|
546 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
547 | break;
|
548 | case PickResultType.TRANSIENT_FAILURE:
|
549 | if (callMetadata.getOptions().waitForReady) {
|
550 | this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
|
551 | } else {
|
552 | callStream.cancelWithStatus(
|
553 | pickResult.status!.code,
|
554 | pickResult.status!.details
|
555 | );
|
556 | }
|
557 | break;
|
558 | case PickResultType.DROP:
|
559 | callStream.cancelWithStatus(
|
560 | pickResult.status!.code,
|
561 | pickResult.status!.details
|
562 | );
|
563 | break;
|
564 | default:
|
565 | throw new Error(
|
566 | `Invalid state: unknown pickResultType ${pickResult.pickResultType}`
|
567 | );
|
568 | }
|
569 | }
|
570 |
|
571 | private removeConnectivityStateWatcher(
|
572 | watcherObject: ConnectivityStateWatcher
|
573 | ) {
|
574 | const watcherIndex = this.connectivityStateWatchers.findIndex(
|
575 | (value) => value === watcherObject
|
576 | );
|
577 | if (watcherIndex >= 0) {
|
578 | this.connectivityStateWatchers.splice(watcherIndex, 1);
|
579 | }
|
580 | }
|
581 |
|
582 | private updateState(newState: ConnectivityState): void {
|
583 | trace(
|
584 | LogVerbosity.DEBUG,
|
585 | 'connectivity_state',
|
586 | '(' + this.channelzRef.id + ') ' +
|
587 | uriToString(this.target) +
|
588 | ' ' +
|
589 | ConnectivityState[this.connectivityState] +
|
590 | ' -> ' +
|
591 | ConnectivityState[newState]
|
592 | );
|
593 | if (this.channelzEnabled) {
|
594 | this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
595 | }
|
596 | this.connectivityState = newState;
|
597 | const watchersCopy = this.connectivityStateWatchers.slice();
|
598 | for (const watcherObject of watchersCopy) {
|
599 | if (newState !== watcherObject.currentState) {
|
600 | if (watcherObject.timer) {
|
601 | clearTimeout(watcherObject.timer);
|
602 | }
|
603 | this.removeConnectivityStateWatcher(watcherObject);
|
604 | watcherObject.callback();
|
605 | }
|
606 | }
|
607 | if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
|
608 | this.currentResolutionError = null;
|
609 | }
|
610 | }
|
611 |
|
612 | private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
|
613 | if (stream.getStatus() !== null) {
|
614 | |
615 |
|
616 | return;
|
617 | }
|
618 | if (this.configSelector === null) {
|
619 | |
620 |
|
621 |
|
622 |
|
623 | this.resolvingLoadBalancer.exitIdle();
|
624 | if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
|
625 | stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
|
626 | } else {
|
627 | this.configSelectionQueue.push({
|
628 | callStream: stream,
|
629 | callMetadata: metadata,
|
630 | });
|
631 | this.callRefTimerRef();
|
632 | }
|
633 | } else {
|
634 | const callConfig = this.configSelector(stream.getMethod(), metadata);
|
635 | if (callConfig.status === Status.OK) {
|
636 | if (callConfig.methodConfig.timeout) {
|
637 | const deadline = new Date();
|
638 | deadline.setSeconds(
|
639 | deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
|
640 | );
|
641 | deadline.setMilliseconds(
|
642 | deadline.getMilliseconds() +
|
643 | callConfig.methodConfig.timeout.nanos / 1_000_000
|
644 | );
|
645 | stream.setConfigDeadline(deadline);
|
646 |
|
647 | stream.filterStack.refresh();
|
648 | }
|
649 | if (callConfig.dynamicFilterFactories.length > 0) {
|
650 | |
651 |
|
652 |
|
653 |
|
654 |
|
655 |
|
656 |
|
657 |
|
658 |
|
659 |
|
660 |
|
661 |
|
662 | const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
|
663 | const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
|
664 | dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
|
665 | this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
|
666 | });
|
667 | } else {
|
668 | this.tryPick(stream, metadata, callConfig, []);
|
669 | }
|
670 | } else {
|
671 | stream.cancelWithStatus(
|
672 | callConfig.status,
|
673 | 'Failed to route call to method ' + stream.getMethod()
|
674 | );
|
675 | }
|
676 | }
|
677 | }
|
678 |
|
679 | _startCallStream(stream: Http2CallStream, metadata: Metadata) {
|
680 | this.tryGetConfig(stream, metadata.clone());
|
681 | }
|
682 |
|
683 | close() {
|
684 | this.resolvingLoadBalancer.destroy();
|
685 | this.updateState(ConnectivityState.SHUTDOWN);
|
686 | clearInterval(this.callRefTimer);
|
687 | if (this.channelzEnabled) {
|
688 | unregisterChannelzRef(this.channelzRef);
|
689 | }
|
690 |
|
691 | this.subchannelPool.unrefUnusedSubchannels();
|
692 | }
|
693 |
|
694 | getTarget() {
|
695 | return uriToString(this.target);
|
696 | }
|
697 |
|
698 | getConnectivityState(tryToConnect: boolean) {
|
699 | const connectivityState = this.connectivityState;
|
700 | if (tryToConnect) {
|
701 | this.resolvingLoadBalancer.exitIdle();
|
702 | }
|
703 | return connectivityState;
|
704 | }
|
705 |
|
706 | watchConnectivityState(
|
707 | currentState: ConnectivityState,
|
708 | deadline: Date | number,
|
709 | callback: (error?: Error) => void
|
710 | ): void {
|
711 | if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
712 | throw new Error('Channel has been shut down');
|
713 | }
|
714 | let timer = null;
|
715 | if (deadline !== Infinity) {
|
716 | const deadlineDate: Date =
|
717 | deadline instanceof Date ? deadline : new Date(deadline);
|
718 | const now = new Date();
|
719 | if (deadline === -Infinity || deadlineDate <= now) {
|
720 | process.nextTick(
|
721 | callback,
|
722 | new Error('Deadline passed without connectivity state change')
|
723 | );
|
724 | return;
|
725 | }
|
726 | timer = setTimeout(() => {
|
727 | this.removeConnectivityStateWatcher(watcherObject);
|
728 | callback(
|
729 | new Error('Deadline passed without connectivity state change')
|
730 | );
|
731 | }, deadlineDate.getTime() - now.getTime());
|
732 | }
|
733 | const watcherObject = {
|
734 | currentState,
|
735 | callback,
|
736 | timer,
|
737 | };
|
738 | this.connectivityStateWatchers.push(watcherObject);
|
739 | }
|
740 |
|
741 | |
742 |
|
743 |
|
744 |
|
745 |
|
746 | getChannelzRef() {
|
747 | return this.channelzRef;
|
748 | }
|
749 |
|
750 | createCall(
|
751 | method: string,
|
752 | deadline: Deadline,
|
753 | host: string | null | undefined,
|
754 | parentCall: ServerSurfaceCall | null,
|
755 | propagateFlags: number | null | undefined
|
756 | ): Call {
|
757 | if (typeof method !== 'string') {
|
758 | throw new TypeError('Channel#createCall: method must be a string');
|
759 | }
|
760 | if (!(typeof deadline === 'number' || deadline instanceof Date)) {
|
761 | throw new TypeError(
|
762 | 'Channel#createCall: deadline must be a number or Date'
|
763 | );
|
764 | }
|
765 | if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
766 | throw new Error('Channel has been shut down');
|
767 | }
|
768 | const callNumber = getNewCallNumber();
|
769 | this.trace(
|
770 | 'createCall [' +
|
771 | callNumber +
|
772 | '] method="' +
|
773 | method +
|
774 | '", deadline=' +
|
775 | deadline
|
776 | );
|
777 | const finalOptions: CallStreamOptions = {
|
778 | deadline: deadline,
|
779 | flags: propagateFlags ?? Propagate.DEFAULTS,
|
780 | host: host ?? this.defaultAuthority,
|
781 | parentCall: parentCall,
|
782 | };
|
783 | const stream: Http2CallStream = new Http2CallStream(
|
784 | method,
|
785 | this,
|
786 | finalOptions,
|
787 | this.filterStackFactory,
|
788 | this.credentials._getCallCredentials(),
|
789 | callNumber
|
790 | );
|
791 | if (this.channelzEnabled) {
|
792 | this.callTracker.addCallStarted();
|
793 | stream.addStatusWatcher(status => {
|
794 | if (status.code === Status.OK) {
|
795 | this.callTracker.addCallSucceeded();
|
796 | } else {
|
797 | this.callTracker.addCallFailed();
|
798 | }
|
799 | });
|
800 | }
|
801 | return stream;
|
802 | }
|
803 | }
|