1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import { ChannelCredentials } from './channel-credentials';
|
20 | import { Metadata } from './metadata';
|
21 | import { Call, Http2CallStream, WriteObject } from './call-stream';
|
22 | import { ChannelOptions } from './channel-options';
|
23 | import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls';
|
24 | import { ConnectivityState } from './connectivity-state';
|
25 | import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
|
26 | import { getDefaultAuthority } from './resolver';
|
27 | import * as logging from './logging';
|
28 | import { LogVerbosity, Status } from './constants';
|
29 | import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
|
30 | import * as net from 'net';
|
31 | import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
|
32 | import { ConnectionOptions } from 'tls';
|
33 | import { FilterFactory, Filter, BaseFilter } from './filter';
|
34 | import {
|
35 | stringToSubchannelAddress,
|
36 | SubchannelAddress,
|
37 | subchannelAddressToString,
|
38 | } from './subchannel-address';
|
39 | import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
|
40 | import { ConnectivityStateListener } from './subchannel-interface';
|
41 |
|
// Version string read from this package's package.json; it is embedded in the
// user-agent string built by the Subchannel constructor.
const { version: clientVersion } = require('../../package.json');
|
43 |
|
// Tracer names handed to logging.trace / logging.isTracerEnabled by this module.
const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';

// Connection and backoff tuning values. NOTE(review): none of these five are
// referenced in the visible part of this file; confirm whether they are still
// needed before relying on them.
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;

// Largest positive 32-bit signed integer (2147483647). Used as the default
// keepalive interval and as the upper bound when that interval is doubled.
const KEEPALIVE_MAX_TIME_MS = 2 ** 31 - 1;
// Default time, in milliseconds, to wait for a keepalive ping response.
const KEEPALIVE_TIMEOUT_MS = 20000;
|
58 |
|
/**
 * Pair of callbacks used to report per-message traffic for a call.
 * Subchannel.startCallStream builds one of these and hands it to the call
 * stream together with the HTTP/2 stream.
 */
export interface SubchannelCallStatsTracker {
  /** Reports that one message was sent. */
  addMessageSent(): void;
  /** Reports that one message was received. */
  addMessageReceived(): void;
}
|
63 |
|
// HTTP/2 header names, taken from Node's http2 constants, that
// Subchannel.startCallStream sets on every outgoing request.
const HTTP2_HEADER_AUTHORITY = http2.constants.HTTP2_HEADER_AUTHORITY;
const HTTP2_HEADER_CONTENT_TYPE = http2.constants.HTTP2_HEADER_CONTENT_TYPE;
const HTTP2_HEADER_METHOD = http2.constants.HTTP2_HEADER_METHOD;
const HTTP2_HEADER_PATH = http2.constants.HTTP2_HEADER_PATH;
const HTTP2_HEADER_TE = http2.constants.HTTP2_HEADER_TE;
const HTTP2_HEADER_USER_AGENT = http2.constants.HTTP2_HEADER_USER_AGENT;
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
/**
 * Scales a Math.random() sample into the interval that starts at `min` and
 * has width `max - min`.
 * @param min Lower bound of the interval.
 * @param max Upper bound of the interval.
 * @returns `min` plus a random fraction of `max - min`.
 */
