UNPKG

14.7 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Subchannel = void 0;
20const connectivity_state_1 = require("./connectivity-state");
21const backoff_timeout_1 = require("./backoff-timeout");
22const logging = require("./logging");
23const constants_1 = require("./constants");
24const uri_parser_1 = require("./uri-parser");
25const subchannel_address_1 = require("./subchannel-address");
26const channelz_1 = require("./channelz");
27const TRACER_NAME = 'subchannel';
28/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
29 * have a constant for the max signed 32 bit integer, so this is a simple way
30 * to calculate it */
31const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
32class Subchannel {
33 /**
34 * A class representing a connection to a single backend.
35 * @param channelTarget The target string for the channel as a whole
36 * @param subchannelAddress The address for the backend that this subchannel
37 * will connect to
38 * @param options The channel options, plus any specific subchannel options
39 * for this subchannel
40 * @param credentials The channel credentials used to establish this
41 * connection
42 */
43 constructor(channelTarget, subchannelAddress, options, credentials, connector) {
44 var _a;
45 this.channelTarget = channelTarget;
46 this.subchannelAddress = subchannelAddress;
47 this.options = options;
48 this.credentials = credentials;
49 this.connector = connector;
50 /**
51 * The subchannel's current connectivity state. Invariant: `session` === `null`
52 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
53 */
54 this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
55 /**
56 * The underlying http2 session used to make requests.
57 */
58 this.transport = null;
59 /**
60 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
61 * CONNECTING instead of IDLE when the backoff timeout ends.
62 */
63 this.continueConnecting = false;
64 /**
65 * A list of listener functions that will be called whenever the connectivity
66 * state changes. Will be modified by `addConnectivityStateListener` and
67 * `removeConnectivityStateListener`
68 */
69 this.stateListeners = [];
70 /**
71 * Tracks channels and subchannel pools with references to this subchannel
72 */
73 this.refcount = 0;
74 // Channelz info
75 this.channelzEnabled = true;
76 this.callTracker = new channelz_1.ChannelzCallTracker();
77 this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
78 // Channelz socket info
79 this.streamTracker = new channelz_1.ChannelzCallTracker();
80 const backoffOptions = {
81 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
82 maxDelay: options['grpc.max_reconnect_backoff_ms'],
83 };
84 this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => {
85 this.handleBackoffTimer();
86 }, backoffOptions);
87 this.subchannelAddressString = (0, subchannel_address_1.subchannelAddressToString)(subchannelAddress);
88 this.keepaliveTime = (_a = options['grpc.keepalive_time_ms']) !== null && _a !== void 0 ? _a : -1;
89 if (options['grpc.enable_channelz'] === 0) {
90 this.channelzEnabled = false;
91 }
92 this.channelzTrace = new channelz_1.ChannelzTrace();
93 this.channelzRef = (0, channelz_1.registerChannelzSubchannel)(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
94 if (this.channelzEnabled) {
95 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
96 }
97 this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
98 }
99 getChannelzInfo() {
100 return {
101 state: this.connectivityState,
102 trace: this.channelzTrace,
103 callTracker: this.callTracker,
104 children: this.childrenTracker.getChildLists(),
105 target: this.subchannelAddressString
106 };
107 }
108 trace(text) {
109 logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
110 }
111 refTrace(text) {
112 logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
113 }
114 handleBackoffTimer() {
115 if (this.continueConnecting) {
116 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
117 }
118 else {
119 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.IDLE);
120 }
121 }
122 /**
123 * Start a backoff timer with the current nextBackoff timeout
124 */
125 startBackoff() {
126 this.backoffTimeout.runOnce();
127 }
128 stopBackoff() {
129 this.backoffTimeout.stop();
130 this.backoffTimeout.reset();
131 }
132 startConnectingInternal() {
133 let options = this.options;
134 if (options['grpc.keepalive_time_ms']) {
135 const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
136 options = Object.assign(Object.assign({}, options), { 'grpc.keepalive_time_ms': adjustedKeepaliveTime });
137 }
138 this.connector.connect(this.subchannelAddress, this.credentials, options).then(transport => {
139 if (this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.READY)) {
140 this.transport = transport;
141 if (this.channelzEnabled) {
142 this.childrenTracker.refChild(transport.getChannelzRef());
143 }
144 transport.addDisconnectListener((tooManyPings) => {
145 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
146 if (tooManyPings && this.keepaliveTime > 0) {
147 this.keepaliveTime *= 2;
148 logging.log(constants_1.LogVerbosity.ERROR, `Connection to ${(0, uri_parser_1.uriToString)(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTime} ms`);
149 }
150 });
151 }
152 }, error => {
153 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
154 });
155 }
156 /**
157 * Initiate a state transition from any element of oldStates to the new
158 * state. If the current connectivityState is not in oldStates, do nothing.
159 * @param oldStates The set of states to transition from
160 * @param newState The state to transition to
161 * @returns True if the state changed, false otherwise
162 */
163 transitionToState(oldStates, newState) {
164 var _a, _b;
165 if (oldStates.indexOf(this.connectivityState) === -1) {
166 return false;
167 }
168 this.trace(connectivity_state_1.ConnectivityState[this.connectivityState] +
169 ' -> ' +
170 connectivity_state_1.ConnectivityState[newState]);
171 if (this.channelzEnabled) {
172 this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
173 }
174 const previousState = this.connectivityState;
175 this.connectivityState = newState;
176 switch (newState) {
177 case connectivity_state_1.ConnectivityState.READY:
178 this.stopBackoff();
179 break;
180 case connectivity_state_1.ConnectivityState.CONNECTING:
181 this.startBackoff();
182 this.startConnectingInternal();
183 this.continueConnecting = false;
184 break;
185 case connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE:
186 if (this.channelzEnabled && this.transport) {
187 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
188 }
189 (_a = this.transport) === null || _a === void 0 ? void 0 : _a.shutdown();
190 this.transport = null;
191 /* If the backoff timer has already ended by the time we get to the
192 * TRANSIENT_FAILURE state, we want to immediately transition out of
193 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
194 if (!this.backoffTimeout.isRunning()) {
195 process.nextTick(() => {
196 this.handleBackoffTimer();
197 });
198 }
199 break;
200 case connectivity_state_1.ConnectivityState.IDLE:
201 if (this.channelzEnabled && this.transport) {
202 this.childrenTracker.unrefChild(this.transport.getChannelzRef());
203 }
204 (_b = this.transport) === null || _b === void 0 ? void 0 : _b.shutdown();
205 this.transport = null;
206 break;
207 default:
208 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
209 }
210 /* We use a shallow copy of the stateListeners array in case a listener
211 * is removed during this iteration */
212 for (const listener of [...this.stateListeners]) {
213 listener(this, previousState, newState, this.keepaliveTime);
214 }
215 return true;
216 }
217 ref() {
218 this.refTrace('refcount ' +
219 this.refcount +
220 ' -> ' +
221 (this.refcount + 1));
222 this.refcount += 1;
223 }
224 unref() {
225 this.refTrace('refcount ' +
226 this.refcount +
227 ' -> ' +
228 (this.refcount - 1));
229 this.refcount -= 1;
230 if (this.refcount === 0) {
231 if (this.channelzEnabled) {
232 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
233 }
234 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
235 if (this.channelzEnabled) {
236 (0, channelz_1.unregisterChannelzRef)(this.channelzRef);
237 }
238 }
239 }
240 unrefIfOneRef() {
241 if (this.refcount === 1) {
242 this.unref();
243 return true;
244 }
245 return false;
246 }
247 createCall(metadata, host, method, listener) {
248 if (!this.transport) {
249 throw new Error('Cannot create call, subchannel not READY');
250 }
251 let statsTracker;
252 if (this.channelzEnabled) {
253 this.callTracker.addCallStarted();
254 this.streamTracker.addCallStarted();
255 statsTracker = {
256 onCallEnd: status => {
257 if (status.code === constants_1.Status.OK) {
258 this.callTracker.addCallSucceeded();
259 }
260 else {
261 this.callTracker.addCallFailed();
262 }
263 }
264 };
265 }
266 else {
267 statsTracker = {};
268 }
269 return this.transport.createCall(metadata, host, method, listener, statsTracker);
270 }
271 /**
272 * If the subchannel is currently IDLE, start connecting and switch to the
273 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
274 * the next time it would transition to IDLE, start connecting again instead.
275 * Otherwise, do nothing.
276 */
277 startConnecting() {
278 /* First, try to transition from IDLE to connecting. If that doesn't happen
279 * because the state is not currently IDLE, check if it is
280 * TRANSIENT_FAILURE, and if so indicate that it should go back to
281 * connecting after the backoff timer ends. Otherwise do nothing */
282 if (!this.transitionToState([connectivity_state_1.ConnectivityState.IDLE], connectivity_state_1.ConnectivityState.CONNECTING)) {
283 if (this.connectivityState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
284 this.continueConnecting = true;
285 }
286 }
287 }
288 /**
289 * Get the subchannel's current connectivity state.
290 */
291 getConnectivityState() {
292 return this.connectivityState;
293 }
294 /**
295 * Add a listener function to be called whenever the subchannel's
296 * connectivity state changes.
297 * @param listener
298 */
299 addConnectivityStateListener(listener) {
300 this.stateListeners.push(listener);
301 }
302 /**
303 * Remove a listener previously added with `addConnectivityStateListener`
304 * @param listener A reference to a function previously passed to
305 * `addConnectivityStateListener`
306 */
307 removeConnectivityStateListener(listener) {
308 const listenerIndex = this.stateListeners.indexOf(listener);
309 if (listenerIndex > -1) {
310 this.stateListeners.splice(listenerIndex, 1);
311 }
312 }
313 /**
314 * Reset the backoff timeout, and immediately start connecting if in backoff.
315 */
316 resetBackoff() {
317 this.backoffTimeout.reset();
318 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
319 }
320 getAddress() {
321 return this.subchannelAddressString;
322 }
323 getChannelzRef() {
324 return this.channelzRef;
325 }
326 getRealSubchannel() {
327 return this;
328 }
329 throttleKeepalive(newKeepaliveTime) {
330 if (newKeepaliveTime > this.keepaliveTime) {
331 this.keepaliveTime = newKeepaliveTime;
332 }
333 }
334}
335exports.Subchannel = Subchannel;
336//# sourceMappingURL=subchannel.js.map
\No newline at end of file