UNPKG

21.8 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelCredentials } from './channel-credentials';
19import { ChannelOptions } from './channel-options';
20import { ResolvingLoadBalancer } from './resolving-load-balancer';
21import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
22import { ChannelControlHelper } from './load-balancer';
23import { UnavailablePicker, Picker, PickResultType } from './picker';
24import { Metadata } from './metadata';
25import { Status, LogVerbosity, Propagate } from './constants';
26import { FilterStackFactory } from './filter-stack';
27import { CompressionFilterFactory } from './compression-filter';
28import {
29 CallConfig,
30 ConfigSelector,
31 getDefaultAuthority,
32 mapUriDefaultScheme,
33} from './resolver';
34import { trace, log } from './logging';
35import { SubchannelAddress } from './subchannel-address';
36import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
37import { mapProxyName } from './http_proxy';
38import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
39import { ServerSurfaceCall } from './server-call';
40import { Filter } from './filter';
41
42import { ConnectivityState } from './connectivity-state';
43import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
44import { Subchannel } from './subchannel';
45import { LoadBalancingCall } from './load-balancing-call';
46import { CallCredentials } from './call-credentials';
47import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface';
48import { SubchannelCall } from './subchannel-call';
49import { Deadline, deadlineToString, getDeadlineTimeoutString } from './deadline';
50import { ResolvingCall } from './resolving-call';
51import { getNextCallNumber } from './call-number';
52import { restrictControlPlaneStatusCode } from './control-plane-status';
53import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
54import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
55
/**
 * Largest delay, in milliseconds, used for the channel's keep-alive interval
 * timer: 2^31 - 1, the maximum value of a signed 32-bit integer.
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2 ** 31 - 1;
60
/**
 * Bookkeeping record for one pending `watchConnectivityState` request.
 */
interface ConnectivityStateWatcher {
  /** Invoked with no argument on a state change, or with an Error on deadline. */
  callback: (error?: Error) => void;
  /** The state the caller believed the channel to be in when it registered. */
  currentState: ConnectivityState;
  /** Deadline timer for this watcher, or null when the deadline is infinite. */
  timer: NodeJS.Timeout | null;
}
66
/** No config selector is available yet and no resolution error is recorded. */
interface NoneConfigResult {
  type: 'NONE';
}

/** A config selector was available and produced a call config. */
interface SuccessConfigResult {
  type: 'SUCCESS';
  config: CallConfig;
}

/** No config selector is available and the most recent resolution failed. */
interface ErrorConfigResult {
  type: 'ERROR';
  error: StatusObject;
}

/**
 * Outcome of `InternalChannel#getConfig`, discriminated on the `type` tag.
 */
type GetConfigResult =
  | NoneConfigResult
  | SuccessConfigResult
  | ErrorConfigResult;
82
/**
 * Module-wide registry of retry throttlers, keyed by the channel target
 * string (as returned by `InternalChannel#getTarget`).
 */
const RETRY_THROTTLER_MAP = new Map<string, RetryThrottler>();
84
/** Fallback for the `grpc.retry_buffer_size` channel option: 16 MB. */
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 16 * 1024 * 1024;
/** Fallback for the `grpc.per_rpc_retry_buffer_size` channel option: 1 MB. */
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1024 * 1024;
87
/**
 * Subchannel wrapper handed out by an InternalChannel. It forwards the
 * keepalive time reported with each connectivity state change of the wrapped
 * subchannel to the owning channel, and deregisters itself from the channel
 * once its own reference count drops to zero.
 */
class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
  private refCount = 0;
  private subchannelStateListener: ConnectivityStateListener;

  constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
    super(childSubchannel);
    // Only the keepalive time is of interest here; the other listener
    // arguments are intentionally ignored.
    const forwardKeepalive: ConnectivityStateListener = (
      _subchannel,
      _previousState,
      _newState,
      keepaliveTime
    ) => {
      channel.throttleKeepalive(keepaliveTime);
    };
    this.subchannelStateListener = forwardKeepalive;
    childSubchannel.addConnectivityStateListener(forwardKeepalive);
  }

  /** Takes a reference on the wrapped subchannel and on this wrapper. */
  ref(): void {
    this.child.ref();
    this.refCount += 1;
  }

  /**
   * Drops a reference on the wrapped subchannel and on this wrapper. When no
   * references remain, the state listener is removed and the wrapper is
   * unregistered from the channel.
   */
  unref(): void {
    this.child.unref();
    this.refCount -= 1;
    if (this.refCount > 0) {
      return;
    }
    this.child.removeConnectivityStateListener(this.subchannelStateListener);
    this.channel.removeWrappedSubchannel(this);
  }
}
113
/**
 * Client-side channel implementation. It owns the resolving load balancer,
 * tracks the channel connectivity state and its watchers, queues calls that
 * are waiting for a call config or for a pick, and creates the call objects
 * (resolving, retrying and load-balancing calls) bound to this channel.
 */
