1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { ChannelCredentials } from './channel-credentials';
|
19 | import { Metadata } from './metadata';
|
20 | import { ChannelOptions } from './channel-options';
|
21 | import { ConnectivityState } from './connectivity-state';
|
22 | import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
23 | import * as logging from './logging';
|
24 | import { LogVerbosity, Status } from './constants';
|
25 | import { GrpcUri, uriToString } from './uri-parser';
|
26 | import {
|
27 | SubchannelAddress,
|
28 | subchannelAddressToString,
|
29 | } from './subchannel-address';
|
30 | import {
|
31 | SubchannelRef,
|
32 | ChannelzTrace,
|
33 | ChannelzChildrenTracker,
|
34 | ChannelzChildrenTrackerStub,
|
35 | SubchannelInfo,
|
36 | registerChannelzSubchannel,
|
37 | ChannelzCallTracker,
|
38 | ChannelzCallTrackerStub,
|
39 | unregisterChannelzRef,
|
40 | ChannelzTraceStub,
|
41 | } from './channelz';
|
42 | import {
|
43 | ConnectivityStateListener,
|
44 | SubchannelInterface,
|
45 | } from './subchannel-interface';
|
46 | import { SubchannelCallInterceptingListener } from './subchannel-call';
|
47 | import { SubchannelCall } from './subchannel-call';
|
48 | import { CallEventTracker, SubchannelConnector, Transport } from './transport';
|
49 |
|
/** Tracer name under which this module's general trace lines are logged. */
const TRACER_NAME = 'subchannel';

/**
 * Upper bound, in milliseconds, applied to the keepalive interval before it
 * is handed to the connector.
 *
 * `1 << 31` is the smallest signed 32-bit integer; bitwise NOT flips it to
 * the largest one, 2147483647 (2^31 - 1).
 *
 * NOTE(review): presumably chosen because timer delays are limited to signed
 * 32-bit values - confirm against the timer API used by the transport.
 */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
|
56 |
|
/**
 * Manages the connection to a single address on behalf of a channel.
 *
 * The class is a small state machine over `ConnectivityState`:
 *
 * - `startConnecting()`: IDLE -> CONNECTING
 * - connect succeeds: CONNECTING -> READY
 * - connect fails: CONNECTING -> TRANSIENT_FAILURE
 * - transport disconnects: READY -> IDLE
 * - backoff timer ends: TRANSIENT_FAILURE -> CONNECTING (if a connection was
 *   requested in the meantime) or IDLE
 * - `resetBackoff()`: TRANSIENT_FAILURE -> CONNECTING
 * - last reference dropped: CONNECTING / READY -> IDLE
 *
 * Every transition goes through `transitionToState()`, which also notifies
 * the registered connectivity state listeners.
 */
export class Subchannel {
  /** Current state; written only by `transitionToState()`. */
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;

  /**
   * The connected transport. Assigned after a successful connect and cleared
   * whenever the state moves to IDLE or TRANSIENT_FAILURE.
   */
  private transport: Transport | null = null;

  /**
   * Set when `startConnecting()` is called while in TRANSIENT_FAILURE, so
   * that the end of the backoff period leads to CONNECTING instead of IDLE.
   * Cleared each time CONNECTING is entered.
   */
  private continueConnecting = false;

  /** Callbacks invoked after every successful state transition. */
  private stateListeners: Set<ConnectivityStateListener> = new Set();

  /** Timer that paces reconnection attempts. */
  private backoffTimeout: BackoffTimeout;

  /**
   * Keepalive interval in milliseconds; -1 when the
   * `grpc.keepalive_time_ms` option is not set.
   */
  private keepaliveTime: number;

  /** Number of outstanding `ref()` calls not yet matched by `unref()`. */
  private refcount = 0;

  /** String form of the subchannel address, used in logs and channelz. */
  private subchannelAddressString: string;

  // Channelz state. When channelz is disabled the trackers are stubs.
  private readonly channelzEnabled: boolean = true;
  private channelzRef: SubchannelRef;
  private channelzTrace: ChannelzTrace | ChannelzTraceStub;
  private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  private childrenTracker:
    | ChannelzChildrenTracker
    | ChannelzChildrenTrackerStub;