function uniformRandom(min: number, max: number) {
  const span = max - min;
  return min + span * Math.random();
}
|
81 |
|
// ASCII bytes of 'too_many_pings'. The session 'goaway' handler compares the
// GOAWAY opaque data against this buffer.
const tooManyPingsData = Buffer.from('too_many_pings', 'ascii');
|
83 |
|
84 | export class Subchannel {
|
85 | |
86 |
|
87 |
|
88 |
|
/** Current connectivity state. Starts IDLE and is reassigned only in transitionToState. */
private connectivityState: ConnectivityState = ConnectivityState.IDLE;

/** HTTP/2 session created by createSession, or null when there is none. */
private session: http2.ClientHttp2Session | null = null;

/**
 * Set by startConnecting while in TRANSIENT_FAILURE. When true, the backoff
 * timer handler moves to CONNECTING instead of IDLE. Cleared on entering
 * CONNECTING.
 */
private continueConnecting = false;

/** Callbacks invoked after every successful state transition. */
private stateListeners: ConnectivityStateListener[] = [];

/** Callbacks invoked by handleDisconnect. */
private disconnectListeners = new Set<() => void>();

/** Backoff timer; its callback is handleBackoffTimer. */
private backoffTimeout: BackoffTimeout;

/** Value sent as the user-agent header on every request. */
private userAgent: string;

/** Interval between keepalive pings, in milliseconds. */
private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;

/** How long to wait for a ping response before treating the connection as lost, in milliseconds. */
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;

/** Handle of the interval timer that sends keepalive pings. */
private keepaliveIntervalId: NodeJS.Timer;

/** Handle of the timer that waits for the current ping's response. */
private keepaliveTimeoutId: NodeJS.Timer;

/** Whether keepalive pings run while no calls are active. */
private keepaliveWithoutCalls = false;

/** Number of outstanding callRef() references. */
private callRefcount = 0;

/** Number of outstanding ref() references. */
private refcount = 0;

/** String form of the subchannel address, used in logs and channelz. */
private subchannelAddressString: string;

// Channelz bookkeeping for the subchannel itself.
private readonly channelzEnabled: boolean = true;
private channelzRef: SubchannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();

// Channelz bookkeeping for the current socket. resetChannelzSocketInfo
// restores these to their initial values.
private channelzSocketRef: SocketRef | null = null;

/** Real target name when connected through a proxy, otherwise null. */
private remoteName: string | null = null;
private streamTracker = new ChannelzCallTracker();
private keepalivesSent = 0;
private messagesSent = 0;
private messagesReceived = 0;
private lastMessageSentTimestamp: Date | null = null;
private lastMessageReceivedTimestamp: Date | null = null;
|
175 |
|
176 | |
177 |
|
178 |
|
179 |
|
180 |
|
181 |
|
182 |
|
183 |
|
184 |
|
185 |
|
/**
 * Builds an idle subchannel: derives the user agent and keepalive settings
 * from the channel options, creates the backoff timer, and registers the
 * subchannel with channelz.
 * @param channelTarget Target URI of the owning channel; used for the default
 *     authority and in log messages.
 * @param subchannelAddress Address this subchannel connects to.
 * @param options Channel options consulted for user agent, keepalive,
 *     backoff, channelz and TLS settings.
 * @param credentials Source of the connection options used for sessions.
 */
constructor(
  private channelTarget: GrpcUri,
  private subchannelAddress: SubchannelAddress,
  private options: ChannelOptions,
  private credentials: ChannelCredentials
) {
  // User agent: optional primary part, this library's identifier, optional
  // secondary part. Unset or empty parts are dropped.
  const userAgentParts = [
    options['grpc.primary_user_agent'],
    `grpc-node-js/${clientVersion}`,
    options['grpc.secondary_user_agent'],
  ];
  this.userAgent = userAgentParts.filter((part) => part).join(' ');

  // Keepalive settings override the field defaults only when the key is present.
  if ('grpc.keepalive_time_ms' in options) {
    this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
  }
  if ('grpc.keepalive_timeout_ms' in options) {
    this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
  }
  this.keepaliveWithoutCalls =
    'grpc.keepalive_permit_without_calls' in options &&
    options['grpc.keepalive_permit_without_calls'] === 1;

  // Give both timer fields a real, already-cleared timer handle so they can
  // be cleared later without null checks.
  this.keepaliveIntervalId = setTimeout(() => {}, 0);
  clearTimeout(this.keepaliveIntervalId);
  this.keepaliveTimeoutId = setTimeout(() => {}, 0);
  clearTimeout(this.keepaliveTimeoutId);

  const backoffOptions: BackoffOptions = {
    initialDelay: options['grpc.initial_reconnect_backoff_ms'],
    maxDelay: options['grpc.max_reconnect_backoff_ms'],
  };
  this.backoffTimeout = new BackoffTimeout(
    () => this.handleBackoffTimer(),
    backoffOptions
  );
  this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

  // Channelz stays enabled unless the option is explicitly 0.
  if (options['grpc.enable_channelz'] === 0) {
    this.channelzEnabled = false;
  }
  this.channelzTrace = new ChannelzTrace();
  this.channelzRef = registerChannelzSubchannel(
    this.subchannelAddressString,
    () => this.getChannelzInfo(),
    this.channelzEnabled
  );
  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
  }
  this.trace(
    'Subchannel constructed with options ' +
      JSON.stringify(options, undefined, 2)
  );
}
|
236 |
|
/**
 * Snapshot of this subchannel for channelz; registered as the info callback
 * in the constructor.
 * @returns The current state, trace, call tracker, child lists and target.
 */
private getChannelzInfo(): SubchannelInfo {
  const info: SubchannelInfo = {
    state: this.connectivityState,
    trace: this.channelzTrace,
    callTracker: this.callTracker,
    children: this.childrenTracker.getChildLists(),
    target: this.subchannelAddressString,
  };
  return info;
}
|
246 |
|
/**
 * Snapshot of the current session's socket for channelz.
 * @returns Socket addresses, TLS details (when the session is encrypted),
 *     stream/message counters and flow-control windows, or null when there is
 *     no session.
 */
