UNPKG

13.8 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { ChannelCredentials } from './channel-credentials';
19import { Metadata } from './metadata';
20import { ChannelOptions } from './channel-options';
21import { ConnectivityState } from './connectivity-state';
22import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
23import * as logging from './logging';
24import { LogVerbosity, Status } from './constants';
25import { GrpcUri, uriToString } from './uri-parser';
26import {
27 SubchannelAddress,
28 subchannelAddressToString,
29} from './subchannel-address';
30import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, unregisterChannelzRef } from './channelz';
31import { ConnectivityStateListener } from './subchannel-interface';
32import { SubchannelCallInterceptingListener } from './subchannel-call';
33import { SubchannelCall } from './subchannel-call';
34import { CallEventTracker, SubchannelConnector, Transport } from './transport';
35
36const TRACER_NAME = 'subchannel';
37
38/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
39 * have a constant for the max signed 32 bit integer, so this is a simple way
40 * to calculate it */
41const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
42
43export class Subchannel {
44 /**
45 * The subchannel's current connectivity state. Invariant: `session` === `null`
46 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
47 */
48 private connectivityState: ConnectivityState = ConnectivityState.IDLE;
49 /**
50 * The underlying http2 session used to make requests.
51 */
52 private transport: Transport | null = null;
53 /**
54 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
55 * CONNECTING instead of IDLE when the backoff timeout ends.
56 */
57 private continueConnecting = false;
58 /**
59 * A list of listener functions that will be called whenever the connectivity
60 * state changes. Will be modified by `addConnectivityStateListener` and
61 * `removeConnectivityStateListener`
62 */
63 private stateListeners: Set<ConnectivityStateListener> = new Set();
64
65 private backoffTimeout: BackoffTimeout;
66
67 private keepaliveTime: number;
68 /**
69 * Tracks channels and subchannel pools with references to this subchannel
70 */
71 private refcount = 0;
72
73 /**
74 * A string representation of the subchannel address, for logging/tracing
75 */
76 private subchannelAddressString: string;
77
78 // Channelz info
79 private readonly channelzEnabled: boolean = true;
80 private channelzRef: SubchannelRef;
81 private channelzTrace: ChannelzTrace;
82 private callTracker = new ChannelzCallTracker();
83 private childrenTracker = new ChannelzChildrenTracker();
84
85 // Channelz socket info
86 private streamTracker = new ChannelzCallTracker();
87
88 /**
89 * A class representing a connection to a single backend.
90 * @param channelTarget The target string for the channel as a whole
91 * @param subchannelAddress The address for the backend that this subchannel
92 * will connect to
93 * @param options The channel options, plus any specific subchannel options
94 * for this subchannel
95 * @param credentials The channel credentials used to establish this
96 * connection
97 */
98 constructor(
99 private channelTarget: GrpcUri,
100 private subchannelAddress: SubchannelAddress,
101 private options: ChannelOptions,
102 private credentials: ChannelCredentials,
103 private connector: SubchannelConnector
104 ) {
105 const backoffOptions: BackoffOptions = {
106 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
107 maxDelay: options['grpc.max_reconnect_backoff_ms'],
108 };
109 this.backoffTimeout = new BackoffTimeout(() => {
110 this.handleBackoffTimer();
111 }, backoffOptions);
112 this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
113
114 this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
115
116 if (options['grpc.enable_channelz'] === 0) {
117 this.channelzEnabled = false;
118 }
119 this.channelzTrace = new ChannelzTrace();
120 this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
121 if (this.channelzEnabled) {
122 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
123 }
124 this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
125 }
126
127 private getChannelzInfo(): SubchannelInfo {
128 return {
129 state: this.connectivityState,
130 trace: this.channelzTrace,
131 callTracker: this.callTracker,
132 children: this.childrenTracker.getChildLists(),
133 target: this.subchannelAddressString
134 };
135 }
136
137 private trace(text: string): void {
138 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
139 }
140
141 private refTrace(text: string): void {
142 logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
143 }
144
145 private handleBackoffTimer() {
146 if (this.continueConnecting) {
147 this.transitionToState(
148 [ConnectivityState.TRANSIENT_FAILURE],
149 ConnectivityState.CONNECTING
150 );
151 } else {
152 this.transitionToState(
153 [ConnectivityState.TRANSIENT_FAILURE],
154 ConnectivityState.IDLE
155 );
156 }
157 }
158
159 /**
160 * Start a backoff timer with the current nextBackoff timeout
161 */
162 private startBackoff() {
163 this.backoffTimeout.runOnce();
164 }
165
166 private stopBackoff() {
167 this.backoffTimeout.stop();
168 this.backoffTimeout.reset();
169 }
170
171 private startConnectingInternal() {
172 let options = this.options;
173 if (options['grpc.keepalive_time_ms']) {
174 const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
175 options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
176 }
177 this.connector.connect(this.subchannelAddress, this.credentials, options).then(
178 transport => {
179 if (this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY)) {
180 this.transport = transport;
181 if (this.channelzEnabled) {
182 this.childrenTracker.refChild(transport.getChannelzRef());
183 }
184 transport.addDisconnectListener((tooManyPings) => {
185 this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
186 if (tooManyPings && this.keepaliveTime > 0) {
187 this.keepaliveTime *= 2;
188 logging.log(
189 LogVerbosity.ERROR,
190 `Connection to ${uriToString(this.channelTarget)} at ${
191 this.subchannelAddressString
192 } rejected by server because of excess pings. Increasing ping interval to ${
193 this.keepaliveTime
194 } ms`
195 );
196 }
197 });
198 }
199 },
200 error => {
201 this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE);
202 }
203 )
204 }
205
206 /**
207 * Initiate a state transition from any element of oldStates to the new
208 * state. If the current connectivityState is not in oldStates, do nothing.
209 * @param oldStates The set of states to transition from
210 * @param newState The state to transition to
211 * @returns True if the state changed, false otherwise
212 */
213 private transitionToState(
214 oldStates: ConnectivityState[],
215 newState: ConnectivityState
216 ): boolean {
217 if (oldStates.indexOf(this.connectivityState) === -1) {
218 return false;
219 }
220 this.trace(
221 ConnectivityState[this.connectivityState] +
222 ' -> ' +
223 ConnectivityState[newState]
224 );
225 if (this.channelzEnabled) {
226 this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
227 }
228 const previousState = this.connectivityState;
229 this.connectivityState = newState;
230 switch (newState) {
231 case ConnectivityState.READY:
232 this.stopBackoff();
233 break;
234 case ConnectivityState.CONNECTING:
235 this.startBackoff();
236 this.startConnectingInternal();
237 this.continueConnecting = false;
238 break;
239 case ConnectivityState.TRANSIENT_FAILURE:
240 if (this.channelzEnabled && this.transport) {
241 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
242 }
243 this.transport?.shutdown();
244 this.transport = null;
245 /* If the backoff timer has already ended by the time we get to the
246 * TRANSIENT_FAILURE state, we want to immediately transition out of
247 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
248 if (!this.backoffTimeout.isRunning()) {
249 process.nextTick(() => {
250 this.handleBackoffTimer();
251 });
252 }
253 break;
254 case ConnectivityState.IDLE:
255 if (this.channelzEnabled && this.transport) {
256 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
257 }
258 this.transport?.shutdown();
259 this.transport = null;
260 break;
261 default:
262 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
263 }
264 for (const listener of this.stateListeners) {
265 listener(this, previousState, newState, this.keepaliveTime);
266 }
267 return true;
268 }
269
270 ref() {
271 this.refTrace(
272 'refcount ' +
273 this.refcount +
274 ' -> ' +
275 (this.refcount + 1)
276 );
277 this.refcount += 1;
278 }
279
280 unref() {
281 this.refTrace(
282 'refcount ' +
283 this.refcount +
284 ' -> ' +
285 (this.refcount - 1)
286 );
287 this.refcount -= 1;
288 if (this.refcount === 0) {
289 if (this.channelzEnabled) {
290 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
291 }
292 if (this.channelzEnabled) {
293 unregisterChannelzRef(this.channelzRef);
294 }
295 process.nextTick(() => {
296 this.transitionToState(
297 [ConnectivityState.CONNECTING, ConnectivityState.READY],
298 ConnectivityState.IDLE
299 );
300 });
301 }
302 }
303
304 unrefIfOneRef(): boolean {
305 if (this.refcount === 1) {
306 this.unref();
307 return true;
308 }
309 return false;
310 }
311
312 createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall {
313 if (!this.transport) {
314 throw new Error('Cannot create call, subchannel not READY');
315 }
316 let statsTracker: Partial<CallEventTracker>;
317 if (this.channelzEnabled) {
318 this.callTracker.addCallStarted();
319 this.streamTracker.addCallStarted();
320 statsTracker = {
321 onCallEnd: status => {
322 if (status.code === Status.OK) {
323 this.callTracker.addCallSucceeded();
324 } else {
325 this.callTracker.addCallFailed();
326 }
327 }
328 }
329 } else {
330 statsTracker = {};
331 }
332 return this.transport.createCall(metadata, host, method, listener, statsTracker);
333 }
334
335 /**
336 * If the subchannel is currently IDLE, start connecting and switch to the
337 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
338 * the next time it would transition to IDLE, start connecting again instead.
339 * Otherwise, do nothing.
340 */
341 startConnecting() {
342 process.nextTick(() => {
343 /* First, try to transition from IDLE to connecting. If that doesn't happen
344 * because the state is not currently IDLE, check if it is
345 * TRANSIENT_FAILURE, and if so indicate that it should go back to
346 * connecting after the backoff timer ends. Otherwise do nothing */
347 if (
348 !this.transitionToState(
349 [ConnectivityState.IDLE],
350 ConnectivityState.CONNECTING
351 )
352 ) {
353 if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
354 this.continueConnecting = true;
355 }
356 }
357 });
358 }
359
360 /**
361 * Get the subchannel's current connectivity state.
362 */
363 getConnectivityState() {
364 return this.connectivityState;
365 }
366
367 /**
368 * Add a listener function to be called whenever the subchannel's
369 * connectivity state changes.
370 * @param listener
371 */
372 addConnectivityStateListener(listener: ConnectivityStateListener) {
373 this.stateListeners.add(listener);
374 }
375
376 /**
377 * Remove a listener previously added with `addConnectivityStateListener`
378 * @param listener A reference to a function previously passed to
379 * `addConnectivityStateListener`
380 */
381 removeConnectivityStateListener(listener: ConnectivityStateListener) {
382 this.stateListeners.delete(listener);
383 }
384
385 /**
386 * Reset the backoff timeout, and immediately start connecting if in backoff.
387 */
388 resetBackoff() {
389 process.nextTick(() => {
390 this.backoffTimeout.reset();
391 this.transitionToState(
392 [ConnectivityState.TRANSIENT_FAILURE],
393 ConnectivityState.CONNECTING
394 );
395 });
396 }
397
398 getAddress(): string {
399 return this.subchannelAddressString;
400 }
401
402 getChannelzRef(): SubchannelRef {
403 return this.channelzRef;
404 }
405
406 getRealSubchannel(): this {
407 return this;
408 }
409
410 throttleKeepalive(newKeepaliveTime: number) {
411 if (newKeepaliveTime > this.keepaliveTime) {
412 this.keepaliveTime = newKeepaliveTime;
413 }
414 }
415}