  // NOTE(review): only addCallStarted() is ever invoked on this tracker in
  // this class, and it is not included in getChannelzInfo() - confirm whether
  // it is still needed.
  private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;

  /**
   * @param channelTarget Target of the owning channel; used only in the
   *     "excess pings" log message.
   * @param subchannelAddress Address this subchannel connects to.
   * @param options Channel options. Read here:
   *     `grpc.initial_reconnect_backoff_ms`, `grpc.max_reconnect_backoff_ms`,
   *     `grpc.keepalive_time_ms` and `grpc.enable_channelz` (channelz is
   *     disabled only when this is exactly `0`).
   * @param credentials Credentials passed to the connector on every connect.
   * @param connector Creates transports for this subchannel.
   */
  constructor(
    private channelTarget: GrpcUri,
    private subchannelAddress: SubchannelAddress,
    private options: ChannelOptions,
    private credentials: ChannelCredentials,
    private connector: SubchannelConnector
  ) {
    const backoffOptions: BackoffOptions = {
      initialDelay: options['grpc.initial_reconnect_backoff_ms'],
      maxDelay: options['grpc.max_reconnect_backoff_ms'],
    };
    this.backoffTimeout = new BackoffTimeout(() => {
      this.handleBackoffTimer();
    }, backoffOptions);
    // NOTE(review): presumably keeps a pending backoff timer from holding the
    // process open - confirm against BackoffTimeout.unref().
    this.backoffTimeout.unref();
    this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

    this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;

    if (options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
      this.channelzTrace = new ChannelzTraceStub();
      this.callTracker = new ChannelzCallTrackerStub();
      this.childrenTracker = new ChannelzChildrenTrackerStub();
      this.streamTracker = new ChannelzCallTrackerStub();
    } else {
      this.channelzTrace = new ChannelzTrace();
      this.callTracker = new ChannelzCallTracker();
      this.childrenTracker = new ChannelzChildrenTracker();
      this.streamTracker = new ChannelzCallTracker();
    }

    // Must happen before the first trace() call, which reads channelzRef.id.
    this.channelzRef = registerChannelzSubchannel(
      this.subchannelAddressString,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );

    this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
    this.trace(
      'Subchannel constructed with options ' +
        JSON.stringify(options, undefined, 2)
    );
  }

  /** Builds the snapshot returned to channelz for this subchannel. */
  private getChannelzInfo(): SubchannelInfo {
    return {
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
      target: this.subchannelAddressString,
    };
  }