private getChannelzSocketInfo(): SocketInfo | null {
  const currentSession = this.session;
  if (currentSession === null) {
    return null;
  }
  const socket = currentSession.socket;
  const remoteAddress = socket.remoteAddress
    ? stringToSubchannelAddress(socket.remoteAddress, socket.remotePort)
    : null;
  const localAddress = socket.localAddress
    ? stringToSubchannelAddress(socket.localAddress, socket.localPort)
    : null;

  let security: TlsInfo | null = null;
  if (currentSession.encrypted) {
    const tlsSocket: TLSSocket = socket as TLSSocket;
    const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
      tlsSocket.getCipher();
    const localCert = tlsSocket.getCertificate();
    const peerCert = tlsSocket.getPeerCertificate();
    security = {
      // Report the standard name when available, otherwise the other name.
      cipherSuiteStandardName: cipherInfo.standardName ?? null,
      cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
      localCertificate:
        localCert && 'raw' in localCert ? localCert.raw : null,
      remoteCertificate:
        peerCert && 'raw' in peerCert ? peerCert.raw : null,
    };
  }

  const streams = this.streamTracker;
  const windowState = currentSession.state;
  return {
    remoteAddress: remoteAddress,
    localAddress: localAddress,
    security: security,
    remoteName: this.remoteName,
    streamsStarted: streams.callsStarted,
    streamsSucceeded: streams.callsSucceeded,
    streamsFailed: streams.callsFailed,
    messagesSent: this.messagesSent,
    messagesReceived: this.messagesReceived,
    keepAlivesSent: this.keepalivesSent,
    lastLocalStreamCreatedTimestamp: streams.lastCallStartedTimestamp,
    lastRemoteStreamCreatedTimestamp: null,
    lastMessageSentTimestamp: this.lastMessageSentTimestamp,
    lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
    localFlowControlWindow: windowState.localWindowSize ?? null,
    remoteFlowControlWindow: windowState.remoteWindowSize ?? null,
  };
}
|
289 |
|
/**
 * Drops the channelz socket registration, if any, and resets the per-socket
 * statistics to their initial values. Does nothing when channelz is disabled.
 */
private resetChannelzSocketInfo() {
  if (!this.channelzEnabled) {
    return;
  }
  const socketRef = this.channelzSocketRef;
  if (socketRef) {
    unregisterChannelzRef(socketRef);
    this.childrenTracker.unrefChild(socketRef);
    this.channelzSocketRef = null;
  }
  this.remoteName = null;
  this.streamTracker = new ChannelzCallTracker();
  this.keepalivesSent = 0;
  this.messagesSent = 0;
  this.messagesReceived = 0;
  this.lastMessageSentTimestamp = null;
  this.lastMessageReceivedTimestamp = null;
}
|
307 |
|
/**
 * Logs `text` at DEBUG verbosity under the 'subchannel' tracer, prefixed with
 * the channelz id and the subchannel address.
 */
private trace(text: string): void {
  const prefix =
    '(' + this.channelzRef.id + ') ' + this.subchannelAddressString;
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, prefix + ' ' + text);
}
|
311 |
|
/**
 * Logs `text` at DEBUG verbosity under the 'subchannel_refcount' tracer,
 * prefixed with the channelz id and the subchannel address.
 */
private refTrace(text: string): void {
  const prefix =
    '(' + this.channelzRef.id + ') ' + this.subchannelAddressString;
  logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', prefix + ' ' + text);
}
|
315 |
|
/**
 * Logs `text` at DEBUG verbosity under the flow-control tracer, prefixed with
 * the channelz id and the subchannel address.
 */
private flowControlTrace(text: string): void {
  const prefix =
    '(' + this.channelzRef.id + ') ' + this.subchannelAddressString;
  logging.trace(
    LogVerbosity.DEBUG,
    FLOW_CONTROL_TRACER_NAME,
    prefix + ' ' + text
  );
}
|
319 |
|
/**
 * Logs `text` at DEBUG verbosity under the 'subchannel_internals' tracer,
 * prefixed with the channelz id and the subchannel address.
 */
private internalsTrace(text: string): void {
  const prefix =
    '(' + this.channelzRef.id + ') ' + this.subchannelAddressString;
  logging.trace(
    LogVerbosity.DEBUG,
    'subchannel_internals',
    prefix + ' ' + text
  );
}
|
323 |
|
/**
 * Logs `text` at DEBUG verbosity under the 'keepalive' tracer, prefixed with
 * the channelz id and the subchannel address.
 */
