1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { ChannelCredentials } from './channel-credentials';
|
19 | import { Metadata } from './metadata';
|
20 | import { ChannelOptions } from './channel-options';
|
21 | import { ConnectivityState } from './connectivity-state';
|
22 | import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
23 | import * as logging from './logging';
|
24 | import { LogVerbosity, Status } from './constants';
|
25 | import { GrpcUri, uriToString } from './uri-parser';
|
26 | import {
|
27 | SubchannelAddress,
|
28 | subchannelAddressToString,
|
29 | } from './subchannel-address';
|
30 | import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, unregisterChannelzRef } from './channelz';
|
31 | import { ConnectivityStateListener } from './subchannel-interface';
|
32 | import { SubchannelCallInterceptingListener } from './subchannel-call';
|
33 | import { SubchannelCall } from './subchannel-call';
|
34 | import { CallEventTracker, SubchannelConnector, Transport } from './transport';
|
35 |
|
36 | const TRACER_NAME = 'subchannel';
|
37 |
|
38 |
|
39 |
|
40 |
|
41 | const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
|
42 |
|
43 | export class Subchannel {
|
44 | |
45 |
|
46 |
|
47 |
|
48 | private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
49 | |
50 |
|
51 |
|
52 | private transport: Transport | null = null;
|
53 | |
54 |
|
55 |
|
56 |
|
57 | private continueConnecting = false;
|
58 | |
59 |
|
60 |
|
61 |
|
62 |
|
63 | private stateListeners: ConnectivityStateListener[] = [];
|
64 |
|
65 | private backoffTimeout: BackoffTimeout;
|
66 |
|
67 | private keepaliveTime: number;
|
68 | |
69 |
|
70 |
|
71 | private refcount = 0;
|
72 |
|
73 | |
74 |
|
75 |
|
76 | private subchannelAddressString: string;
|
77 |
|
78 |
|
79 | private readonly channelzEnabled: boolean = true;
|
80 | private channelzRef: SubchannelRef;
|
81 | private channelzTrace: ChannelzTrace;
|
82 | private callTracker = new ChannelzCallTracker();
|
83 | private childrenTracker = new ChannelzChildrenTracker();
|
84 |
|
85 |
|
86 | private streamTracker = new ChannelzCallTracker();
|
87 |
|
88 | |
89 |
|
90 |
|
91 |
|
92 |
|
93 |
|
94 |
|
95 |
|
96 |
|
97 |
|
98 | constructor(
|
99 | private channelTarget: GrpcUri,
|
100 | private subchannelAddress: SubchannelAddress,
|
101 | private options: ChannelOptions,
|
102 | private credentials: ChannelCredentials,
|
103 | private connector: SubchannelConnector
|
104 | ) {
|
105 | const backoffOptions: BackoffOptions = {
|
106 | initialDelay: options['grpc.initial_reconnect_backoff_ms'],
|
107 | maxDelay: options['grpc.max_reconnect_backoff_ms'],
|
108 | };
|
109 | this.backoffTimeout = new BackoffTimeout(() => {
|
110 | this.handleBackoffTimer();
|
111 | }, backoffOptions);
|
112 | this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
|
113 |
|
114 | this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
|
115 |
|
116 | if (options['grpc.enable_channelz'] === 0) {
|
117 | this.channelzEnabled = false;
|
118 | }
|
119 | this.channelzTrace = new ChannelzTrace();
|
120 | this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
|
121 | if (this.channelzEnabled) {
|
122 | this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
|
123 | }
|
124 | this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
|
125 | }
|
126 |
|
127 | private getChannelzInfo(): SubchannelInfo {
|
128 | return {
|
129 | state: this.connectivityState,
|
130 | trace: this.channelzTrace,
|
131 | callTracker: this.callTracker,
|
132 | children: this.childrenTracker.getChildLists(),
|
133 | target: this.subchannelAddressString
|
134 | };
|
135 | }
|
136 |
|
137 | private trace(text: string): void {
|
138 | logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
|
139 | }
|
140 |
|
141 | private refTrace(text: string): void {
|
142 | logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
|
143 | }
|
144 |
|
145 | private handleBackoffTimer() {
|
146 | if (this.continueConnecting) {
|
147 | this.transitionToState(
|
148 | [ConnectivityState.TRANSIENT_FAILURE],
|
149 | ConnectivityState.CONNECTING
|
150 | );
|
151 | } else {
|
152 | this.transitionToState(
|
153 | [ConnectivityState.TRANSIENT_FAILURE],
|
154 | ConnectivityState.IDLE
|
155 | );
|
156 | }
|
157 | }
|
158 |
|
159 | |
160 |
|
161 |
|
162 | private startBackoff() {
|
163 | this.backoffTimeout.runOnce();
|
164 | }
|
165 |
|
166 | private stopBackoff() {
|
167 | this.backoffTimeout.stop();
|
168 | this.backoffTimeout.reset();
|
169 | }
|
170 |
|
171 | private startConnectingInternal() {
|
172 | let options = this.options;
|
173 | if (options['grpc.keepalive_time_ms']) {
|
174 | const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
|
175 | options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
|
176 | }
|
177 | this.connector.connect(this.subchannelAddress, this.credentials, options).then(
|
178 | transport => {
|
179 | if (this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY)) {
|
180 | this.transport = transport;
|
181 | if (this.channelzEnabled) {
|
182 | this.childrenTracker.refChild(transport.getChannelzRef());
|
183 | }
|
184 | transport.addDisconnectListener((tooManyPings) => {
|
185 | this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
|
186 | if (tooManyPings && this.keepaliveTime > 0) {
|
187 | this.keepaliveTime *= 2;
|
188 | logging.log(
|
189 | LogVerbosity.ERROR,
|
190 | `Connection to ${uriToString(this.channelTarget)} at ${
|
191 | this.subchannelAddressString
|
192 | } rejected by server because of excess pings. Increasing ping interval to ${
|
193 | this.keepaliveTime
|
194 | } ms`
|
195 | );
|
196 | }
|
197 | });
|
198 | }
|
199 | },
|
200 | error => {
|
201 | this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE);
|
202 | }
|
203 | )
|
204 | }
|
205 |
|
206 | |
207 |
|
208 |
|
209 |
|
210 |
|
211 |
|
212 |
|
213 | private transitionToState(
|
214 | oldStates: ConnectivityState[],
|
215 | newState: ConnectivityState
|
216 | ): boolean {
|
217 | if (oldStates.indexOf(this.connectivityState) === -1) {
|
218 | return false;
|
219 | }
|
220 | this.trace(
|
221 | ConnectivityState[this.connectivityState] +
|
222 | ' -> ' +
|
223 | ConnectivityState[newState]
|
224 | );
|
225 | if (this.channelzEnabled) {
|
226 | this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
|
227 | }
|
228 | const previousState = this.connectivityState;
|
229 | this.connectivityState = newState;
|
230 | switch (newState) {
|
231 | case ConnectivityState.READY:
|
232 | this.stopBackoff();
|
233 | break;
|
234 | case ConnectivityState.CONNECTING:
|
235 | this.startBackoff();
|
236 | this.startConnectingInternal();
|
237 | this.continueConnecting = false;
|
238 | break;
|
239 | case ConnectivityState.TRANSIENT_FAILURE:
|
240 | if (this.channelzEnabled && this.transport) {
|
241 | this.childrenTracker.unrefChild(this.transport.getChannelzRef());
|
242 | }
|
243 | this.transport?.shutdown();
|
244 | this.transport = null;
|
245 | |
246 |
|
247 |
|
248 | if (!this.backoffTimeout.isRunning()) {
|
249 | process.nextTick(() => {
|
250 | this.handleBackoffTimer();
|
251 | });
|
252 | }
|
253 | break;
|
254 | case ConnectivityState.IDLE:
|
255 | if (this.channelzEnabled && this.transport) {
|
256 | this.childrenTracker.unrefChild(this.transport.getChannelzRef());
|
257 | }
|
258 | this.transport?.shutdown();
|
259 | this.transport = null;
|
260 | break;
|
261 | default:
|
262 | throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
|
263 | }
|
264 | |
265 |
|
266 | for (const listener of [...this.stateListeners]) {
|
267 | listener(this, previousState, newState, this.keepaliveTime);
|
268 | }
|
269 | return true;
|
270 | }
|
271 |
|
272 | ref() {
|
273 | this.refTrace(
|
274 | 'refcount ' +
|
275 | this.refcount +
|
276 | ' -> ' +
|
277 | (this.refcount + 1)
|
278 | );
|
279 | this.refcount += 1;
|
280 | }
|
281 |
|
282 | unref() {
|
283 | this.refTrace(
|
284 | 'refcount ' +
|
285 | this.refcount +
|
286 | ' -> ' +
|
287 | (this.refcount - 1)
|
288 | );
|
289 | this.refcount -= 1;
|
290 | if (this.refcount === 0) {
|
291 | if (this.channelzEnabled) {
|
292 | this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
|
293 | }
|
294 | this.transitionToState(
|
295 | [ConnectivityState.CONNECTING, ConnectivityState.READY],
|
296 | ConnectivityState.IDLE
|
297 | );
|
298 | if (this.channelzEnabled) {
|
299 | unregisterChannelzRef(this.channelzRef);
|
300 | }
|
301 | }
|
302 | }
|
303 |
|
304 | unrefIfOneRef(): boolean {
|
305 | if (this.refcount === 1) {
|
306 | this.unref();
|
307 | return true;
|
308 | }
|
309 | return false;
|
310 | }
|
311 |
|
312 | createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall {
|
313 | if (!this.transport) {
|
314 | throw new Error('Cannot create call, subchannel not READY');
|
315 | }
|
316 | let statsTracker: Partial<CallEventTracker>;
|
317 | if (this.channelzEnabled) {
|
318 | this.callTracker.addCallStarted();
|
319 | this.streamTracker.addCallStarted();
|
320 | statsTracker = {
|
321 | onCallEnd: status => {
|
322 | if (status.code === Status.OK) {
|
323 | this.callTracker.addCallSucceeded();
|
324 | } else {
|
325 | this.callTracker.addCallFailed();
|
326 | }
|
327 | }
|
328 | }
|
329 | } else {
|
330 | statsTracker = {};
|
331 | }
|
332 | return this.transport.createCall(metadata, host, method, listener, statsTracker);
|
333 | }
|
334 |
|
335 | |
336 |
|
337 |
|
338 |
|
339 |
|
340 |
|
341 | startConnecting() {
|
342 | |
343 |
|
344 |
|
345 |
|
346 | if (
|
347 | !this.transitionToState(
|
348 | [ConnectivityState.IDLE],
|
349 | ConnectivityState.CONNECTING
|
350 | )
|
351 | ) {
|
352 | if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
|
353 | this.continueConnecting = true;
|
354 | }
|
355 | }
|
356 | }
|
357 |
|
358 | |
359 |
|
360 |
|
361 | getConnectivityState() {
|
362 | return this.connectivityState;
|
363 | }
|
364 |
|
365 | |
366 |
|
367 |
|
368 |
|
369 |
|
370 | addConnectivityStateListener(listener: ConnectivityStateListener) {
|
371 | this.stateListeners.push(listener);
|
372 | }
|
373 |
|
374 | |
375 |
|
376 |
|
377 |
|
378 |
|
379 | removeConnectivityStateListener(listener: ConnectivityStateListener) {
|
380 | const listenerIndex = this.stateListeners.indexOf(listener);
|
381 | if (listenerIndex > -1) {
|
382 | this.stateListeners.splice(listenerIndex, 1);
|
383 | }
|
384 | }
|
385 |
|
386 | |
387 |
|
388 |
|
389 | resetBackoff() {
|
390 | this.backoffTimeout.reset();
|
391 | this.transitionToState(
|
392 | [ConnectivityState.TRANSIENT_FAILURE],
|
393 | ConnectivityState.CONNECTING
|
394 | );
|
395 | }
|
396 |
|
397 | getAddress(): string {
|
398 | return this.subchannelAddressString;
|
399 | }
|
400 |
|
401 | getChannelzRef(): SubchannelRef {
|
402 | return this.channelzRef;
|
403 | }
|
404 |
|
405 | getRealSubchannel(): this {
|
406 | return this;
|
407 | }
|
408 |
|
409 | throttleKeepalive(newKeepaliveTime: number) {
|
410 | if (newKeepaliveTime > this.keepaliveTime) {
|
411 | this.keepaliveTime = newKeepaliveTime;
|
412 | }
|
413 | }
|
414 | }
|