UNPKG

35.5 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import { ChannelCredentials } from './channel-credentials';
20import { Metadata } from './metadata';
21import { Call, Http2CallStream, WriteObject } from './call-stream';
22import { ChannelOptions } from './channel-options';
23import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls';
24import { ConnectivityState } from './connectivity-state';
25import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
26import { getDefaultAuthority } from './resolver';
27import * as logging from './logging';
28import { LogVerbosity, Status } from './constants';
29import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
30import * as net from 'net';
31import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
32import { ConnectionOptions } from 'tls';
33import { FilterFactory, Filter, BaseFilter } from './filter';
34import {
35 stringToSubchannelAddress,
36 SubchannelAddress,
37 subchannelAddressToString,
38} from './subchannel-address';
39import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
40import { ConnectivityStateListener } from './subchannel-interface';
41
42const clientVersion = require('../../package.json').version;
43
44const TRACER_NAME = 'subchannel';
45const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
46
47const MIN_CONNECT_TIMEOUT_MS = 20000;
48const INITIAL_BACKOFF_MS = 1000;
49const BACKOFF_MULTIPLIER = 1.6;
50const MAX_BACKOFF_MS = 120000;
51const BACKOFF_JITTER = 0.2;
52
53/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
54 * have a constant for the max signed 32 bit integer, so this is a simple way
55 * to calculate it */
56const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
57const KEEPALIVE_TIMEOUT_MS = 20000;
58
59export interface SubchannelCallStatsTracker {
60 addMessageSent(): void;
61 addMessageReceived(): void;
62}
63
64const {
65 HTTP2_HEADER_AUTHORITY,
66 HTTP2_HEADER_CONTENT_TYPE,
67 HTTP2_HEADER_METHOD,
68 HTTP2_HEADER_PATH,
69 HTTP2_HEADER_TE,
70 HTTP2_HEADER_USER_AGENT,
71} = http2.constants;
72
73/**
74 * Get a number uniformly at random in the range [min, max)
75 * @param min
76 * @param max
77 */
78function uniformRandom(min: number, max: number) {
79 return Math.random() * (max - min) + min;
80}
81
82const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
83
84export class Subchannel {
85 /**
86 * The subchannel's current connectivity state. Invariant: `session` === `null`
87 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
88 */
89 private connectivityState: ConnectivityState = ConnectivityState.IDLE;
90 /**
91 * The underlying http2 session used to make requests.
92 */
93 private session: http2.ClientHttp2Session | null = null;
94 /**
95 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
96 * CONNECTING instead of IDLE when the backoff timeout ends.
97 */
98 private continueConnecting = false;
99 /**
100 * A list of listener functions that will be called whenever the connectivity
101 * state changes. Will be modified by `addConnectivityStateListener` and
102 * `removeConnectivityStateListener`
103 */
104 private stateListeners: ConnectivityStateListener[] = [];
105
106 /**
107 * A list of listener functions that will be called when the underlying
108 * socket disconnects. Used for ending active calls with an UNAVAILABLE
109 * status.
110 */
111 private disconnectListeners: Set<() => void> = new Set();
112
113 private backoffTimeout: BackoffTimeout;
114
115 /**
116 * The complete user agent string constructed using channel args.
117 */
118 private userAgent: string;
119
120 /**
121 * The amount of time in between sending pings
122 */
123 private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
124 /**
125 * The amount of time to wait for an acknowledgement after sending a ping
126 */
127 private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
128 /**
129 * Timer reference for timeout that indicates when to send the next ping
130 */
131 private keepaliveIntervalId: NodeJS.Timer;
132 /**
133 * Timer reference tracking when the most recent ping will be considered lost
134 */
135 private keepaliveTimeoutId: NodeJS.Timer;
136 /**
137 * Indicates whether keepalive pings should be sent without any active calls
138 */
139 private keepaliveWithoutCalls = false;
140
141 /**
142 * Tracks calls with references to this subchannel
143 */
144 private callRefcount = 0;
145 /**
146 * Tracks channels and subchannel pools with references to this subchannel
147 */
148 private refcount = 0;
149
150 /**
151 * A string representation of the subchannel address, for logging/tracing
152 */
153 private subchannelAddressString: string;
154
155 // Channelz info
156 private readonly channelzEnabled: boolean = true;
157 private channelzRef: SubchannelRef;
158 private channelzTrace: ChannelzTrace;
159 private callTracker = new ChannelzCallTracker();
160 private childrenTracker = new ChannelzChildrenTracker();
161
162 // Channelz socket info
163 private channelzSocketRef: SocketRef | null = null;
164 /**
165 * Name of the remote server, if it is not the same as the subchannel
166 * address, i.e. if connecting through an HTTP CONNECT proxy.
167 */
168 private remoteName: string | null = null;
169 private streamTracker = new ChannelzCallTracker();
170 private keepalivesSent = 0;
171 private messagesSent = 0;
172 private messagesReceived = 0;
173 private lastMessageSentTimestamp: Date | null = null;
174 private lastMessageReceivedTimestamp: Date | null = null;
175
176 /**
177 * A class representing a connection to a single backend.
178 * @param channelTarget The target string for the channel as a whole
179 * @param subchannelAddress The address for the backend that this subchannel
180 * will connect to
181 * @param options The channel options, plus any specific subchannel options
182 * for this subchannel
183 * @param credentials The channel credentials used to establish this
184 * connection
185 */
186 constructor(
187 private channelTarget: GrpcUri,
188 private subchannelAddress: SubchannelAddress,
189 private options: ChannelOptions,
190 private credentials: ChannelCredentials
191 ) {
192 // Build user-agent string.
193 this.userAgent = [
194 options['grpc.primary_user_agent'],
195 `grpc-node-js/${clientVersion}`,
196 options['grpc.secondary_user_agent'],
197 ]
198 .filter((e) => e)
199 .join(' '); // remove falsey values first
200
201 if ('grpc.keepalive_time_ms' in options) {
202 this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
203 }
204 if ('grpc.keepalive_timeout_ms' in options) {
205 this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
206 }
207 if ('grpc.keepalive_permit_without_calls' in options) {
208 this.keepaliveWithoutCalls =
209 options['grpc.keepalive_permit_without_calls'] === 1;
210 } else {
211 this.keepaliveWithoutCalls = false;
212 }
213 this.keepaliveIntervalId = setTimeout(() => {}, 0);
214 clearTimeout(this.keepaliveIntervalId);
215 this.keepaliveTimeoutId = setTimeout(() => {}, 0);
216 clearTimeout(this.keepaliveTimeoutId);
217 const backoffOptions: BackoffOptions = {
218 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
219 maxDelay: options['grpc.max_reconnect_backoff_ms'],
220 };
221 this.backoffTimeout = new BackoffTimeout(() => {
222 this.handleBackoffTimer();
223 }, backoffOptions);
224 this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
225
226 if (options['grpc.enable_channelz'] === 0) {
227 this.channelzEnabled = false;
228 }
229 this.channelzTrace = new ChannelzTrace();
230 this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
231 if (this.channelzEnabled) {
232 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
233 }
234 this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
235 }
236
237 private getChannelzInfo(): SubchannelInfo {
238 return {
239 state: this.connectivityState,
240 trace: this.channelzTrace,
241 callTracker: this.callTracker,
242 children: this.childrenTracker.getChildLists(),
243 target: this.subchannelAddressString
244 };
245 }
246
247 private getChannelzSocketInfo(): SocketInfo | null {
248 if (this.session === null) {
249 return null;
250 }
251 const sessionSocket = this.session.socket;
252 const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
253 const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
254 let tlsInfo: TlsInfo | null;
255 if (this.session.encrypted) {
256 const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
257 const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
258 const certificate = tlsSocket.getCertificate();
259 const peerCertificate = tlsSocket.getPeerCertificate();
260 tlsInfo = {
261 cipherSuiteStandardName: cipherInfo.standardName ?? null,
262 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
263 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
264 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
265 };
266 } else {
267 tlsInfo = null;
268 }
269 const socketInfo: SocketInfo = {
270 remoteAddress: remoteAddress,
271 localAddress: localAddress,
272 security: tlsInfo,
273 remoteName: this.remoteName,
274 streamsStarted: this.streamTracker.callsStarted,
275 streamsSucceeded: this.streamTracker.callsSucceeded,
276 streamsFailed: this.streamTracker.callsFailed,
277 messagesSent: this.messagesSent,
278 messagesReceived: this.messagesReceived,
279 keepAlivesSent: this.keepalivesSent,
280 lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
281 lastRemoteStreamCreatedTimestamp: null,
282 lastMessageSentTimestamp: this.lastMessageSentTimestamp,
283 lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
284 localFlowControlWindow: this.session.state.localWindowSize ?? null,
285 remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null
286 };
287 return socketInfo;
288 }
289
290 private resetChannelzSocketInfo() {
291 if (!this.channelzEnabled) {
292 return;
293 }
294 if (this.channelzSocketRef) {
295 unregisterChannelzRef(this.channelzSocketRef);
296 this.childrenTracker.unrefChild(this.channelzSocketRef);
297 this.channelzSocketRef = null;
298 }
299 this.remoteName = null;
300 this.streamTracker = new ChannelzCallTracker();
301 this.keepalivesSent = 0;
302 this.messagesSent = 0;
303 this.messagesReceived = 0;
304 this.lastMessageSentTimestamp = null;
305 this.lastMessageReceivedTimestamp = null;
306 }
307
308 private trace(text: string): void {
309 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
310 }
311
312 private refTrace(text: string): void {
313 logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
314 }
315
316 private flowControlTrace(text: string): void {
317 logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
318 }
319
320 private internalsTrace(text: string): void {
321 logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
322 }
323
324 private keepaliveTrace(text: string): void {
325 logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
326 }
327
328 private handleBackoffTimer() {
329 if (this.continueConnecting) {
330 this.transitionToState(
331 [ConnectivityState.TRANSIENT_FAILURE],
332 ConnectivityState.CONNECTING
333 );
334 } else {
335 this.transitionToState(
336 [ConnectivityState.TRANSIENT_FAILURE],
337 ConnectivityState.IDLE
338 );
339 }
340 }
341
342 /**
343 * Start a backoff timer with the current nextBackoff timeout
344 */
345 private startBackoff() {
346 this.backoffTimeout.runOnce();
347 }
348
349 private stopBackoff() {
350 this.backoffTimeout.stop();
351 this.backoffTimeout.reset();
352 }
353
354 private sendPing() {
355 if (this.channelzEnabled) {
356 this.keepalivesSent += 1;
357 }
358 this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
359 this.keepaliveTimeoutId = setTimeout(() => {
360 this.keepaliveTrace('Ping timeout passed without response');
361 this.handleDisconnect();
362 }, this.keepaliveTimeoutMs);
363 this.keepaliveTimeoutId.unref?.();
364 try {
365 this.session!.ping(
366 (err: Error | null, duration: number, payload: Buffer) => {
367 this.keepaliveTrace('Received ping response');
368 clearTimeout(this.keepaliveTimeoutId);
369 }
370 );
371 } catch (e) {
372 /* If we fail to send a ping, the connection is no longer functional, so
373 * we should discard it. */
374 this.transitionToState(
375 [ConnectivityState.READY],
376 ConnectivityState.TRANSIENT_FAILURE
377 );
378 }
379 }
380
381 private startKeepalivePings() {
382 this.keepaliveIntervalId = setInterval(() => {
383 this.sendPing();
384 }, this.keepaliveTimeMs);
385 this.keepaliveIntervalId.unref?.();
386 /* Don't send a ping immediately because whatever caused us to start
387 * sending pings should also involve some network activity. */
388 }
389
390 /**
391 * Stop keepalive pings when terminating a connection. This discards the
392 * outstanding ping timeout, so it should not be called if the same
393 * connection will still be used.
394 */
395 private stopKeepalivePings() {
396 clearInterval(this.keepaliveIntervalId);
397 clearTimeout(this.keepaliveTimeoutId);
398 }
399
400 private createSession(proxyConnectionResult: ProxyConnectionResult) {
401 if (proxyConnectionResult.realTarget) {
402 this.remoteName = uriToString(proxyConnectionResult.realTarget);
403 this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget);
404 } else {
405 this.remoteName = null;
406 this.trace('creating HTTP/2 session');
407 }
408 const targetAuthority = getDefaultAuthority(
409 proxyConnectionResult.realTarget ?? this.channelTarget
410 );
411 let connectionOptions: http2.SecureClientSessionOptions =
412 this.credentials._getConnectionOptions() || {};
413 connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
414 if ('grpc-node.max_session_memory' in this.options) {
415 connectionOptions.maxSessionMemory = this.options[
416 'grpc-node.max_session_memory'
417 ];
418 } else {
419 /* By default, set a very large max session memory limit, to effectively
420 * disable enforcement of the limit. Some testing indicates that Node's
421 * behavior degrades badly when this limit is reached, so we solve that
422 * by disabling the check entirely. */
423 connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
424 }
425 let addressScheme = 'http://';
426 if ('secureContext' in connectionOptions) {
427 addressScheme = 'https://';
428 // If provided, the value of grpc.ssl_target_name_override should be used
429 // to override the target hostname when checking server identity.
430 // This option is used for testing only.
431 if (this.options['grpc.ssl_target_name_override']) {
432 const sslTargetNameOverride = this.options[
433 'grpc.ssl_target_name_override'
434 ]!;
435 connectionOptions.checkServerIdentity = (
436 host: string,
437 cert: PeerCertificate
438 ): Error | undefined => {
439 return checkServerIdentity(sslTargetNameOverride, cert);
440 };
441 connectionOptions.servername = sslTargetNameOverride;
442 } else {
443 const authorityHostname =
444 splitHostPort(targetAuthority)?.host ?? 'localhost';
445 // We want to always set servername to support SNI
446 connectionOptions.servername = authorityHostname;
447 }
448 if (proxyConnectionResult.socket) {
449 /* This is part of the workaround for
450 * https://github.com/nodejs/node/issues/32922. Without that bug,
451 * proxyConnectionResult.socket would always be a plaintext socket and
452 * this would say
453 * connectionOptions.socket = proxyConnectionResult.socket; */
454 connectionOptions.createConnection = (authority, option) => {
455 return proxyConnectionResult.socket!;
456 };
457 }
458 } else {
459 /* In all but the most recent versions of Node, http2.connect does not use
460 * the options when establishing plaintext connections, so we need to
461 * establish that connection explicitly. */
462 connectionOptions.createConnection = (authority, option) => {
463 if (proxyConnectionResult.socket) {
464 return proxyConnectionResult.socket;
465 } else {
466 /* net.NetConnectOpts is declared in a way that is more restrictive
467 * than what net.connect will actually accept, so we use the type
468 * assertion to work around that. */
469 return net.connect(this.subchannelAddress);
470 }
471 };
472 }
473
474 connectionOptions = {
475 ...connectionOptions,
476 ...this.subchannelAddress,
477 };
478
479 /* http2.connect uses the options here:
480 * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
481 * The spread operator overides earlier values with later ones, so any port
482 * or host values in the options will be used rather than any values extracted
483 * from the first argument. In addition, the path overrides the host and port,
484 * as documented for plaintext connections here:
485 * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
486 * and for TLS connections here:
487 * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
488 * earlier versions of Node, http2.connect passes these options to
489 * tls.connect but not net.connect, so in the insecure case we still need
490 * to set the createConnection option above to create the connection
491 * explicitly. We cannot do that in the TLS case because http2.connect
492 * passes necessary additional options to tls.connect.
493 * The first argument just needs to be parseable as a URL and the scheme
494 * determines whether the connection will be established over TLS or not.
495 */
496 const session = http2.connect(
497 addressScheme + targetAuthority,
498 connectionOptions
499 );
500 this.session = session;
501 this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled);
502 if (this.channelzEnabled) {
503 this.childrenTracker.refChild(this.channelzSocketRef);
504 }
505 session.unref();
506 /* For all of these events, check if the session at the time of the event
507 * is the same one currently attached to this subchannel, to ensure that
508 * old events from previous connection attempts cannot cause invalid state
509 * transitions. */
510 session.once('connect', () => {
511 if (this.session === session) {
512 this.transitionToState(
513 [ConnectivityState.CONNECTING],
514 ConnectivityState.READY
515 );
516 }
517 });
518 session.once('close', () => {
519 if (this.session === session) {
520 this.trace('connection closed');
521 this.transitionToState(
522 [ConnectivityState.CONNECTING],
523 ConnectivityState.TRANSIENT_FAILURE
524 );
525 /* Transitioning directly to IDLE here should be OK because we are not
526 * doing any backoff, because a connection was established at some
527 * point */
528 this.transitionToState(
529 [ConnectivityState.READY],
530 ConnectivityState.IDLE
531 );
532 }
533 });
534 session.once(
535 'goaway',
536 (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
537 if (this.session === session) {
538 /* See the last paragraph of
539 * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
540 if (
541 errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
542 opaqueData.equals(tooManyPingsData)
543 ) {
544 this.keepaliveTimeMs = Math.min(
545 2 * this.keepaliveTimeMs,
546 KEEPALIVE_MAX_TIME_MS
547 );
548 logging.log(
549 LogVerbosity.ERROR,
550 `Connection to ${uriToString(this.channelTarget)} at ${
551 this.subchannelAddressString
552 } rejected by server because of excess pings. Increasing ping interval to ${
553 this.keepaliveTimeMs
554 } ms`
555 );
556 }
557 this.trace(
558 'connection closed by GOAWAY with code ' +
559 errorCode
560 );
561 this.transitionToState(
562 [ConnectivityState.CONNECTING, ConnectivityState.READY],
563 ConnectivityState.IDLE
564 );
565 }
566 }
567 );
568 session.once('error', (error) => {
569 /* Do nothing here. Any error should also trigger a close event, which is
570 * where we want to handle that. */
571 this.trace(
572 'connection closed with error ' +
573 (error as Error).message
574 );
575 });
576 if (logging.isTracerEnabled(TRACER_NAME)) {
577 session.on('remoteSettings', (settings: http2.Settings) => {
578 this.trace(
579 'new settings received' +
580 (this.session !== session ? ' on the old connection' : '') +
581 ': ' +
582 JSON.stringify(settings)
583 );
584 });
585 session.on('localSettings', (settings: http2.Settings) => {
586 this.trace(
587 'local settings acknowledged by remote' +
588 (this.session !== session ? ' on the old connection' : '') +
589 ': ' +
590 JSON.stringify(settings)
591 );
592 });
593 }
594 }
595
596 private startConnectingInternal() {
597 /* Pass connection options through to the proxy so that it's able to
598 * upgrade it's connection to support tls if needed.
599 * This is a workaround for https://github.com/nodejs/node/issues/32922
600 * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
601 const connectionOptions: ConnectionOptions =
602 this.credentials._getConnectionOptions() || {};
603
604 if ('secureContext' in connectionOptions) {
605 connectionOptions.ALPNProtocols = ['h2'];
606 // If provided, the value of grpc.ssl_target_name_override should be used
607 // to override the target hostname when checking server identity.
608 // This option is used for testing only.
609 if (this.options['grpc.ssl_target_name_override']) {
610 const sslTargetNameOverride = this.options[
611 'grpc.ssl_target_name_override'
612 ]!;
613 connectionOptions.checkServerIdentity = (
614 host: string,
615 cert: PeerCertificate
616 ): Error | undefined => {
617 return checkServerIdentity(sslTargetNameOverride, cert);
618 };
619 connectionOptions.servername = sslTargetNameOverride;
620 } else {
621 if ('grpc.http_connect_target' in this.options) {
622 /* This is more or less how servername will be set in createSession
623 * if a connection is successfully established through the proxy.
624 * If the proxy is not used, these connectionOptions are discarded
625 * anyway */
626 const targetPath = getDefaultAuthority(
627 parseUri(this.options['grpc.http_connect_target'] as string) ?? {
628 path: 'localhost',
629 }
630 );
631 const hostPort = splitHostPort(targetPath);
632 connectionOptions.servername = hostPort?.host ?? targetPath;
633 }
634 }
635 }
636
637 getProxiedConnection(
638 this.subchannelAddress,
639 this.options,
640 connectionOptions
641 ).then(
642 (result) => {
643 this.createSession(result);
644 },
645 (reason) => {
646 this.transitionToState(
647 [ConnectivityState.CONNECTING],
648 ConnectivityState.TRANSIENT_FAILURE
649 );
650 }
651 );
652 }
653
654 private handleDisconnect() {
655 this.transitionToState(
656 [ConnectivityState.READY],
657 ConnectivityState.TRANSIENT_FAILURE);
658 for (const listener of this.disconnectListeners.values()) {
659 listener();
660 }
661 }
662
663 /**
664 * Initiate a state transition from any element of oldStates to the new
665 * state. If the current connectivityState is not in oldStates, do nothing.
666 * @param oldStates The set of states to transition from
667 * @param newState The state to transition to
668 * @returns True if the state changed, false otherwise
669 */
670 private transitionToState(
671 oldStates: ConnectivityState[],
672 newState: ConnectivityState
673 ): boolean {
674 if (oldStates.indexOf(this.connectivityState) === -1) {
675 return false;
676 }
677 this.trace(
678 ConnectivityState[this.connectivityState] +
679 ' -> ' +
680 ConnectivityState[newState]
681 );
682 if (this.channelzEnabled) {
683 this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
684 }
685 const previousState = this.connectivityState;
686 this.connectivityState = newState;
687 switch (newState) {
688 case ConnectivityState.READY:
689 this.stopBackoff();
690 const session = this.session!;
691 session.socket.once('close', () => {
692 if (this.session === session) {
693 this.handleDisconnect();
694 }
695 });
696 if (this.keepaliveWithoutCalls) {
697 this.startKeepalivePings();
698 }
699 break;
700 case ConnectivityState.CONNECTING:
701 this.startBackoff();
702 this.startConnectingInternal();
703 this.continueConnecting = false;
704 break;
705 case ConnectivityState.TRANSIENT_FAILURE:
706 if (this.session) {
707 this.session.close();
708 }
709 this.session = null;
710 this.resetChannelzSocketInfo();
711 this.stopKeepalivePings();
712 /* If the backoff timer has already ended by the time we get to the
713 * TRANSIENT_FAILURE state, we want to immediately transition out of
714 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
715 if (!this.backoffTimeout.isRunning()) {
716 process.nextTick(() => {
717 this.handleBackoffTimer();
718 });
719 }
720 break;
721 case ConnectivityState.IDLE:
722 if (this.session) {
723 this.session.close();
724 }
725 this.session = null;
726 this.resetChannelzSocketInfo();
727 this.stopKeepalivePings();
728 break;
729 default:
730 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
731 }
732 /* We use a shallow copy of the stateListeners array in case a listener
733 * is removed during this iteration */
734 for (const listener of [...this.stateListeners]) {
735 listener(this, previousState, newState);
736 }
737 return true;
738 }
739
740 /**
741 * Check if the subchannel associated with zero calls and with zero channels.
742 * If so, shut it down.
743 */
744 private checkBothRefcounts() {
745 /* If no calls, channels, or subchannel pools have any more references to
746 * this subchannel, we can be sure it will never be used again. */
747 if (this.callRefcount === 0 && this.refcount === 0) {
748 if (this.channelzEnabled) {
749 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
750 }
751 this.transitionToState(
752 [ConnectivityState.CONNECTING, ConnectivityState.READY],
753 ConnectivityState.IDLE
754 );
755 if (this.channelzEnabled) {
756 unregisterChannelzRef(this.channelzRef);
757 }
758 }
759 }
760
761 callRef() {
762 this.refTrace(
763 'callRefcount ' +
764 this.callRefcount +
765 ' -> ' +
766 (this.callRefcount + 1)
767 );
768 if (this.callRefcount === 0) {
769 if (this.session) {
770 this.session.ref();
771 }
772 this.backoffTimeout.ref();
773 if (!this.keepaliveWithoutCalls) {
774 this.startKeepalivePings();
775 }
776 }
777 this.callRefcount += 1;
778 }
779
780 callUnref() {
781 this.refTrace(
782 'callRefcount ' +
783 this.callRefcount +
784 ' -> ' +
785 (this.callRefcount - 1)
786 );
787 this.callRefcount -= 1;
788 if (this.callRefcount === 0) {
789 if (this.session) {
790 this.session.unref();
791 }
792 this.backoffTimeout.unref();
793 if (!this.keepaliveWithoutCalls) {
794 clearInterval(this.keepaliveIntervalId);
795 }
796 this.checkBothRefcounts();
797 }
798 }
799
800 ref() {
801 this.refTrace(
802 'refcount ' +
803 this.refcount +
804 ' -> ' +
805 (this.refcount + 1)
806 );
807 this.refcount += 1;
808 }
809
810 unref() {
811 this.refTrace(
812 'refcount ' +
813 this.refcount +
814 ' -> ' +
815 (this.refcount - 1)
816 );
817 this.refcount -= 1;
818 this.checkBothRefcounts();
819 }
820
821 unrefIfOneRef(): boolean {
822 if (this.refcount === 1) {
823 this.unref();
824 return true;
825 }
826 return false;
827 }
828
829 /**
830 * Start a stream on the current session with the given `metadata` as headers
831 * and then attach it to the `callStream`. Must only be called if the
832 * subchannel's current connectivity state is READY.
833 * @param metadata
834 * @param callStream
835 */
836 startCallStream(
837 metadata: Metadata,
838 callStream: Http2CallStream,
839 extraFilters: Filter[]
840 ) {
841 const headers = metadata.toHttp2Headers();
842 headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
843 headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
844 headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
845 headers[HTTP2_HEADER_METHOD] = 'POST';
846 headers[HTTP2_HEADER_PATH] = callStream.getMethod();
847 headers[HTTP2_HEADER_TE] = 'trailers';
848 let http2Stream: http2.ClientHttp2Stream;
849 /* In theory, if an error is thrown by session.request because session has
850 * become unusable (e.g. because it has received a goaway), this subchannel
851 * should soon see the corresponding close or goaway event anyway and leave
852 * READY. But we have seen reports that this does not happen
853 * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
854 * so for defense in depth, we just discard the session when we see an
855 * error here.
856 */
857 try {
858 http2Stream = this.session!.request(headers);
859 } catch (e) {
860 this.transitionToState(
861 [ConnectivityState.READY],
862 ConnectivityState.TRANSIENT_FAILURE
863 );
864 throw e;
865 }
866 let headersString = '';
867 for (const header of Object.keys(headers)) {
868 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
869 }
870 logging.trace(
871 LogVerbosity.DEBUG,
872 'call_stream',
873 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
874 '(' + this.channelzRef.id + ') ' +
875 this.subchannelAddressString +
876 ' with headers\n' +
877 headersString
878 );
879 this.flowControlTrace(
880 'local window size: ' +
881 this.session!.state.localWindowSize +
882 ' remote window size: ' +
883 this.session!.state.remoteWindowSize
884 );
885 const streamSession = this.session;
886 this.internalsTrace(
887 'session.closed=' +
888 streamSession!.closed +
889 ' session.destroyed=' +
890 streamSession!.destroyed +
891 ' session.socket.destroyed=' +
892 streamSession!.socket.destroyed);
893 let statsTracker: SubchannelCallStatsTracker;
894 if (this.channelzEnabled) {
895 this.callTracker.addCallStarted();
896 callStream.addStatusWatcher(status => {
897 if (status.code === Status.OK) {
898 this.callTracker.addCallSucceeded();
899 } else {
900 this.callTracker.addCallFailed();
901 }
902 });
903 this.streamTracker.addCallStarted();
904 callStream.addStreamEndWatcher(success => {
905 if (streamSession === this.session) {
906 if (success) {
907 this.streamTracker.addCallSucceeded();
908 } else {
909 this.streamTracker.addCallFailed();
910 }
911 }
912 });
913 statsTracker = {
914 addMessageSent: () => {
915 this.messagesSent += 1;
916 this.lastMessageSentTimestamp = new Date();
917 },
918 addMessageReceived: () => {
919 this.messagesReceived += 1;
920 }
921 }
922 } else {
923 statsTracker = {
924 addMessageSent: () => {},
925 addMessageReceived: () => {}
926 }
927 }
928 callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
929 }
930
931 /**
932 * If the subchannel is currently IDLE, start connecting and switch to the
933 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
934 * the next time it would transition to IDLE, start connecting again instead.
935 * Otherwise, do nothing.
936 */
937 startConnecting() {
938 /* First, try to transition from IDLE to connecting. If that doesn't happen
939 * because the state is not currently IDLE, check if it is
940 * TRANSIENT_FAILURE, and if so indicate that it should go back to
941 * connecting after the backoff timer ends. Otherwise do nothing */
942 if (
943 !this.transitionToState(
944 [ConnectivityState.IDLE],
945 ConnectivityState.CONNECTING
946 )
947 ) {
948 if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
949 this.continueConnecting = true;
950 }
951 }
952 }
953
954 /**
955 * Get the subchannel's current connectivity state.
956 */
957 getConnectivityState() {
958 return this.connectivityState;
959 }
960
961 /**
962 * Add a listener function to be called whenever the subchannel's
963 * connectivity state changes.
964 * @param listener
965 */
966 addConnectivityStateListener(listener: ConnectivityStateListener) {
967 this.stateListeners.push(listener);
968 }
969
970 /**
971 * Remove a listener previously added with `addConnectivityStateListener`
972 * @param listener A reference to a function previously passed to
973 * `addConnectivityStateListener`
974 */
975 removeConnectivityStateListener(listener: ConnectivityStateListener) {
976 const listenerIndex = this.stateListeners.indexOf(listener);
977 if (listenerIndex > -1) {
978 this.stateListeners.splice(listenerIndex, 1);
979 }
980 }
981
982 addDisconnectListener(listener: () => void) {
983 this.disconnectListeners.add(listener);
984 }
985
986 removeDisconnectListener(listener: () => void) {
987 this.disconnectListeners.delete(listener);
988 }
989
990 /**
991 * Reset the backoff timeout, and immediately start connecting if in backoff.
992 */
993 resetBackoff() {
994 this.backoffTimeout.reset();
995 this.transitionToState(
996 [ConnectivityState.TRANSIENT_FAILURE],
997 ConnectivityState.CONNECTING
998 );
999 }
1000
1001 getAddress(): string {
1002 return this.subchannelAddressString;
1003 }
1004
1005 getChannelzRef(): SubchannelRef {
1006 return this.channelzRef;
1007 }
1008
1009 getRealSubchannel(): this {
1010 return this;
1011 }
1012}