private keepaliveTrace(text: string): void {
  const prefix =
    '(' + this.channelzRef.id + ') ' + this.subchannelAddressString;
  logging.trace(LogVerbosity.DEBUG, 'keepalive', prefix + ' ' + text);
}
|
327 |
|
/**
 * Leaves TRANSIENT_FAILURE: moves to CONNECTING when a connection attempt was
 * requested during the failure period (continueConnecting), otherwise to
 * IDLE. Has no effect in any other state.
 */
private handleBackoffTimer() {
  const nextState = this.continueConnecting
    ? ConnectivityState.CONNECTING
    : ConnectivityState.IDLE;
  this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], nextState);
}
|
341 |
|
342 | |
343 |
|
344 |
|
/** Starts the backoff timer by calling runOnce() on it. */
private startBackoff() {
  const timer = this.backoffTimeout;
  timer.runOnce();
}
|
348 |
|
/** Stops the backoff timer and then resets it. */
private stopBackoff() {
  const timer = this.backoffTimeout;
  timer.stop();
  timer.reset();
}
|
353 |
|
/**
 * Sends one keepalive ping on the current session and arms a timer that calls
 * handleDisconnect if no response arrives within keepaliveTimeoutMs. If
 * issuing the ping throws, a READY subchannel moves to TRANSIENT_FAILURE.
 */
private sendPing() {
  if (this.channelzEnabled) {
    this.keepalivesSent += 1;
  }
  this.keepaliveTrace(
    'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
  );

  const onPingTimeout = () => {
    this.keepaliveTrace('Ping timeout passed without response');
    this.handleDisconnect();
  };
  this.keepaliveTimeoutId = setTimeout(onPingTimeout, this.keepaliveTimeoutMs);
  // unref is called only where the timer handle provides it.
  this.keepaliveTimeoutId.unref?.();

  const onPingResponse = (
    err: Error | null,
    duration: number,
    payload: Buffer
  ) => {
    this.keepaliveTrace('Received ping response');
    clearTimeout(this.keepaliveTimeoutId);
  };
  try {
    this.session!.ping(onPingResponse);
  } catch (e) {
    this.transitionToState(
      [ConnectivityState.READY],
      ConnectivityState.TRANSIENT_FAILURE
    );
  }
}
|
380 |
|
/**
 * Starts an interval timer that calls sendPing every keepaliveTimeMs
 * milliseconds and stores its handle in keepaliveIntervalId.
 */
private startKeepalivePings() {
  const interval = setInterval(() => {
    this.sendPing();
  }, this.keepaliveTimeMs);
  this.keepaliveIntervalId = interval;
  // unref is called only where the timer handle provides it.
  this.keepaliveIntervalId.unref?.();
}
|
389 |
|
390 | |
391 |
|
392 |
|
393 |
|
394 |
|
/** Cancels both the keepalive interval timer and the pending ping-timeout timer. */
private stopKeepalivePings() {
  const { keepaliveIntervalId, keepaliveTimeoutId } = this;
  clearInterval(keepaliveIntervalId);
  clearTimeout(keepaliveTimeoutId);
}
|
399 |
|
/**
 * Creates the HTTP/2 session for this subchannel and attaches its lifecycle
 * handlers ('connect', 'close', 'goaway', 'error' and, when the 'subchannel'
 * tracer is enabled, the settings events).
 *
 * Every state-changing handler first checks that the session is still the
 * current one, so events from a replaced session do not change state.
 *
 * @param proxyConnectionResult Result of the proxy negotiation. `realTarget`,
 *     when set, supplies the authority and remote name; `socket`, when set,
 *     is used as the session's connection.
 */
