1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { ChannelCredentials } from './channel-credentials';
|
19 | import { Metadata } from './metadata';
|
20 | import { ChannelOptions } from './channel-options';
|
21 | import { ConnectivityState } from './connectivity-state';
|
22 | import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
23 | import * as logging from './logging';
|
24 | import { LogVerbosity, Status } from './constants';
|
25 | import { GrpcUri, uriToString } from './uri-parser';
|
26 | import {
|
27 | SubchannelAddress,
|
28 | subchannelAddressToString,
|
29 | } from './subchannel-address';
|
30 | import {
|
31 | SubchannelRef,
|
32 | ChannelzTrace,
|
33 | ChannelzChildrenTracker,
|
34 | SubchannelInfo,
|
35 | registerChannelzSubchannel,
|
36 | ChannelzCallTracker,
|
37 | unregisterChannelzRef,
|
38 | } from './channelz';
|
39 | import {
|
40 | ConnectivityStateListener,
|
41 | SubchannelInterface,
|
42 | } from './subchannel-interface';
|
43 | import { SubchannelCallInterceptingListener } from './subchannel-call';
|
44 | import { SubchannelCall } from './subchannel-call';
|
45 | import { CallEventTracker, SubchannelConnector, Transport } from './transport';
|
46 |
|
/** Tracer name under which this module's general debug traces are logged. */
const TRACER_NAME = 'subchannel';

/**
 * Upper bound applied to the keepalive interval before it is handed to the
 * connector: 2^31 - 1 (2147483647), the largest signed 32-bit integer.
 *
 * NOTE(review): presumably chosen because Node.js timers do not accept
 * larger delays - confirm against the transport's timer usage.
 */
const KEEPALIVE_MAX_TIME_MS = 2 ** 31 - 1;
|
53 |
|
/**
 * Manages the connection to a single backend address.
 *
 * The subchannel runs a connectivity state machine over the states IDLE,
 * CONNECTING, READY and TRANSIENT_FAILURE, owns at most one active
 * `Transport`, applies reconnect backoff between attempts, notifies
 * registered listeners of every state change, and (unless disabled through
 * the `grpc.enable_channelz` option) records its activity in channelz.
 */
export class Subchannel {
  /** Current state of the connectivity state machine. */
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;

  /** The active transport; non-null only after a successful connection. */
  private transport: Transport | null = null;

  /**
   * Set when `startConnecting` is requested while in TRANSIENT_FAILURE.
   * Consulted when the backoff timer fires to decide between reconnecting
   * (CONNECTING) and going IDLE; cleared whenever CONNECTING is entered.
   */
  private continueConnecting = false;

  /** Listeners invoked synchronously on every successful state transition. */
  private readonly stateListeners: Set<ConnectivityStateListener> = new Set();

  /** Timer that paces reconnection attempts. */
  private readonly backoffTimeout: BackoffTimeout;

  /**
   * Current keepalive interval in ms, or -1 when the
   * `grpc.keepalive_time_ms` option was not supplied. It only ever grows:
   * it is doubled after a "too many pings" disconnect and can be raised
   * through `throttleKeepalive`.
   */
  private keepaliveTime: number;

  /** Number of outstanding references taken via `ref`. */
  private refcount = 0;

  /** String form of the subchannel address, used in logs and channelz. */
  private readonly subchannelAddressString: string;

  /** False only when the `grpc.enable_channelz` option is exactly 0. */
  private readonly channelzEnabled: boolean;
  private readonly channelzRef: SubchannelRef;
  private readonly channelzTrace: ChannelzTrace;
  private readonly callTracker = new ChannelzCallTracker();
  private readonly childrenTracker = new ChannelzChildrenTracker();

  /**
   * Incremented alongside `callTracker` when a call starts. It is not read
   * anywhere else in this class.
   */
  private readonly streamTracker = new ChannelzCallTracker();