export class InternalChannel {

  private resolvingLoadBalancer: ResolvingLoadBalancer;
  private subchannelPool: SubchannelPool;
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  private currentPicker: Picker = new UnavailablePicker();
  /**
   * Calls queued up to get a call config. Should only be populated before the
   * first time the resolver returns a result, which includes the ConfigSelector.
   */
  private configSelectionQueue: ResolvingCall[] = [];
  /** Calls waiting for a picker update before they can be picked again. */
  private pickQueue: LoadBalancingCall[] = [];
  private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
  private defaultAuthority: string;
  private filterStackFactory: FilterStackFactory;
  private target: GrpcUri;
  /**
   * This timer does not do anything on its own. Its purpose is to hold the
   * event loop open while there are any pending calls for the channel that
   * have not yet been assigned to specific subchannels. It is reffed whenever
   * a call is added to configSelectionQueue or pickQueue, and it is only
   * unreffed once both of those queues are empty.
   */
  private callRefTimer: NodeJS.Timer;
  private configSelector: ConfigSelector | null = null;
  /**
   * This is the error from the name resolver if it failed most recently. It
   * is only used to end calls that start while there is no config selector
   * and the name resolver is in backoff, so it should be nulled if
   * configSelector becomes set or the channel state becomes anything other
   * than TRANSIENT_FAILURE.
   */
  private currentResolutionError: StatusObject | null = null;
  private retryBufferTracker: MessageBufferTracker;
  private keepaliveTime: number;
  private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private originalTarget: string;
  private channelzRef: ChannelRef;
  private channelzTrace: ChannelzTrace;
  private callTracker = new ChannelzCallTracker();
  private childrenTracker = new ChannelzChildrenTracker();