private createSession(proxyConnectionResult: ProxyConnectionResult) {
  if (proxyConnectionResult.realTarget) {
    this.remoteName = uriToString(proxyConnectionResult.realTarget);
    // Log the formatted URI. Concatenating the URI object itself would be
    // stringified as "[object Object]".
    this.trace(
      'creating HTTP/2 session through proxy to ' + this.remoteName
    );
  } else {
    this.remoteName = null;
    this.trace('creating HTTP/2 session');
  }
  const targetAuthority = getDefaultAuthority(
    proxyConnectionResult.realTarget ?? this.channelTarget
  );
  let connectionOptions: http2.SecureClientSessionOptions =
    this.credentials._getConnectionOptions() || {};
  connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
  if ('grpc-node.max_session_memory' in this.options) {
    connectionOptions.maxSessionMemory = this.options[
      'grpc-node.max_session_memory'
    ];
  } else {
    // No explicit limit configured: use the largest safe integer.
    connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
  }
  let addressScheme = 'http://';
  if ('secureContext' in connectionOptions) {
    addressScheme = 'https://';
    if (this.options['grpc.ssl_target_name_override']) {
      // Verify the certificate against the override name instead of the
      // host being connected to, and use the override as the server name.
      const sslTargetNameOverride = this.options[
        'grpc.ssl_target_name_override'
      ]!;
      connectionOptions.checkServerIdentity = (
        host: string,
        cert: PeerCertificate
      ): Error | undefined => {
        return checkServerIdentity(sslTargetNameOverride, cert);
      };
      connectionOptions.servername = sslTargetNameOverride;
    } else {
      const authorityHostname =
        splitHostPort(targetAuthority)?.host ?? 'localhost';
      connectionOptions.servername = authorityHostname;
    }
    if (proxyConnectionResult.socket) {
      // Reuse the socket obtained from the proxy negotiation.
      connectionOptions.createConnection = (authority, option) => {
        return proxyConnectionResult.socket!;
      };
    }
  } else {
    // Plaintext: use the proxy socket when there is one, otherwise connect
    // directly to the subchannel address.
    connectionOptions.createConnection = (authority, option) => {
      if (proxyConnectionResult.socket) {
        return proxyConnectionResult.socket;
      } else {
        return net.connect(this.subchannelAddress);
      }
    };
  }

  // Merge the subchannel address fields into the connection options.
  connectionOptions = {
    ...connectionOptions,
    ...this.subchannelAddress,
  };

  const session = http2.connect(
    addressScheme + targetAuthority,
    connectionOptions
  );
  this.session = session;
  this.channelzSocketRef = registerChannelzSocket(
    this.subchannelAddressString,
    () => this.getChannelzSocketInfo()!,
    this.channelzEnabled
  );
  if (this.channelzEnabled) {
    this.childrenTracker.refChild(this.channelzSocketRef);
  }
  // The session starts unreferenced; callRef() references it while calls
  // are active.
  session.unref();

  session.once('connect', () => {
    if (this.session === session) {
      this.transitionToState(
        [ConnectivityState.CONNECTING],
        ConnectivityState.READY
      );
    }
  });
  session.once('close', () => {
    if (this.session === session) {
      this.trace('connection closed');
      // A close while CONNECTING is a failed attempt; a close while READY
      // returns the subchannel to IDLE. At most one of these applies.
      this.transitionToState(
        [ConnectivityState.CONNECTING],
        ConnectivityState.TRANSIENT_FAILURE
      );
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.IDLE
      );
    }
  });
  session.once(
    'goaway',
    (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
      if (this.session === session) {
        // The server rejected the connection for pinging too often: double
        // the keepalive interval, capped at KEEPALIVE_MAX_TIME_MS.
        if (
          errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
          opaqueData.equals(tooManyPingsData)
        ) {
          this.keepaliveTimeMs = Math.min(
            2 * this.keepaliveTimeMs,
            KEEPALIVE_MAX_TIME_MS
          );
          logging.log(
            LogVerbosity.ERROR,
            `Connection to ${uriToString(this.channelTarget)} at ${
              this.subchannelAddressString
            } rejected by server because of excess pings. Increasing ping interval to ${
              this.keepaliveTimeMs
            } ms`
          );
        }
        this.trace(
          'connection closed by GOAWAY with code ' +
            errorCode
        );
        this.transitionToState(
          [ConnectivityState.CONNECTING, ConnectivityState.READY],
          ConnectivityState.IDLE
        );
      }
    }
  );
  session.once('error', (error) => {
    // Only logged here; state changes are driven by the other handlers.
    this.trace(
      'connection closed with error ' +
        (error as Error).message
    );
  });
  if (logging.isTracerEnabled(TRACER_NAME)) {
    session.on('remoteSettings', (settings: http2.Settings) => {
      this.trace(
        'new settings received' +
          (this.session !== session ? ' on the old connection' : '') +
          ': ' +
          JSON.stringify(settings)
      );
    });
    session.on('localSettings', (settings: http2.Settings) => {
      this.trace(
        'local settings acknowledged by remote' +
          (this.session !== session ? ' on the old connection' : '') +
          ': ' +
          JSON.stringify(settings)
      );
    });
  }
}
|
595 |
|
/**
 * Starts one connection attempt: prepares the TLS connection options, asks
 * getProxiedConnection for a connection result, and hands that result to
 * createSession. If the request is rejected, a CONNECTING subchannel moves to
 * TRANSIENT_FAILURE.
 */