  /**
   * @param channelTarget Target URI of the owning channel; used in log
   *     messages only.
   * @param subchannelAddress Address this subchannel connects to.
   * @param options Channel options. The reconnect backoff, keepalive and
   *     channelz options are read here; the full set is forwarded to the
   *     connector on every connection attempt.
   * @param credentials Credentials handed to the connector.
   * @param connector Factory used to establish transports.
   */
  constructor(
    private channelTarget: GrpcUri,
    private subchannelAddress: SubchannelAddress,
    private options: ChannelOptions,
    private credentials: ChannelCredentials,
    private connector: SubchannelConnector
  ) {
    const backoffOptions: BackoffOptions = {
      initialDelay: options['grpc.initial_reconnect_backoff_ms'],
      maxDelay: options['grpc.max_reconnect_backoff_ms'],
    };
    this.backoffTimeout = new BackoffTimeout(() => {
      this.handleBackoffTimer();
    }, backoffOptions);
    this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

    this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;

    // Channelz is on by default; only an explicit 0 turns it off.
    this.channelzEnabled = options['grpc.enable_channelz'] !== 0;
    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzSubchannel(
      this.subchannelAddressString,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
    }
    this.trace(
      'Subchannel constructed with options ' +
        JSON.stringify(options, undefined, 2)
    );
  }

  /** Builds the snapshot returned to channelz for this subchannel. */
  private getChannelzInfo(): SubchannelInfo {
    return {
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
      target: this.subchannelAddressString,
    };
  }

