UNPKG

15.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelCredentials } from './channel-credentials';
19import { Metadata } from './metadata';
20import { ChannelOptions } from './channel-options';
21import { ConnectivityState } from './connectivity-state';
22import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
23import * as logging from './logging';
24import { LogVerbosity, Status } from './constants';
25import { GrpcUri, uriToString } from './uri-parser';
26import {
27 SubchannelAddress,
28 subchannelAddressToString,
29} from './subchannel-address';
30import {
31 SubchannelRef,
32 ChannelzTrace,
33 ChannelzChildrenTracker,
34 ChannelzChildrenTrackerStub,
35 SubchannelInfo,
36 registerChannelzSubchannel,
37 ChannelzCallTracker,
38 ChannelzCallTrackerStub,
39 unregisterChannelzRef,
40 ChannelzTraceStub,
41} from './channelz';
42import {
43 ConnectivityStateListener,
44 SubchannelInterface,
45} from './subchannel-interface';
46import { SubchannelCallInterceptingListener } from './subchannel-call';
47import { SubchannelCall } from './subchannel-call';
48import { CallEventTracker, SubchannelConnector, Transport } from './transport';
49
50const TRACER_NAME = 'subchannel';
51
52/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
53 * have a constant for the max signed 32 bit integer, so this is a simple way
54 * to calculate it */
55const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
56
57export class Subchannel {
58 /**
59 * The subchannel's current connectivity state. Invariant: `session` === `null`
60 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
61 */
62 private connectivityState: ConnectivityState = ConnectivityState.IDLE;
63 /**
64 * The underlying http2 session used to make requests.
65 */
66 private transport: Transport | null = null;
67 /**
68 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
69 * CONNECTING instead of IDLE when the backoff timeout ends.
70 */
71 private continueConnecting = false;
72 /**
73 * A list of listener functions that will be called whenever the connectivity
74 * state changes. Will be modified by `addConnectivityStateListener` and
75 * `removeConnectivityStateListener`
76 */
77 private stateListeners: Set<ConnectivityStateListener> = new Set();
78
79 private backoffTimeout: BackoffTimeout;
80
81 private keepaliveTime: number;
82 /**
83 * Tracks channels and subchannel pools with references to this subchannel
84 */
85 private refcount = 0;
86
87 /**
88 * A string representation of the subchannel address, for logging/tracing
89 */
90 private subchannelAddressString: string;
91
92 // Channelz info
93 private readonly channelzEnabled: boolean = true;
94 private channelzRef: SubchannelRef;
95
96 private channelzTrace: ChannelzTrace | ChannelzTraceStub;
97 private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
98 private childrenTracker:
99 | ChannelzChildrenTracker
100 | ChannelzChildrenTrackerStub;
101
102 // Channelz socket info
103 private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
104
105 /**
106 * A class representing a connection to a single backend.
107 * @param channelTarget The target string for the channel as a whole
108 * @param subchannelAddress The address for the backend that this subchannel
109 * will connect to
110 * @param options The channel options, plus any specific subchannel options
111 * for this subchannel
112 * @param credentials The channel credentials used to establish this
113 * connection
114 */
115 constructor(
116 private channelTarget: GrpcUri,
117 private subchannelAddress: SubchannelAddress,
118 private options: ChannelOptions,
119 private credentials: ChannelCredentials,
120 private connector: SubchannelConnector
121 ) {
122 const backoffOptions: BackoffOptions = {
123 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
124 maxDelay: options['grpc.max_reconnect_backoff_ms'],
125 };
126 this.backoffTimeout = new BackoffTimeout(() => {
127 this.handleBackoffTimer();
128 }, backoffOptions);
129 this.backoffTimeout.unref();
130 this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
131
132 this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
133
134 if (options['grpc.enable_channelz'] === 0) {
135 this.channelzEnabled = false;
136 this.channelzTrace = new ChannelzTraceStub();
137 this.callTracker = new ChannelzCallTrackerStub();
138 this.childrenTracker = new ChannelzChildrenTrackerStub();
139 this.streamTracker = new ChannelzCallTrackerStub();
140 } else {
141 this.channelzTrace = new ChannelzTrace();
142 this.callTracker = new ChannelzCallTracker();
143 this.childrenTracker = new ChannelzChildrenTracker();
144 this.streamTracker = new ChannelzCallTracker();
145 }
146
147 this.channelzRef = registerChannelzSubchannel(
148 this.subchannelAddressString,
149 () => this.getChannelzInfo(),
150 this.channelzEnabled
151 );
152
153 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
154 this.trace(
155 'Subchannel constructed with options ' +
156 JSON.stringify(options, undefined, 2)
157 );
158 }
159
160 private getChannelzInfo(): SubchannelInfo {
161 return {
162 state: this.connectivityState,
163 trace: this.channelzTrace,
164 callTracker: this.callTracker,
165 children: this.childrenTracker.getChildLists(),
166 target: this.subchannelAddressString,
167 };
168 }
169
170 private trace(text: string): void {
171 logging.trace(
172 LogVerbosity.DEBUG,
173 TRACER_NAME,
174 '(' +
175 this.channelzRef.id +
176 ') ' +
177 this.subchannelAddressString +
178 ' ' +
179 text
180 );
181 }
182
183 private refTrace(text: string): void {
184 logging.trace(
185 LogVerbosity.DEBUG,
186 'subchannel_refcount',
187 '(' +
188 this.channelzRef.id +
189 ') ' +
190 this.subchannelAddressString +
191 ' ' +
192 text
193 );
194 }
195
196 private handleBackoffTimer() {
197 if (this.continueConnecting) {
198 this.transitionToState(
199 [ConnectivityState.TRANSIENT_FAILURE],
200 ConnectivityState.CONNECTING
201 );
202 } else {
203 this.transitionToState(
204 [ConnectivityState.TRANSIENT_FAILURE],
205 ConnectivityState.IDLE
206 );
207 }
208 }
209
210 /**
211 * Start a backoff timer with the current nextBackoff timeout
212 */
213 private startBackoff() {
214 this.backoffTimeout.runOnce();
215 }
216
217 private stopBackoff() {
218 this.backoffTimeout.stop();
219 this.backoffTimeout.reset();
220 }
221
222 private startConnectingInternal() {
223 let options = this.options;
224 if (options['grpc.keepalive_time_ms']) {
225 const adjustedKeepaliveTime = Math.min(
226 this.keepaliveTime,
227 KEEPALIVE_MAX_TIME_MS
228 );
229 options = { ...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime };
230 }
231 this.connector
232 .connect(this.subchannelAddress, this.credentials, options)
233 .then(
234 transport => {
235 if (
236 this.transitionToState(
237 [ConnectivityState.CONNECTING],
238 ConnectivityState.READY
239 )
240 ) {
241 this.transport = transport;
242 if (this.channelzEnabled) {
243 this.childrenTracker.refChild(transport.getChannelzRef());
244 }
245 transport.addDisconnectListener(tooManyPings => {
246 this.transitionToState(
247 [ConnectivityState.READY],
248 ConnectivityState.IDLE
249 );
250 if (tooManyPings && this.keepaliveTime > 0) {
251 this.keepaliveTime *= 2;
252 logging.log(
253 LogVerbosity.ERROR,
254 `Connection to ${uriToString(this.channelTarget)} at ${
255 this.subchannelAddressString
256 } rejected by server because of excess pings. Increasing ping interval to ${
257 this.keepaliveTime
258 } ms`
259 );
260 }
261 });
262 } else {
263 /* If we can't transition from CONNECTING to READY here, we will
264 * not be using this transport, so release its resources. */
265 transport.shutdown();
266 }
267 },
268 error => {
269 this.transitionToState(
270 [ConnectivityState.CONNECTING],
271 ConnectivityState.TRANSIENT_FAILURE,
272 `${error}`
273 );
274 }
275 );
276 }
277
278 /**
279 * Initiate a state transition from any element of oldStates to the new
280 * state. If the current connectivityState is not in oldStates, do nothing.
281 * @param oldStates The set of states to transition from
282 * @param newState The state to transition to
283 * @returns True if the state changed, false otherwise
284 */
285 private transitionToState(
286 oldStates: ConnectivityState[],
287 newState: ConnectivityState,
288 errorMessage?: string
289 ): boolean {
290 if (oldStates.indexOf(this.connectivityState) === -1) {
291 return false;
292 }
293 this.trace(
294 ConnectivityState[this.connectivityState] +
295 ' -> ' +
296 ConnectivityState[newState]
297 );
298 if (this.channelzEnabled) {
299 this.channelzTrace.addTrace(
300 'CT_INFO',
301 'Connectivity state change to ' + ConnectivityState[newState]
302 );
303 }
304 const previousState = this.connectivityState;
305 this.connectivityState = newState;
306 switch (newState) {
307 case ConnectivityState.READY:
308 this.stopBackoff();
309 break;
310 case ConnectivityState.CONNECTING:
311 this.startBackoff();
312 this.startConnectingInternal();
313 this.continueConnecting = false;
314 break;
315 case ConnectivityState.TRANSIENT_FAILURE:
316 if (this.channelzEnabled && this.transport) {
317 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
318 }
319 this.transport?.shutdown();
320 this.transport = null;
321 /* If the backoff timer has already ended by the time we get to the
322 * TRANSIENT_FAILURE state, we want to immediately transition out of
323 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
324 if (!this.backoffTimeout.isRunning()) {
325 process.nextTick(() => {
326 this.handleBackoffTimer();
327 });
328 }
329 break;
330 case ConnectivityState.IDLE:
331 if (this.channelzEnabled && this.transport) {
332 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
333 }
334 this.transport?.shutdown();
335 this.transport = null;
336 break;
337 default:
338 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
339 }
340 for (const listener of this.stateListeners) {
341 listener(this, previousState, newState, this.keepaliveTime, errorMessage);
342 }
343 return true;
344 }
345
346 ref() {
347 this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
348 this.refcount += 1;
349 }
350
351 unref() {
352 this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
353 this.refcount -= 1;
354 if (this.refcount === 0) {
355 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
356 unregisterChannelzRef(this.channelzRef);
357 process.nextTick(() => {
358 this.transitionToState(
359 [ConnectivityState.CONNECTING, ConnectivityState.READY],
360 ConnectivityState.IDLE
361 );
362 });
363 }
364 }
365
366 unrefIfOneRef(): boolean {
367 if (this.refcount === 1) {
368 this.unref();
369 return true;
370 }
371 return false;
372 }
373
374 createCall(
375 metadata: Metadata,
376 host: string,
377 method: string,
378 listener: SubchannelCallInterceptingListener
379 ): SubchannelCall {
380 if (!this.transport) {
381 throw new Error('Cannot create call, subchannel not READY');
382 }
383 let statsTracker: Partial<CallEventTracker>;
384 if (this.channelzEnabled) {
385 this.callTracker.addCallStarted();
386 this.streamTracker.addCallStarted();
387 statsTracker = {
388 onCallEnd: status => {
389 if (status.code === Status.OK) {
390 this.callTracker.addCallSucceeded();
391 } else {
392 this.callTracker.addCallFailed();
393 }
394 },
395 };
396 } else {
397 statsTracker = {};
398 }
399 return this.transport.createCall(
400 metadata,
401 host,
402 method,
403 listener,
404 statsTracker
405 );
406 }
407
408 /**
409 * If the subchannel is currently IDLE, start connecting and switch to the
410 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
411 * the next time it would transition to IDLE, start connecting again instead.
412 * Otherwise, do nothing.
413 */
414 startConnecting() {
415 process.nextTick(() => {
416 /* First, try to transition from IDLE to connecting. If that doesn't happen
417 * because the state is not currently IDLE, check if it is
418 * TRANSIENT_FAILURE, and if so indicate that it should go back to
419 * connecting after the backoff timer ends. Otherwise do nothing */
420 if (
421 !this.transitionToState(
422 [ConnectivityState.IDLE],
423 ConnectivityState.CONNECTING
424 )
425 ) {
426 if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
427 this.continueConnecting = true;
428 }
429 }
430 });
431 }
432
433 /**
434 * Get the subchannel's current connectivity state.
435 */
436 getConnectivityState() {
437 return this.connectivityState;
438 }
439
440 /**
441 * Add a listener function to be called whenever the subchannel's
442 * connectivity state changes.
443 * @param listener
444 */
445 addConnectivityStateListener(listener: ConnectivityStateListener) {
446 this.stateListeners.add(listener);
447 }
448
449 /**
450 * Remove a listener previously added with `addConnectivityStateListener`
451 * @param listener A reference to a function previously passed to
452 * `addConnectivityStateListener`
453 */
454 removeConnectivityStateListener(listener: ConnectivityStateListener) {
455 this.stateListeners.delete(listener);
456 }
457
458 /**
459 * Reset the backoff timeout, and immediately start connecting if in backoff.
460 */
461 resetBackoff() {
462 process.nextTick(() => {
463 this.backoffTimeout.reset();
464 this.transitionToState(
465 [ConnectivityState.TRANSIENT_FAILURE],
466 ConnectivityState.CONNECTING
467 );
468 });
469 }
470
471 getAddress(): string {
472 return this.subchannelAddressString;
473 }
474
475 getChannelzRef(): SubchannelRef {
476 return this.channelzRef;
477 }
478
479 isHealthy(): boolean {
480 return true;
481 }
482
483 addHealthStateWatcher(listener: (healthy: boolean) => void): void {
484 // Do nothing with the listener
485 }
486
487 removeHealthStateWatcher(listener: (healthy: boolean) => void): void {
488 // Do nothing with the listener
489 }
490
491 getRealSubchannel(): this {
492 return this;
493 }
494
495 realSubchannelEquals(other: SubchannelInterface): boolean {
496 return other.getRealSubchannel() === this;
497 }
498
499 throttleKeepalive(newKeepaliveTime: number) {
500 if (newKeepaliveTime > this.keepaliveTime) {
501 this.keepaliveTime = newKeepaliveTime;
502 }
503 }
504}