private startConnectingInternal() {
  const connectionOptions: ConnectionOptions =
    this.credentials._getConnectionOptions() || {};

  if ('secureContext' in connectionOptions) {
    connectionOptions.ALPNProtocols = ['h2'];
    const nameOverride = this.options['grpc.ssl_target_name_override'];
    if (nameOverride) {
      // Verify the certificate against the override name and use it as the
      // server name.
      connectionOptions.checkServerIdentity = (
        host: string,
        cert: PeerCertificate
      ): Error | undefined => checkServerIdentity(nameOverride, cert);
      connectionOptions.servername = nameOverride;
    } else if ('grpc.http_connect_target' in this.options) {
      // Take the server name from the configured connect target, falling
      // back to 'localhost' when that target cannot be parsed.
      const connectTarget =
        parseUri(this.options['grpc.http_connect_target'] as string) ?? {
          path: 'localhost',
        };
      const targetPath = getDefaultAuthority(connectTarget);
      const hostPort = splitHostPort(targetPath);
      connectionOptions.servername = hostPort?.host ?? targetPath;
    }
  }

  const onConnected = (result: ProxyConnectionResult) => {
    this.createSession(result);
  };
  const onFailed = (reason: unknown) => {
    this.transitionToState(
      [ConnectivityState.CONNECTING],
      ConnectivityState.TRANSIENT_FAILURE
    );
  };
  // Two-argument then(): onFailed handles only a rejected connection request,
  // not an error thrown by createSession.
  getProxiedConnection(
    this.subchannelAddress,
    this.options,
    connectionOptions
  ).then(onConnected, onFailed);
}
|
653 |
|
/**
 * Handles a lost connection: moves a READY subchannel to TRANSIENT_FAILURE
 * and then invokes every registered disconnect listener. The listeners run
 * whether or not the transition took place.
 */
private handleDisconnect() {
  this.transitionToState(
    [ConnectivityState.READY],
    ConnectivityState.TRANSIENT_FAILURE
  );
  this.disconnectListeners.forEach((notify) => {
    notify();
  });
}
|
662 |
|
663 | |
664 |
|
665 |
|
666 |
|
667 |
|
668 |
|
669 |
|
/**
 * Moves to `newState` if the current state is one of `oldStates`, performs
 * the entry actions of the new state, and then notifies the state listeners.
 * @param oldStates States from which this transition is allowed.
 * @param newState State to enter.
 * @returns true if the transition happened, false if the current state was
 *     not in `oldStates` (nothing is changed in that case).
 */
private transitionToState(
  oldStates: ConnectivityState[],
  newState: ConnectivityState
): boolean {
  const previousState = this.connectivityState;
  if (oldStates.indexOf(previousState) < 0) {
    return false;
  }
  const transitionText =
    ConnectivityState[previousState] + ' -> ' + ConnectivityState[newState];
  this.trace(transitionText);
  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', transitionText);
  }
  this.connectivityState = newState;
  switch (newState) {
    case ConnectivityState.READY: {
      this.stopBackoff();
      const readySession = this.session!;
      // Treat the socket closing as a disconnect, but only while this
      // session is still the current one.
      readySession.socket.once('close', () => {
        if (this.session === readySession) {
          this.handleDisconnect();
        }
      });
      if (this.keepaliveWithoutCalls) {
        this.startKeepalivePings();
      }
      break;
    }
    case ConnectivityState.CONNECTING: {
      this.startBackoff();
      this.startConnectingInternal();
      this.continueConnecting = false;
      break;
    }
    case ConnectivityState.TRANSIENT_FAILURE: {
      this.session?.close();
      this.session = null;
      this.resetChannelzSocketInfo();
      this.stopKeepalivePings();
      // With no backoff timer running, nothing else would end the failure
      // state, so run the timer handler on the next tick.
      if (!this.backoffTimeout.isRunning()) {
        process.nextTick(() => {
          this.handleBackoffTimer();
        });
      }
      break;
    }
    case ConnectivityState.IDLE: {
      this.session?.close();
      this.session = null;
      this.resetChannelzSocketInfo();
      this.stopKeepalivePings();
      break;
    }
    default:
      throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
  }
  // Iterate over a copy so listeners may add or remove listeners while
  // being notified.
  this.stateListeners.slice().forEach((notify) => {
    notify(this, previousState, newState);
  });
  return true;
}
|
739 |
|
740 | |
741 |
|
742 |
|
743 |
|
/**
 * Shuts the subchannel down once both reference counts are zero: a
 * CONNECTING or READY subchannel moves to IDLE, and the channelz
 * registration is removed when channelz is enabled.
 */