  /** Prefixes a trace message with the channelz id and the address. */
  private formatTrace(text: string): string {
    return `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
  }

  /** Emits a debug trace under the general subchannel tracer. */
  private trace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, this.formatTrace(text));
  }

  /** Emits a debug trace under the `subchannel_refcount` tracer. */
  private refTrace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      'subchannel_refcount',
      this.formatTrace(text)
    );
  }

  /**
   * Leaves TRANSIENT_FAILURE: reconnects if a connection was requested while
   * backing off, otherwise goes IDLE. Does nothing in any other state.
   */
  private handleBackoffTimer(): void {
    const nextState = this.continueConnecting
      ? ConnectivityState.CONNECTING
      : ConnectivityState.IDLE;
    this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], nextState);
  }

  /** Starts the backoff timer for the current connection attempt. */
  private startBackoff(): void {
    this.backoffTimeout.runOnce();
  }

  /** Stops the backoff timer and resets its delay. */
  private stopBackoff(): void {
    this.backoffTimeout.stop();
    this.backoffTimeout.reset();
  }

  /**
   * Asks the connector for a new transport. The outcome is applied
   * asynchronously: success leads to READY (if the subchannel is still
   * CONNECTING), failure leads to TRANSIENT_FAILURE.
   */
  private startConnectingInternal(): void {
    let options = this.options;
    if (options['grpc.keepalive_time_ms']) {
      // Forward the current (possibly throttled) keepalive interval, capped
      // at KEEPALIVE_MAX_TIME_MS, instead of the originally configured one.
      options = {
        ...options,
        'grpc.keepalive_time_ms': Math.min(
          this.keepaliveTime,
          KEEPALIVE_MAX_TIME_MS
        ),
      };
    }
    this.connector
      .connect(this.subchannelAddress, this.credentials, options)
      .then(
        transport => {
          this.handleTransportConnected(transport);
        },
        error => {
          // Record why the attempt failed instead of discarding the reason.
          this.trace(
            'Connection attempt failed: ' +
              (error instanceof Error ? error.message : String(error))
          );
          this.transitionToState(
            [ConnectivityState.CONNECTING],
            ConnectivityState.TRANSIENT_FAILURE
          );
        }
      );
  }

  /**
   * Adopts a freshly connected transport, or releases it when the subchannel
   * is no longer waiting for one.
   */
  private handleTransportConnected(transport: Transport): void {
    const becameReady = this.transitionToState(
      [ConnectivityState.CONNECTING],
      ConnectivityState.READY
    );
    if (!becameReady) {
      // The subchannel left CONNECTING while the attempt was in flight, so
      // this transport will never be used. Shut it down rather than leaking
      // the underlying connection.
      transport.shutdown();
      return;
    }
    this.transport = transport;
    if (this.channelzEnabled) {
      this.childrenTracker.refChild(transport.getChannelzRef());
    }
    transport.addDisconnectListener(tooManyPings => {
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.IDLE
      );
      if (tooManyPings && this.keepaliveTime > 0) {
        // Back off the ping rate so the next connection is less likely to
        // be rejected for the same reason.
        this.keepaliveTime *= 2;
        logging.log(
          LogVerbosity.ERROR,
          `Connection to ${uriToString(this.channelTarget)} at ${
            this.subchannelAddressString
          } rejected by server because of excess pings. Increasing ping interval to ${
            this.keepaliveTime
          } ms`
        );
      }
    });
  }

  /**
   * Drops the current transport, if any: removes it from the channelz
   * children (when channelz is enabled), shuts it down, and clears the
   * reference.
   */
  private discardTransport(): void {
    const transport = this.transport;
    if (transport === null) {
      return;
    }
    if (this.channelzEnabled) {
      this.childrenTracker.unrefChild(transport.getChannelzRef());
    }
    transport.shutdown();
    this.transport = null;
  }

  /**
   * Moves the state machine to `newState` if, and only if, the current state
   * is one of `oldStates`. On success the state-specific side effects run
   * first and then every registered listener is called synchronously with
   * `(this, previousState, newState, keepaliveTime)`.
   *
   * @param oldStates States from which the transition is permitted.
   * @param newState State to enter.
   * @returns `true` if the transition happened, `false` if the current state
   *     was not in `oldStates` (in which case nothing changes).
   * @throws Error if `newState` is not one of the four states handled here.
   */
  private transitionToState(
    oldStates: ConnectivityState[],
    newState: ConnectivityState
  ): boolean {
    if (!oldStates.includes(this.connectivityState)) {
      return false;
    }
    this.trace(
      ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace(
        'CT_INFO',
        'Connectivity state change to ' + ConnectivityState[newState]
      );
    }
    const previousState = this.connectivityState;
    this.connectivityState = newState;
    switch (newState) {
      case ConnectivityState.READY:
        this.stopBackoff();
        break;
      case ConnectivityState.CONNECTING:
        this.startBackoff();
        this.startConnectingInternal();
        this.continueConnecting = false;
        break;
      case ConnectivityState.TRANSIENT_FAILURE:
        this.discardTransport();
        // If the backoff timer has already ended, nothing else would ever
        // move the subchannel out of TRANSIENT_FAILURE, so run the timer
        // handler on the next tick.
        if (!this.backoffTimeout.isRunning()) {
          process.nextTick(() => {
            this.handleBackoffTimer();
          });
        }
        break;
      case ConnectivityState.IDLE:
        this.discardTransport();
        break;
      default:
        throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
    }
    for (const listener of this.stateListeners) {
      listener(this, previousState, newState, this.keepaliveTime);
    }
    return true;
  }

  /** Takes a reference on this subchannel. */
  ref(): void {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
    this.refcount += 1;
  }

  /**
   * Releases a reference. When the count reaches zero the subchannel is
   * removed from channelz (if enabled) and, on the next tick, moved from
   * CONNECTING or READY to IDLE, which shuts down any active transport.
   */
  unref(): void {
    this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
    this.refcount -= 1;
    if (this.refcount !== 0) {
      return;
    }
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
      unregisterChannelzRef(this.channelzRef);
    }
    process.nextTick(() => {
      this.transitionToState(
        [ConnectivityState.CONNECTING, ConnectivityState.READY],
        ConnectivityState.IDLE
      );
    });
  }

  /**
   * Releases the last remaining reference, if exactly one is held.
   *
   * @returns `true` if the reference was released, `false` otherwise.
   */
  unrefIfOneRef(): boolean {
    if (this.refcount !== 1) {
      return false;
    }
    this.unref();
    return true;
  }

  /**
   * Starts a call on the active transport.
   *
   * @param metadata Metadata passed through to the transport.
   * @param host Host value passed through to the transport.
   * @param method Method name passed through to the transport.
   * @param listener Listener passed through to the transport.
   * @returns The call created by the transport.
   * @throws Error if there is no active transport.
   */
  createCall(
    metadata: Metadata,
    host: string,
    method: string,
    listener: SubchannelCallInterceptingListener
  ): SubchannelCall {
    if (!this.transport) {
      throw new Error('Cannot create call, subchannel not READY');
    }
    let statsTracker: Partial<CallEventTracker> = {};
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      this.streamTracker.addCallStarted();
      statsTracker = {
        // Classify the finished call for channelz by its final status code.
        onCallEnd: status => {
          if (status.code === Status.OK) {
            this.callTracker.addCallSucceeded();
          } else {
            this.callTracker.addCallFailed();
          }
        },
      };
    }
    return this.transport.createCall(
      metadata,
      host,
      method,
      listener,
      statsTracker
    );
  }

  /**
   * Requests a connection attempt. The request is processed on the next
   * tick: from IDLE the subchannel starts connecting; in TRANSIENT_FAILURE
   * the request is remembered so that a new attempt starts when the backoff
   * timer fires; in any other state it has no effect.
   */
  startConnecting(): void {
    process.nextTick(() => {
      const started = this.transitionToState(
        [ConnectivityState.IDLE],
        ConnectivityState.CONNECTING
      );
      if (
        !started &&
        this.connectivityState === ConnectivityState.TRANSIENT_FAILURE
      ) {
        this.continueConnecting = true;
      }
    });
  }

  /** Returns the current connectivity state. */
  getConnectivityState(): ConnectivityState {
    return this.connectivityState;
  }

  /**
   * Registers a listener to be called on every subsequent state change.
   *
   * @param listener Callback to add; adding the same function twice has no
   *     additional effect because listeners are kept in a Set.
   */
  addConnectivityStateListener(listener: ConnectivityStateListener): void {
    this.stateListeners.add(listener);
  }

  /**
   * Removes a previously registered state listener.
   *
   * @param listener Callback to remove; unknown callbacks are ignored.
   */
  removeConnectivityStateListener(listener: ConnectivityStateListener): void {
    this.stateListeners.delete(listener);
  }

  /**
   * On the next tick, resets the backoff delay and, if the subchannel is in
   * TRANSIENT_FAILURE, starts a new connection attempt immediately.
   */
  resetBackoff(): void {
    process.nextTick(() => {
      this.backoffTimeout.reset();
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.CONNECTING
      );
    });
  }

  /** Returns the string form of the address this subchannel connects to. */
  getAddress(): string {
    return this.subchannelAddressString;
  }

  /** Returns the channelz reference registered for this subchannel. */
  getChannelzRef(): SubchannelRef {
    return this.channelzRef;
  }

  /** Returns this instance. */
  getRealSubchannel(): this {
    return this;
  }

  /**
   * Reports whether `other` resolves to this exact instance.
   *
   * @param other Subchannel (or wrapper) to compare against.
   */
  realSubchannelEquals(other: SubchannelInterface): boolean {
    return other.getRealSubchannel() === this;
  }

  /**
   * Raises the keepalive interval; smaller or equal values are ignored, so
   * the interval never decreases.
   *
   * @param newKeepaliveTime Proposed keepalive interval in ms.
   */
  throttleKeepalive(newKeepaliveTime: number): void {
    if (newKeepaliveTime > this.keepaliveTime) {
      this.keepaliveTime = newKeepaliveTime;
    }
  }
}
|