UNPKG

14.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelCredentials } from './channel-credentials';
19import { Metadata } from './metadata';
20import { ChannelOptions } from './channel-options';
21import { ConnectivityState } from './connectivity-state';
22import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
23import * as logging from './logging';
24import { LogVerbosity, Status } from './constants';
25import { GrpcUri, uriToString } from './uri-parser';
26import {
27 SubchannelAddress,
28 subchannelAddressToString,
29} from './subchannel-address';
30import {
31 SubchannelRef,
32 ChannelzTrace,
33 ChannelzChildrenTracker,
34 SubchannelInfo,
35 registerChannelzSubchannel,
36 ChannelzCallTracker,
37 unregisterChannelzRef,
38} from './channelz';
39import {
40 ConnectivityStateListener,
41 SubchannelInterface,
42} from './subchannel-interface';
43import { SubchannelCallInterceptingListener } from './subchannel-call';
44import { SubchannelCall } from './subchannel-call';
45import { CallEventTracker, SubchannelConnector, Transport } from './transport';
46
47const TRACER_NAME = 'subchannel';
48
49/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
50 * have a constant for the max signed 32 bit integer, so this is a simple way
51 * to calculate it */
52const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
53
54export class Subchannel {
55 /**
56 * The subchannel's current connectivity state. Invariant: `session` === `null`
57 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
58 */
59 private connectivityState: ConnectivityState = ConnectivityState.IDLE;
60 /**
61 * The underlying http2 session used to make requests.
62 */
63 private transport: Transport | null = null;
64 /**
65 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
66 * CONNECTING instead of IDLE when the backoff timeout ends.
67 */
68 private continueConnecting = false;
69 /**
70 * A list of listener functions that will be called whenever the connectivity
71 * state changes. Will be modified by `addConnectivityStateListener` and
72 * `removeConnectivityStateListener`
73 */
74 private stateListeners: Set<ConnectivityStateListener> = new Set();
75
76 private backoffTimeout: BackoffTimeout;
77
78 private keepaliveTime: number;
79 /**
80 * Tracks channels and subchannel pools with references to this subchannel
81 */
82 private refcount = 0;
83
84 /**
85 * A string representation of the subchannel address, for logging/tracing
86 */
87 private subchannelAddressString: string;
88
89 // Channelz info
90 private readonly channelzEnabled: boolean = true;
91 private channelzRef: SubchannelRef;
92 private channelzTrace: ChannelzTrace;
93 private callTracker = new ChannelzCallTracker();
94 private childrenTracker = new ChannelzChildrenTracker();
95
96 // Channelz socket info
97 private streamTracker = new ChannelzCallTracker();
98
99 /**
100 * A class representing a connection to a single backend.
101 * @param channelTarget The target string for the channel as a whole
102 * @param subchannelAddress The address for the backend that this subchannel
103 * will connect to
104 * @param options The channel options, plus any specific subchannel options
105 * for this subchannel
106 * @param credentials The channel credentials used to establish this
107 * connection
108 */
109 constructor(
110 private channelTarget: GrpcUri,
111 private subchannelAddress: SubchannelAddress,
112 private options: ChannelOptions,
113 private credentials: ChannelCredentials,
114 private connector: SubchannelConnector
115 ) {
116 const backoffOptions: BackoffOptions = {
117 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
118 maxDelay: options['grpc.max_reconnect_backoff_ms'],
119 };
120 this.backoffTimeout = new BackoffTimeout(() => {
121 this.handleBackoffTimer();
122 }, backoffOptions);
123 this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
124
125 this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
126
127 if (options['grpc.enable_channelz'] === 0) {
128 this.channelzEnabled = false;
129 }
130 this.channelzTrace = new ChannelzTrace();
131 this.channelzRef = registerChannelzSubchannel(
132 this.subchannelAddressString,
133 () => this.getChannelzInfo(),
134 this.channelzEnabled
135 );
136 if (this.channelzEnabled) {
137 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
138 }
139 this.trace(
140 'Subchannel constructed with options ' +
141 JSON.stringify(options, undefined, 2)
142 );
143 }
144
145 private getChannelzInfo(): SubchannelInfo {
146 return {
147 state: this.connectivityState,
148 trace: this.channelzTrace,
149 callTracker: this.callTracker,
150 children: this.childrenTracker.getChildLists(),
151 target: this.subchannelAddressString,
152 };
153 }
154
155 private trace(text: string): void {
156 logging.trace(
157 LogVerbosity.DEBUG,
158 TRACER_NAME,
159 '(' +
160 this.channelzRef.id +
161 ') ' +
162 this.subchannelAddressString +
163 ' ' +
164 text
165 );
166 }
167
168 private refTrace(text: string): void {
169 logging.trace(
170 LogVerbosity.DEBUG,
171 'subchannel_refcount',
172 '(' +
173 this.channelzRef.id +
174 ') ' +
175 this.subchannelAddressString +
176 ' ' +
177 text
178 );
179 }
180
181 private handleBackoffTimer() {
182 if (this.continueConnecting) {
183 this.transitionToState(
184 [ConnectivityState.TRANSIENT_FAILURE],
185 ConnectivityState.CONNECTING
186 );
187 } else {
188 this.transitionToState(
189 [ConnectivityState.TRANSIENT_FAILURE],
190 ConnectivityState.IDLE
191 );
192 }
193 }
194
195 /**
196 * Start a backoff timer with the current nextBackoff timeout
197 */
198 private startBackoff() {
199 this.backoffTimeout.runOnce();
200 }
201
202 private stopBackoff() {
203 this.backoffTimeout.stop();
204 this.backoffTimeout.reset();
205 }
206
207 private startConnectingInternal() {
208 let options = this.options;
209 if (options['grpc.keepalive_time_ms']) {
210 const adjustedKeepaliveTime = Math.min(
211 this.keepaliveTime,
212 KEEPALIVE_MAX_TIME_MS
213 );
214 options = { ...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime };
215 }
216 this.connector
217 .connect(this.subchannelAddress, this.credentials, options)
218 .then(
219 transport => {
220 if (
221 this.transitionToState(
222 [ConnectivityState.CONNECTING],
223 ConnectivityState.READY
224 )
225 ) {
226 this.transport = transport;
227 if (this.channelzEnabled) {
228 this.childrenTracker.refChild(transport.getChannelzRef());
229 }
230 transport.addDisconnectListener(tooManyPings => {
231 this.transitionToState(
232 [ConnectivityState.READY],
233 ConnectivityState.IDLE
234 );
235 if (tooManyPings && this.keepaliveTime > 0) {
236 this.keepaliveTime *= 2;
237 logging.log(
238 LogVerbosity.ERROR,
239 `Connection to ${uriToString(this.channelTarget)} at ${
240 this.subchannelAddressString
241 } rejected by server because of excess pings. Increasing ping interval to ${
242 this.keepaliveTime
243 } ms`
244 );
245 }
246 });
247 }
248 },
249 error => {
250 this.transitionToState(
251 [ConnectivityState.CONNECTING],
252 ConnectivityState.TRANSIENT_FAILURE
253 );
254 }
255 );
256 }
257
258 /**
259 * Initiate a state transition from any element of oldStates to the new
260 * state. If the current connectivityState is not in oldStates, do nothing.
261 * @param oldStates The set of states to transition from
262 * @param newState The state to transition to
263 * @returns True if the state changed, false otherwise
264 */
265 private transitionToState(
266 oldStates: ConnectivityState[],
267 newState: ConnectivityState
268 ): boolean {
269 if (oldStates.indexOf(this.connectivityState) === -1) {
270 return false;
271 }
272 this.trace(
273 ConnectivityState[this.connectivityState] +
274 ' -> ' +
275 ConnectivityState[newState]
276 );
277 if (this.channelzEnabled) {
278 this.channelzTrace.addTrace(
279 'CT_INFO',
280 'Connectivity state change to ' + ConnectivityState[newState]
281 );
282 }
283 const previousState = this.connectivityState;
284 this.connectivityState = newState;
285 switch (newState) {
286 case ConnectivityState.READY:
287 this.stopBackoff();
288 break;
289 case ConnectivityState.CONNECTING:
290 this.startBackoff();
291 this.startConnectingInternal();
292 this.continueConnecting = false;
293 break;
294 case ConnectivityState.TRANSIENT_FAILURE:
295 if (this.channelzEnabled && this.transport) {
296 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
297 }
298 this.transport?.shutdown();
299 this.transport = null;
300 /* If the backoff timer has already ended by the time we get to the
301 * TRANSIENT_FAILURE state, we want to immediately transition out of
302 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
303 if (!this.backoffTimeout.isRunning()) {
304 process.nextTick(() => {
305 this.handleBackoffTimer();
306 });
307 }
308 break;
309 case ConnectivityState.IDLE:
310 if (this.channelzEnabled && this.transport) {
311 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
312 }
313 this.transport?.shutdown();
314 this.transport = null;
315 break;
316 default:
317 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
318 }
319 for (const listener of this.stateListeners) {
320 listener(this, previousState, newState, this.keepaliveTime);
321 }
322 return true;
323 }
324
325 ref() {
326 this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
327 this.refcount += 1;
328 }
329
330 unref() {
331 this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
332 this.refcount -= 1;
333 if (this.refcount === 0) {
334 if (this.channelzEnabled) {
335 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
336 }
337 if (this.channelzEnabled) {
338 unregisterChannelzRef(this.channelzRef);
339 }
340 process.nextTick(() => {
341 this.transitionToState(
342 [ConnectivityState.CONNECTING, ConnectivityState.READY],
343 ConnectivityState.IDLE
344 );
345 });
346 }
347 }
348
349 unrefIfOneRef(): boolean {
350 if (this.refcount === 1) {
351 this.unref();
352 return true;
353 }
354 return false;
355 }
356
357 createCall(
358 metadata: Metadata,
359 host: string,
360 method: string,
361 listener: SubchannelCallInterceptingListener
362 ): SubchannelCall {
363 if (!this.transport) {
364 throw new Error('Cannot create call, subchannel not READY');
365 }
366 let statsTracker: Partial<CallEventTracker>;
367 if (this.channelzEnabled) {
368 this.callTracker.addCallStarted();
369 this.streamTracker.addCallStarted();
370 statsTracker = {
371 onCallEnd: status => {
372 if (status.code === Status.OK) {
373 this.callTracker.addCallSucceeded();
374 } else {
375 this.callTracker.addCallFailed();
376 }
377 },
378 };
379 } else {
380 statsTracker = {};
381 }
382 return this.transport.createCall(
383 metadata,
384 host,
385 method,
386 listener,
387 statsTracker
388 );
389 }
390
391 /**
392 * If the subchannel is currently IDLE, start connecting and switch to the
393 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
394 * the next time it would transition to IDLE, start connecting again instead.
395 * Otherwise, do nothing.
396 */
397 startConnecting() {
398 process.nextTick(() => {
399 /* First, try to transition from IDLE to connecting. If that doesn't happen
400 * because the state is not currently IDLE, check if it is
401 * TRANSIENT_FAILURE, and if so indicate that it should go back to
402 * connecting after the backoff timer ends. Otherwise do nothing */
403 if (
404 !this.transitionToState(
405 [ConnectivityState.IDLE],
406 ConnectivityState.CONNECTING
407 )
408 ) {
409 if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
410 this.continueConnecting = true;
411 }
412 }
413 });
414 }
415
416 /**
417 * Get the subchannel's current connectivity state.
418 */
419 getConnectivityState() {
420 return this.connectivityState;
421 }
422
423 /**
424 * Add a listener function to be called whenever the subchannel's
425 * connectivity state changes.
426 * @param listener
427 */
428 addConnectivityStateListener(listener: ConnectivityStateListener) {
429 this.stateListeners.add(listener);
430 }
431
432 /**
433 * Remove a listener previously added with `addConnectivityStateListener`
434 * @param listener A reference to a function previously passed to
435 * `addConnectivityStateListener`
436 */
437 removeConnectivityStateListener(listener: ConnectivityStateListener) {
438 this.stateListeners.delete(listener);
439 }
440
441 /**
442 * Reset the backoff timeout, and immediately start connecting if in backoff.
443 */
444 resetBackoff() {
445 process.nextTick(() => {
446 this.backoffTimeout.reset();
447 this.transitionToState(
448 [ConnectivityState.TRANSIENT_FAILURE],
449 ConnectivityState.CONNECTING
450 );
451 });
452 }
453
454 getAddress(): string {
455 return this.subchannelAddressString;
456 }
457
458 getChannelzRef(): SubchannelRef {
459 return this.channelzRef;
460 }
461
462 getRealSubchannel(): this {
463 return this;
464 }
465
466 realSubchannelEquals(other: SubchannelInterface): boolean {
467 return other.getRealSubchannel() === this;
468 }
469
470 throttleKeepalive(newKeepaliveTime: number) {
471 if (newKeepaliveTime > this.keepaliveTime) {
472 this.keepaliveTime = newKeepaliveTime;
473 }
474 }
475}