private checkBothRefcounts() {
  if (this.callRefcount !== 0 || this.refcount !== 0) {
    return;
  }
  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
  }
  this.transitionToState(
    [ConnectivityState.CONNECTING, ConnectivityState.READY],
    ConnectivityState.IDLE
  );
  if (this.channelzEnabled) {
    unregisterChannelzRef(this.channelzRef);
  }
}
|
760 |
|
/**
 * Adds one call reference. On the 0 -> 1 edge it references the session (if
 * any) and the backoff timer, and starts keepalive pings unless they already
 * run without calls.
 */
callRef() {
  const before = this.callRefcount;
  this.refTrace('callRefcount ' + before + ' -> ' + (before + 1));
  if (before === 0) {
    if (this.session) {
      this.session.ref();
    }
    this.backoffTimeout.ref();
    if (!this.keepaliveWithoutCalls) {
      this.startKeepalivePings();
    }
  }
  this.callRefcount += 1;
}
|
779 |
|
/**
 * Removes one call reference. When the count reaches zero it unreferences the
 * session (if any) and the backoff timer, stops the keepalive interval unless
 * pings run without calls, and then checks whether the subchannel can shut
 * down.
 */
callUnref() {
  const before = this.callRefcount;
  this.refTrace('callRefcount ' + before + ' -> ' + (before - 1));
  this.callRefcount -= 1;
  if (this.callRefcount !== 0) {
    return;
  }
  if (this.session) {
    this.session.unref();
  }
  this.backoffTimeout.unref();
  if (!this.keepaliveWithoutCalls) {
    clearInterval(this.keepaliveIntervalId);
  }
  this.checkBothRefcounts();
}
|
799 |
|
/** Adds one reference to the subchannel and traces the change. */
ref() {
  const before = this.refcount;
  this.refTrace('refcount ' + before + ' -> ' + (before + 1));
  this.refcount += 1;
}
|
809 |
|
/**
 * Removes one reference from the subchannel, traces the change, and checks
 * whether the subchannel can shut down.
 */
unref() {
  const before = this.refcount;
  this.refTrace('refcount ' + before + ' -> ' + (before - 1));
  this.refcount -= 1;
  this.checkBothRefcounts();
}
|
820 |
|
/**
 * Calls unref() only when exactly one reference is held.
 * @returns true if the reference was released, false otherwise.
 */
unrefIfOneRef(): boolean {
  if (this.refcount !== 1) {
    return false;
  }
  this.unref();
  return true;
}
|
828 |
|
829 | |
830 |
|
831 |
|
832 |
|
833 |
|
834 |
|
835 |
|
/**
 * Starts an HTTP/2 stream for a call on the current session and attaches it
 * to `callStream`.
 *
 * When channelz is enabled, call and stream outcomes are recorded, and the
 * stats tracker given to the call stream counts messages and records the time
 * of the latest sent and received message.
 *
 * @param metadata Call metadata; converted to the initial HTTP/2 headers.
 * @param callStream Call that supplies the host and method and receives the
 *     HTTP/2 stream.
 * @param extraFilters Filters passed through to the call stream.
 * @throws Rethrows whatever requesting the stream throws, after moving a
 *     READY subchannel to TRANSIENT_FAILURE.
 */