  /**
   * Emits a DEBUG trace line under `tracer`, prefixed with this subchannel's
   * channelz id and address.
   */
  private logTrace(tracer: string, text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      tracer,
      `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`
    );
  }

  /** Traces `text` under the general subchannel tracer. */
  private trace(text: string): void {
    this.logTrace(TRACER_NAME, text);
  }

  /** Traces `text` under the dedicated reference-count tracer. */
  private refTrace(text: string): void {
    this.logTrace('subchannel_refcount', text);
  }

  /**
   * Leaves TRANSIENT_FAILURE: to CONNECTING when a connection was requested
   * during the failure period, otherwise to IDLE. Has no effect in any other
   * state.
   */
  private handleBackoffTimer(): void {
    const nextState = this.continueConnecting
      ? ConnectivityState.CONNECTING
      : ConnectivityState.IDLE;
    this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], nextState);
  }

  /** Starts one run of the backoff timer. */
  private startBackoff(): void {
    this.backoffTimeout.runOnce();
  }

  /** Stops the backoff timer and resets it. */
  private stopBackoff(): void {
    this.backoffTimeout.stop();
    this.backoffTimeout.reset();
  }

  /**
   * Asks the connector for a new transport and feeds the outcome back into
   * the state machine. Called only on entry to CONNECTING.
   */
  private startConnectingInternal(): void {
    let options = this.options;
    if (options['grpc.keepalive_time_ms']) {
      // Use the current (possibly throttled) keepalive time rather than the
      // configured one, clamped to the supported maximum.
      options = {
        ...options,
        'grpc.keepalive_time_ms': Math.min(
          this.keepaliveTime,
          KEEPALIVE_MAX_TIME_MS
        ),
      };
    }
    this.connector
      .connect(this.subchannelAddress, this.credentials, options)
      .then(
        transport => {
          this.handleConnectSuccess(transport);
        },
        error => {
          this.handleConnectFailure(error);
        }
      );
  }

  /**
   * Adopts a freshly connected transport if the subchannel is still
   * CONNECTING; otherwise shuts the transport down because the connection
   * attempt is no longer wanted.
   */
  private handleConnectSuccess(transport: Transport): void {
    const becameReady = this.transitionToState(
      [ConnectivityState.CONNECTING],
      ConnectivityState.READY
    );
    if (!becameReady) {
      transport.shutdown();
      return;
    }
    this.transport = transport;
    if (this.channelzEnabled) {
      this.childrenTracker.refChild(transport.getChannelzRef());
    }
    transport.addDisconnectListener(tooManyPings => {
      this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
      if (tooManyPings && this.keepaliveTime > 0) {
        // Back off the ping rate for subsequent connections.
        this.keepaliveTime *= 2;
        logging.log(
          LogVerbosity.ERROR,
          `Connection to ${uriToString(this.channelTarget)} at ${
            this.subchannelAddressString
          } rejected by server because of excess pings. Increasing ping interval to ${
            this.keepaliveTime
          } ms`
        );
      }
    });
  }

  /**
   * Records a failed connection attempt, passing the stringified error to the
   * state listeners.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private handleConnectFailure(error: any): void {
    this.transitionToState(
      [ConnectivityState.CONNECTING],
      ConnectivityState.TRANSIENT_FAILURE,
      `${error}`
    );
  }

  /**
   * Drops the current transport, if any: removes it from the channelz
   * children (when channelz is enabled), shuts it down and clears the field.
   */
  private releaseTransport(): void {
    if (this.channelzEnabled && this.transport) {
      this.childrenTracker.unrefChild(this.transport.getChannelzRef());
    }
    this.transport?.shutdown();
    this.transport = null;
  }

  /**
   * Moves to `newState` if the current state is one of `oldStates`, runs the
   * entry actions of the new state and notifies all state listeners.
   *
   * @param oldStates States from which this transition is allowed.
   * @param newState State to enter.
   * @param errorMessage Optional message forwarded to the listeners.
   * @returns `true` if the transition happened, `false` if the current state
   *     was not in `oldStates` (in which case nothing is changed).
   * @throws Error if `newState` is not one of READY, CONNECTING,
   *     TRANSIENT_FAILURE or IDLE.
   */
  private transitionToState(
    oldStates: ConnectivityState[],
    newState: ConnectivityState,
    errorMessage?: string
  ): boolean {
    if (!oldStates.includes(this.connectivityState)) {
      return false;
    }
    this.trace(
      ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace(
        'CT_INFO',
        'Connectivity state change to ' + ConnectivityState[newState]
      );
    }
    const previousState = this.connectivityState;
    this.connectivityState = newState;
    switch (newState) {
      case ConnectivityState.READY:
        this.stopBackoff();
        break;
      case ConnectivityState.CONNECTING:
        this.startBackoff();
        this.startConnectingInternal();
        this.continueConnecting = false;
        break;
      case ConnectivityState.TRANSIENT_FAILURE:
        this.releaseTransport();
        // If the backoff period has already elapsed, leave TRANSIENT_FAILURE
        // on the next tick instead of waiting for a timer that will not fire.
        if (!this.backoffTimeout.isRunning()) {
          process.nextTick(() => {
            this.handleBackoffTimer();
          });
        }
        break;
      case ConnectivityState.IDLE:
        this.releaseTransport();
        break;
      default:
        throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
    }
    for (const listener of this.stateListeners) {
      listener(this, previousState, newState, this.keepaliveTime, errorMessage);
    }
    return true;
  }

  /** Increments the reference count. */
  ref(): void {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
    this.refcount += 1;
  }

  /**
   * Decrements the reference count. When it reaches zero the subchannel is
   * unregistered from channelz and, on the next tick, moved to IDLE if it is
   * CONNECTING or READY.
   */
  unref(): void {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
    this.refcount -= 1;
    if (this.refcount === 0) {
      this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
      unregisterChannelzRef(this.channelzRef);
      process.nextTick(() => {
        this.transitionToState(
          [ConnectivityState.CONNECTING, ConnectivityState.READY],
          ConnectivityState.IDLE
        );
      });
    }
  }

  /**
   * Calls `unref()` only if exactly one reference is outstanding.
   *
   * @returns `true` if the reference was released, `false` otherwise.
   */
  unrefIfOneRef(): boolean {
    if (this.refcount === 1) {
      this.unref();
      return true;
    }
    return false;
  }

  /**
   * Creates a call on the current transport.
   *
   * When channelz is enabled the call is counted as started immediately and
   * as succeeded or failed when it ends, depending on whether its final
   * status code is `Status.OK`.
   *
   * @throws Error if there is no transport (the subchannel is not READY).
   */
  createCall(
    metadata: Metadata,
    host: string,
    method: string,
    listener: SubchannelCallInterceptingListener
  ): SubchannelCall {
    if (!this.transport) {
      throw new Error('Cannot create call, subchannel not READY');
    }
    let statsTracker: Partial<CallEventTracker>;
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      this.streamTracker.addCallStarted();
      statsTracker = {
        onCallEnd: status => {
          if (status.code === Status.OK) {
            this.callTracker.addCallSucceeded();
          } else {
            this.callTracker.addCallFailed();
          }
        },
      };
    } else {
      statsTracker = {};
    }
    return this.transport.createCall(
      metadata,
      host,
      method,
      listener,
      statsTracker
    );
  }

  /**
   * Requests a connection. On the next tick: an IDLE subchannel starts
   * connecting; a subchannel in TRANSIENT_FAILURE remembers the request and
   * connects once the backoff period ends. In any other state this is a
   * no-op.
   */
  startConnecting(): void {
    process.nextTick(() => {
      const started = this.transitionToState(
        [ConnectivityState.IDLE],
        ConnectivityState.CONNECTING
      );
      if (
        !started &&
        this.connectivityState === ConnectivityState.TRANSIENT_FAILURE
      ) {
        this.continueConnecting = true;
      }
    });
  }

  /** Returns the current connectivity state. */
  getConnectivityState(): ConnectivityState {
    return this.connectivityState;
  }

  /** Registers `listener` to be called after every state transition. */
  addConnectivityStateListener(listener: ConnectivityStateListener): void {
    this.stateListeners.add(listener);
  }

  /** Unregisters a listener previously added; unknown listeners are ignored. */
  removeConnectivityStateListener(listener: ConnectivityStateListener): void {
    this.stateListeners.delete(listener);
  }

  /**
   * On the next tick, resets the backoff timer and, if the subchannel is in
   * TRANSIENT_FAILURE, starts connecting again immediately.
   */
  resetBackoff(): void {
    process.nextTick(() => {
      this.backoffTimeout.reset();
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.CONNECTING
      );
    });
  }

  /** Returns the string form of the subchannel address. */
  getAddress(): string {
    return this.subchannelAddressString;
  }

  /** Returns the channelz reference registered for this subchannel. */
  getChannelzRef(): SubchannelRef {
    return this.channelzRef;
  }

  /** Always `true`: this class does no health checking of its own. */
  isHealthy(): boolean {
    return true;
  }

  /** No-op: `isHealthy()` never changes, so watchers are never notified. */
  addHealthStateWatcher(listener: (healthy: boolean) => void): void {
    // Intentionally empty.
  }

  /** No-op counterpart of `addHealthStateWatcher()`. */
  removeHealthStateWatcher(listener: (healthy: boolean) => void): void {
    // Intentionally empty.
  }

  /** Returns this object itself. */
  getRealSubchannel(): this {
    return this;
  }

  /** Returns whether `other` resolves to this very subchannel instance. */
  realSubchannelEquals(other: SubchannelInterface): boolean {
    return other.getRealSubchannel() === this;
  }

  /**
   * Raises the keepalive interval to `newKeepaliveTime` if that is larger
   * than the current one; the interval is never lowered.
   */
  throttleKeepalive(newKeepaliveTime: number): void {
    if (newKeepaliveTime > this.keepaliveTime) {
      this.keepaliveTime = newKeepaliveTime;
    }
  }
}
|