  /**
   * @param target Target name to connect to; must parse as a URI and map to
   *     a default scheme.
   * @param credentials Channel credentials used for subchannels and calls.
   * @param options Channel options.
   * @throws TypeError if an argument has the wrong type.
   * @throws Error if the target cannot be parsed or has no default scheme.
   */
  constructor(
    target: string,
    private readonly credentials: ChannelCredentials,
    private readonly options: ChannelOptions
  ) {
    if (typeof target !== 'string') {
      throw new TypeError('Channel target must be a string');
    }
    if (!(credentials instanceof ChannelCredentials)) {
      throw new TypeError(
        'Channel credentials must be a ChannelCredentials object'
      );
    }
    if (options) {
      if (typeof options !== 'object') {
        throw new TypeError('Channel options must be an object');
      }
    }
    this.originalTarget = target;
    const originalTargetUri = parseUri(target);
    if (originalTargetUri === null) {
      throw new Error(`Could not parse target name "${target}"`);
    }
    /* This ensures that the target has a scheme that is registered with the
     * resolver */
    const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
    if (defaultSchemeMapResult === null) {
      throw new Error(
        `Could not find a default scheme for target name "${target}"`
      );
    }

    // The timer starts out unreffed; it is reffed only while calls are queued.
    this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
    this.callRefTimer.unref?.();

    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }

    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Channel created');
    }

    if (this.options['grpc.default_authority']) {
      this.defaultAuthority = this.options['grpc.default_authority'] as string;
    } else {
      this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
    }
    const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
    this.target = proxyMapResult.target;
    this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);

    /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
     * the grpc.use_local_subchannel_pool channel option means. */
    this.subchannelPool = getSubchannelPool(
      (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
    );
    this.retryBufferTracker = new MessageBufferTracker(
      options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
      options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
    );
    this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
    const channelControlHelper: ChannelControlHelper = {
      createSubchannel: (
        subchannelAddress: SubchannelAddress,
        subchannelArgs: ChannelOptions
      ) => {
        const subchannel = this.subchannelPool.getOrCreateSubchannel(
          this.target,
          subchannelAddress,
          Object.assign({}, this.options, subchannelArgs),
          this.credentials
        );
        subchannel.throttleKeepalive(this.keepaliveTime);
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
        }
        const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
        this.wrappedSubchannels.add(wrappedSubchannel);
        return wrappedSubchannel;
      },
      updateState: (connectivityState: ConnectivityState, picker: Picker) => {
        this.currentPicker = picker;
        // Drain the pick queue against the new picker. Calls that cannot be
        // picked yet re-queue themselves through queueCallForPick.
        const queueCopy = this.pickQueue.slice();
        this.pickQueue = [];
        this.callRefTimerUnref();
        for (const call of queueCopy) {
          call.doPick();
        }
        this.updateState(connectivityState);
      },
      requestReresolution: () => {
        // This should never be called.
        throw new Error(
          'Resolving load balancer should never call requestReresolution'
        );
      },
      addChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.refChild(child);
        }
      },
      removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.unrefChild(child);
        }
      }
    };
    this.resolvingLoadBalancer = new ResolvingLoadBalancer(
      this.target,
      channelControlHelper,
      options,
      (serviceConfig, configSelector) => {
        if (serviceConfig.retryThrottling) {
          RETRY_THROTTLER_MAP.set(
            this.getTarget(),
            new RetryThrottler(
              serviceConfig.retryThrottling.maxTokens,
              serviceConfig.retryThrottling.tokenRatio,
              RETRY_THROTTLER_MAP.get(this.getTarget())
            )
          );
        } else {
          RETRY_THROTTLER_MAP.delete(this.getTarget());
        }
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
        }
        this.configSelector = configSelector;
        this.currentResolutionError = null;
        /* We process the queue asynchronously to ensure that the corresponding
         * load balancer update has completed. */
        process.nextTick(() => {
          // Swap the queue out before draining it, so that anything queued
          // while the drain is running is kept rather than discarded.
          const localQueue = this.configSelectionQueue;
          this.configSelectionQueue = [];
          this.callRefTimerUnref();
          for (const call of localQueue) {
            call.getConfig();
          }
        });

      },
      (status) => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
        }
        if (this.configSelectionQueue.length > 0) {
          this.trace('Name resolution failed with calls queued for config selection');
        }
        if (this.configSelector === null) {
          this.currentResolutionError = {...restrictControlPlaneStatusCode(status.code, status.details), metadata: status.metadata};
        }
        const localQueue = this.configSelectionQueue;
        this.configSelectionQueue = [];
        this.callRefTimerUnref();
        for (const call of localQueue) {
          call.reportResolverError(status);
        }
      }
    );
    this.filterStackFactory = new FilterStackFactory([
      new MaxMessageSizeFilterFactory(this.options),
      new CompressionFilterFactory(this, this.options),
    ]);
    this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
    // Capture the construction stack (minus its first line) for debugging.
    const error = new Error();
    trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
  }

  /** Builds the channelz snapshot for this channel. */
  private getChannelzInfo(): ChannelInfo {
    return {
      target: this.originalTarget,
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists()
    };
  }

  /**
   * Emits a 'channel' trace line prefixed with the channelz id and target.
   * @param text Message to log.
   * @param verbosityOverride Verbosity to use instead of DEBUG.
   */
  private trace(text: string, verbosityOverride?: LogVerbosity) {
    trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text);
  }

  /** Refs the keep-alive timer so pending calls hold the event loop open. */
  private callRefTimerRef() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef?.()) {
      this.trace(
        'callRefTimer.ref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.ref?.();
    }
  }

  /**
   * Unrefs the keep-alive timer, but only when no calls remain in either
   * queue. Draining one queue must not release the event loop while the
   * other queue still holds pending calls.
   */
  private callRefTimerUnref() {
    if (this.configSelectionQueue.length > 0 || this.pickQueue.length > 0) {
      return;
    }
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
      this.trace(
        'callRefTimer.unref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.unref?.();
    }
  }

  /** Removes the given watcher from the watcher list, if it is present. */
  private removeConnectivityStateWatcher(
    watcherObject: ConnectivityStateWatcher
  ) {
    const watcherIndex = this.connectivityStateWatchers.indexOf(watcherObject);
    if (watcherIndex >= 0) {
      this.connectivityStateWatchers.splice(watcherIndex, 1);
    }
  }

  /**
   * Records a connectivity state transition, notifies every watcher whose
   * recorded state differs from the new state, and clears the stored
   * resolution error unless the new state is TRANSIENT_FAILURE.
   */
  private updateState(newState: ConnectivityState): void {
    trace(
      LogVerbosity.DEBUG,
      'connectivity_state',
      '(' + this.channelzRef.id + ') ' +
        uriToString(this.target) +
        ' ' +
        ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
    }
    this.connectivityState = newState;
    // Iterate over a copy because notified watchers are removed from the list.
    const watchersCopy = this.connectivityStateWatchers.slice();
    for (const watcherObject of watchersCopy) {
      if (newState !== watcherObject.currentState) {
        if (watcherObject.timer) {
          clearTimeout(watcherObject.timer);
        }
        this.removeConnectivityStateWatcher(watcherObject);
        watcherObject.callback();
      }
    }
    if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
      this.currentResolutionError = null;
    }
  }

  /**
   * Raises the channel keepalive time and propagates it to every wrapped
   * subchannel. Values that are not greater than the current one are ignored.
   */
  throttleKeepalive(newKeepaliveTime: number) {
    if (newKeepaliveTime > this.keepaliveTime) {
      this.keepaliveTime = newKeepaliveTime;
      for (const wrappedSubchannel of this.wrappedSubchannels) {
        wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
      }
    }
  }

  /** Stops tracking a subchannel wrapper created by this channel. */
  removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
    this.wrappedSubchannels.delete(wrappedSubchannel);
  }

  /** Runs the current picker for a call with the given metadata. */
  doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
    return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
  }

  /** Queues a call to be re-picked on the next picker update. */
  queueCallForPick(call: LoadBalancingCall) {
    this.pickQueue.push(call);
    this.callRefTimerRef();
  }

  /**
   * Asks the load balancer to exit idle, then returns the call config for the
   * method if a config selector is available, the stored resolution error if
   * there is one, or a 'NONE' result otherwise.
   */
  getConfig(method: string, metadata: Metadata): GetConfigResult {
    this.resolvingLoadBalancer.exitIdle();
    if (this.configSelector) {
      return {
        type: 'SUCCESS',
        config: this.configSelector(method, metadata)
      };
    }
    if (this.currentResolutionError) {
      return {
        type: 'ERROR',
        error: this.currentResolutionError
      };
    }
    return {
      type: 'NONE'
    };
  }

  /** Queues a call until the resolver reports a result or an error. */
  queueCallForConfig(call: ResolvingCall) {
    this.configSelectionQueue.push(call);
    this.callRefTimerRef();
  }

  /** Creates a LoadBalancingCall on this channel with a fresh call number. */
  createLoadBalancingCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): LoadBalancingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createLoadBalancingCall [' +
        callNumber +
        '] method="' +
        method +
        '"'
    );
    return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
  }

  /**
   * Creates a RetryingCall on this channel with a fresh call number, sharing
   * the channel retry buffer tracker and the retry throttler registered for
   * this channel's target, if any.
   */
  createRetryingCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): RetryingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createRetryingCall [' +
        callNumber +
        '] method="' +
        method +
        '"'
    );
    return new RetryingCall(
      this,
      callConfig,
      method,
      host,
      credentials,
      deadline,
      callNumber,
      this.retryBufferTracker,
      RETRY_THROTTLER_MAP.get(this.getTarget())
    );
  }

  /**
   * Creates the call that sits below a ResolvingCall: a LoadBalancingCall
   * when the 'grpc.enable_retries' option is 0, a RetryingCall otherwise.
   */
  createInnerCall(
    callConfig: CallConfig,
    method: string,
    host: string,
    credentials: CallCredentials,
    deadline: Deadline
  ): Call {
    // Create a RetryingCall if retries are enabled
    if (this.options['grpc.enable_retries'] === 0) {
      return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
    } else {
      return this.createRetryingCall(callConfig, method, host, credentials, deadline);
    }
  }

  /**
   * Creates a ResolvingCall with a fresh call number. A missing host falls
   * back to the channel default authority and missing propagate flags fall
   * back to Propagate.DEFAULTS. When channelz is enabled the call is counted
   * as started and its final status is recorded.
   */
  createResolvingCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): ResolvingCall {
    const callNumber = getNextCallNumber();
    this.trace(
      'createResolvingCall [' +
        callNumber +
        '] method="' +
        method +
        '", deadline=' +
        deadlineToString(deadline)
    );
    const finalOptions: CallStreamOptions = {
      deadline: deadline,
      flags: propagateFlags ?? Propagate.DEFAULTS,
      host: host ?? this.defaultAuthority,
      parentCall: parentCall,
    };

    const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), callNumber);

    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      call.addStatusWatcher(status => {
        if (status.code === Status.OK) {
          this.callTracker.addCallSucceeded();
        } else {
          this.callTracker.addCallFailed();
        }
      });
    }
    return call;
  }

  /**
   * Shuts the channel down: destroys the load balancer, moves to SHUTDOWN,
   * stops the keep-alive timer, unregisters from channelz when enabled, and
   * asks the subchannel pool to drop unused subchannels.
   */
  close() {
    this.resolvingLoadBalancer.destroy();
    this.updateState(ConnectivityState.SHUTDOWN);
    clearInterval(this.callRefTimer);
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }

    this.subchannelPool.unrefUnusedSubchannels();
  }

  /** Returns the channel target (after proxy mapping) as a string. */
  getTarget() {
    return uriToString(this.target);
  }

  /**
   * Returns the connectivity state as it was before any connection attempt
   * triggered by this call.
   * @param tryToConnect When true, asks the load balancer to exit idle.
   */
  getConnectivityState(tryToConnect: boolean) {
    const connectivityState = this.connectivityState;
    if (tryToConnect) {
      this.resolvingLoadBalancer.exitIdle();
    }
    return connectivityState;
  }

  /**
   * Registers a one-shot watcher that fires when the channel state differs
   * from `currentState`, or fails with an Error once `deadline` passes.
   * @param currentState The state the caller believes the channel is in.
   * @param deadline Absolute deadline; Infinity means no deadline.
   * @param callback Called with no argument on a state change, or with an
   *     Error when the deadline passes first.
   * @throws Error if the channel has been shut down.
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void {
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const watcherObject: ConnectivityStateWatcher = {
      currentState,
      callback,
      timer: null,
    };
    if (deadline !== Infinity) {
      const deadlineDate: Date =
        deadline instanceof Date ? deadline : new Date(deadline);
      const now = new Date();
      if (deadline === -Infinity || deadlineDate <= now) {
        process.nextTick(
          callback,
          new Error('Deadline passed without connectivity state change')
        );
        return;
      }
      /* Timer delays above MAX_TIMEOUT_TIME are coerced to 1 ms by Node, which
       * would make a far-future deadline expire almost immediately. Wait in
       * chunks of at most MAX_TIMEOUT_TIME instead. The active timer is always
       * stored on the watcher so that updateState can clear it. */
      const armTimer = (): void => {
        const remainingMs = deadlineDate.getTime() - Date.now();
        if (remainingMs > MAX_TIMEOUT_TIME) {
          watcherObject.timer = setTimeout(armTimer, MAX_TIMEOUT_TIME);
          return;
        }
        watcherObject.timer = setTimeout(() => {
          this.removeConnectivityStateWatcher(watcherObject);
          callback(
            new Error('Deadline passed without connectivity state change')
          );
        }, remainingMs);
      };
      armTimer();
    }
    this.connectivityStateWatchers.push(watcherObject);
  }

  /**
   * Get the channelz reference object for this channel. The returned value is
   * garbage if channelz is disabled for this channel.
   * @returns
   */
  getChannelzRef() {
    return this.channelzRef;
  }

  /**
   * Validates the arguments and creates a new call on this channel.
   * @throws TypeError if `method` is not a string or `deadline` is neither a
   *     number nor a Date.
   * @throws Error if the channel has been shut down.
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call {
    if (typeof method !== 'string') {
      throw new TypeError('Channel#createCall: method must be a string');
    }
    if (!(typeof deadline === 'number' || deadline instanceof Date)) {
      throw new TypeError(
        'Channel#createCall: deadline must be a number or Date'
      );
    }
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    return this.createResolvingCall(method, deadline, host, parentCall, propagateFlags);
  }
}