startCallStream(
  metadata: Metadata,
  callStream: Http2CallStream,
  extraFilters: Filter[]
) {
  const headers = metadata.toHttp2Headers();
  headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
  headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
  headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
  headers[HTTP2_HEADER_METHOD] = 'POST';
  headers[HTTP2_HEADER_PATH] = callStream.getMethod();
  headers[HTTP2_HEADER_TE] = 'trailers';
  let http2Stream: http2.ClientHttp2Stream;
  try {
    http2Stream = this.session!.request(headers);
  } catch (e) {
    // The session cannot be used: fail the subchannel and let the caller
    // see the error.
    this.transitionToState(
      [ConnectivityState.READY],
      ConnectivityState.TRANSIENT_FAILURE
    );
    throw e;
  }
  let headersString = '';
  for (const header of Object.keys(headers)) {
    headersString += '\t\t' + header + ': ' + headers[header] + '\n';
  }
  logging.trace(
    LogVerbosity.DEBUG,
    'call_stream',
    'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
      '(' + this.channelzRef.id + ') ' +
      this.subchannelAddressString +
      ' with headers\n' +
      headersString
  );
  this.flowControlTrace(
    'local window size: ' +
      this.session!.state.localWindowSize +
      ' remote window size: ' +
      this.session!.state.remoteWindowSize
  );
  // Remember which session the stream belongs to, so stream results are
  // counted only while that session is still the current one.
  const streamSession = this.session;
  this.internalsTrace(
    'session.closed=' +
      streamSession!.closed +
      ' session.destroyed=' +
      streamSession!.destroyed +
      ' session.socket.destroyed=' +
      streamSession!.socket.destroyed
  );
  let statsTracker: SubchannelCallStatsTracker;
  if (this.channelzEnabled) {
    this.callTracker.addCallStarted();
    callStream.addStatusWatcher((status) => {
      if (status.code === Status.OK) {
        this.callTracker.addCallSucceeded();
      } else {
        this.callTracker.addCallFailed();
      }
    });
    this.streamTracker.addCallStarted();
    callStream.addStreamEndWatcher((success) => {
      if (streamSession === this.session) {
        if (success) {
          this.streamTracker.addCallSucceeded();
        } else {
          this.streamTracker.addCallFailed();
        }
      }
    });
    statsTracker = {
      addMessageSent: () => {
        this.messagesSent += 1;
        this.lastMessageSentTimestamp = new Date();
      },
      addMessageReceived: () => {
        this.messagesReceived += 1;
        // Record the receive time too; without this the channelz socket
        // info would always report a null lastMessageReceivedTimestamp.
        this.lastMessageReceivedTimestamp = new Date();
      },
    };
  } else {
    // Channelz disabled: nothing to record.
    statsTracker = {
      addMessageSent: () => {},
      addMessageReceived: () => {},
    };
  }
  callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
}
|
930 |
|
931 | |
932 |
|
933 |
|
934 |
|
935 |
|
936 |
|
/**
 * Requests a connection. From IDLE this moves to CONNECTING right away. In
 * TRANSIENT_FAILURE it only records the request, so the backoff timer handler
 * moves to CONNECTING instead of IDLE. In any other state it does nothing.
 */
startConnecting() {
  const started = this.transitionToState(
    [ConnectivityState.IDLE],
    ConnectivityState.CONNECTING
  );
  if (
    !started &&
    this.connectivityState === ConnectivityState.TRANSIENT_FAILURE
  ) {
    this.continueConnecting = true;
  }
}
|
953 |
|
954 | |
955 |
|
956 |
|
/** Returns the subchannel's current connectivity state. */
getConnectivityState() {
  const { connectivityState } = this;
  return connectivityState;
}
|
960 |
|
961 | |
962 |
|
963 |
|
964 |
|
965 |
|
/**
 * Registers a listener that is called after every state transition with the
 * subchannel, the previous state and the new state.
 * @param listener Callback to append to the listener list.
 */
addConnectivityStateListener(listener: ConnectivityStateListener) {
  const listeners = this.stateListeners;
  listeners.push(listener);
}
|
969 |
|
970 | |
971 |
|
972 |
|
973 |
|
974 |
|
/**
 * Removes the first registration of `listener`, if it is registered.
 * @param listener Callback previously passed to addConnectivityStateListener.
 */
removeConnectivityStateListener(listener: ConnectivityStateListener) {
  const position = this.stateListeners.indexOf(listener);
  if (position < 0) {
    return;
  }
  this.stateListeners.splice(position, 1);
}
|
981 |
|
/**
 * Registers a callback that handleDisconnect invokes.
 * @param listener Callback to add; adding the same function twice has no
 *     further effect because listeners are kept in a Set.
 */
addDisconnectListener(listener: () => void) {
  const listeners = this.disconnectListeners;
  listeners.add(listener);
}
|
985 |
|
/**
 * Unregisters a disconnect callback; does nothing if it is not registered.
 * @param listener Callback previously passed to addDisconnectListener.
 */
removeDisconnectListener(listener: () => void) {
  const listeners = this.disconnectListeners;
  listeners.delete(listener);
}
|
989 |
|
990 | |
991 |
|
992 |
|
/**
 * Resets the backoff timer and, if the subchannel is in TRANSIENT_FAILURE,
 * starts connecting again immediately.
 */
resetBackoff() {
  this.backoffTimeout.reset();
  const from = [ConnectivityState.TRANSIENT_FAILURE];
  this.transitionToState(from, ConnectivityState.CONNECTING);
}
|
1000 |
|
/** Returns the string form of the subchannel address computed in the constructor. */
getAddress(): string {
  const { subchannelAddressString } = this;
  return subchannelAddressString;
}
|
1004 |
|
/** Returns the channelz reference obtained when this subchannel was registered. */
getChannelzRef(): SubchannelRef {
  const { channelzRef } = this;
  return channelzRef;
}
|
1008 |
|
/**
 * Returns this instance itself.
 * NOTE(review): presumably exists so that wrapper objects exposing the same
 * method can be unwrapped to the underlying subchannel; confirm against
 * callers.
 */
getRealSubchannel(): this {
  const self = this;
  return self;
}
|
1012 | }
|