/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import * as http2 from 'http2';
import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import { Call, Http2CallStream, WriteObject } from './call-stream';
import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls';
import { ConnectivityState } from './connectivity-state';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import { getDefaultAuthority } from './resolver';
import * as logging from './logging';
import { LogVerbosity, Status } from './constants';
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter, BaseFilter } from './filter';
import {
  stringToSubchannelAddress,
  SubchannelAddress,
  subchannelAddressToString,
} from './subchannel-address';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
import { ConnectivityStateListener } from './subchannel-interface';

const clientVersion = require('../../package.json').version;

const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';

const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;

/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
 * have a constant for the max signed 32 bit integer, so this is a simple way
 * to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;

/**
 * Callbacks handed to a call stream so that it can report per-message
 * activity back to the subchannel that carries it.
 */
export interface SubchannelCallStatsTracker {
  addMessageSent(): void;
  addMessageReceived(): void;
}

const {
  HTTP2_HEADER_AUTHORITY,
  HTTP2_HEADER_CONTENT_TYPE,
  HTTP2_HEADER_METHOD,
  HTTP2_HEADER_PATH,
  HTTP2_HEADER_TE,
  HTTP2_HEADER_USER_AGENT,
} = http2.constants;

/**
 * Get a number uniformly at random in the range [min, max)
 * @param min
 * @param max
 */
function uniformRandom(min: number, max: number) {
  return Math.random() * (max - min) + min;
}

const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');

export class Subchannel {
  /**
   * The subchannel's current connectivity state. `session` is always `null`
   * while `connectivityState` is IDLE or TRANSIENT_FAILURE. It is also `null`
   * in CONNECTING until the (possibly proxied) connection has been set up and
   * the HTTP/2 session has been created.
   */
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The underlying http2 session used to make requests.
   */
  private session: http2.ClientHttp2Session | null = null;
  /**
   * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
   * CONNECTING instead of IDLE when the backoff timeout ends.
   */
  private continueConnecting = false;
  /**
   * A list of listener functions that will be called whenever the connectivity
   * state changes. Will be modified by `addConnectivityStateListener` and
   * `removeConnectivityStateListener`
   */
  private stateListeners: ConnectivityStateListener[] = [];

  /**
   * A list of listener functions that will be called when the underlying
   * socket disconnects. Used for ending active calls with an UNAVAILABLE
   * status.
   */
  private disconnectListeners: Set<() => void> = new Set();

  private backoffTimeout: BackoffTimeout;

  /**
   * The complete user agent string constructed using channel args.
   */
  private userAgent: string;

  /**
   * The amount of time in between sending pings
   */
  private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
  /**
   * The amount of time to wait for an acknowledgement after sending a ping
   */
  private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
  /**
   * Timer reference for timeout that indicates when to send the next ping
   */
  private keepaliveIntervalId: NodeJS.Timer;
  /**
   * Timer reference tracking when the most recent ping will be considered lost
   */
  private keepaliveTimeoutId: NodeJS.Timer;
  /**
   * Indicates whether keepalive pings should be sent without any active calls
   */
  private keepaliveWithoutCalls = false;

  /**
   * Tracks calls with references to this subchannel
   */
  private callRefcount = 0;
  /**
   * Tracks channels and subchannel pools with references to this subchannel
   */
  private refcount = 0;

  /**
   * A string representation of the subchannel address, for logging/tracing
   */
  private subchannelAddressString: string;

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private channelzRef: SubchannelRef;
  private channelzTrace: ChannelzTrace;
  private callTracker = new ChannelzCallTracker();
  private childrenTracker = new ChannelzChildrenTracker();

  // Channelz socket info
  private channelzSocketRef: SocketRef | null = null;
  /**
   * Name of the remote server, if it is not the same as the subchannel
   * address, i.e. if connecting through an HTTP CONNECT proxy.
   */
  private remoteName: string | null = null;
  private streamTracker = new ChannelzCallTracker();
  private keepalivesSent = 0;
  private messagesSent = 0;
  private messagesReceived = 0;
  private lastMessageSentTimestamp: Date | null = null;
  private lastMessageReceivedTimestamp: Date | null = null;

  /**
   * A class representing a connection to a single backend.
   * @param channelTarget The target string for the channel as a whole
   * @param subchannelAddress The address for the backend that this subchannel
   *     will connect to
   * @param options The channel options, plus any specific subchannel options
   *     for this subchannel
   * @param credentials The channel credentials used to establish this
   *     connection
   */
  constructor(
    private channelTarget: GrpcUri,
    private subchannelAddress: SubchannelAddress,
    private options: ChannelOptions,
    private credentials: ChannelCredentials
  ) {
    // Build user-agent string.
    this.userAgent = [
      options['grpc.primary_user_agent'],
      `grpc-node-js/${clientVersion}`,
      options['grpc.secondary_user_agent'],
    ]
      .filter((e) => e)
      .join(' '); // remove falsey values first

    if ('grpc.keepalive_time_ms' in options) {
      this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
    }
    if ('grpc.keepalive_timeout_ms' in options) {
      this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
    }
    if ('grpc.keepalive_permit_without_calls' in options) {
      this.keepaliveWithoutCalls =
        options['grpc.keepalive_permit_without_calls'] === 1;
    } else {
      this.keepaliveWithoutCalls = false;
    }
    /* Initialize both timer fields with already-cleared timers so that they
     * always hold a value that is safe to pass to clearTimeout/clearInterval. */
    this.keepaliveIntervalId = setTimeout(() => {}, 0);
    clearTimeout(this.keepaliveIntervalId);
    this.keepaliveTimeoutId = setTimeout(() => {}, 0);
    clearTimeout(this.keepaliveTimeoutId);
    const backoffOptions: BackoffOptions = {
      initialDelay: options['grpc.initial_reconnect_backoff_ms'],
      maxDelay: options['grpc.max_reconnect_backoff_ms'],
    };
    this.backoffTimeout = new BackoffTimeout(() => {
      this.handleBackoffTimer();
    }, backoffOptions);
    this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

    if (options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }
    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
    }
    this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
  }

  /**
   * Build the channelz snapshot for this subchannel from its current state,
   * trace, call tracker and child list.
   */
  private getChannelzInfo(): SubchannelInfo {
    return {
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
      target: this.subchannelAddressString
    };
  }

  /**
   * Build the channelz snapshot for the socket of the current session.
   * @returns The socket information, or `null` if there is no session.
   */
  private getChannelzSocketInfo(): SocketInfo | null {
    if (this.session === null) {
      return null;
    }
    const sessionSocket = this.session.socket;
    const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
    const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
    let tlsInfo: TlsInfo | null;
    if (this.session.encrypted) {
      const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
      const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
      const certificate = tlsSocket.getCertificate();
      const peerCertificate = tlsSocket.getPeerCertificate();
      tlsInfo = {
        cipherSuiteStandardName: cipherInfo.standardName ?? null,
        cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
        localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
        remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
      };
    } else {
      tlsInfo = null;
    }
    const socketInfo: SocketInfo = {
      remoteAddress: remoteAddress,
      localAddress: localAddress,
      security: tlsInfo,
      remoteName: this.remoteName,
      streamsStarted: this.streamTracker.callsStarted,
      streamsSucceeded: this.streamTracker.callsSucceeded,
      streamsFailed: this.streamTracker.callsFailed,
      messagesSent: this.messagesSent,
      messagesReceived: this.messagesReceived,
      keepAlivesSent: this.keepalivesSent,
      lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
      lastRemoteStreamCreatedTimestamp: null,
      lastMessageSentTimestamp: this.lastMessageSentTimestamp,
      lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
      localFlowControlWindow: this.session.state.localWindowSize ?? null,
      remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null
    };
    return socketInfo;
  }

  /**
   * Unregister the current channelz socket (if any) and reset every
   * per-socket statistic. Does nothing when channelz is disabled.
   */
  private resetChannelzSocketInfo() {
    if (!this.channelzEnabled) {
      return;
    }
    if (this.channelzSocketRef) {
      unregisterChannelzRef(this.channelzSocketRef);
      this.childrenTracker.unrefChild(this.channelzSocketRef);
      this.channelzSocketRef = null;
    }
    this.remoteName = null;
    this.streamTracker = new ChannelzCallTracker();
    this.keepalivesSent = 0;
    this.messagesSent = 0;
    this.messagesReceived = 0;
    this.lastMessageSentTimestamp = null;
    this.lastMessageReceivedTimestamp = null;
  }

  /* The following helpers all emit a DEBUG trace line prefixed with the
   * channelz ID and the subchannel address; they differ only in tracer name. */

  private trace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
  }

  private refTrace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
  }

  private flowControlTrace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
  }

  private internalsTrace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
  }

  private keepaliveTrace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
  }

  /**
   * Leave TRANSIENT_FAILURE when the backoff timer ends: go to CONNECTING if
   * a connection attempt was requested during backoff, otherwise to IDLE.
   */
  private handleBackoffTimer() {
    if (this.continueConnecting) {
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.CONNECTING
      );
    } else {
      this.transitionToState(
        [ConnectivityState.TRANSIENT_FAILURE],
        ConnectivityState.IDLE
      );
    }
  }

  /**
   * Start a backoff timer with the current nextBackoff timeout
   */
  private startBackoff() {
    this.backoffTimeout.runOnce();
  }

  /**
   * Stop the backoff timer and reset it.
   */
  private stopBackoff() {
    this.backoffTimeout.stop();
    this.backoffTimeout.reset();
  }

  /**
   * Send a keepalive ping on the current session and arm the timeout after
   * which a missing response is treated as a disconnect.
   */
  private sendPing() {
    if (this.channelzEnabled) {
      this.keepalivesSent += 1;
    }
    this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
    this.keepaliveTimeoutId = setTimeout(() => {
      this.keepaliveTrace('Ping timeout passed without response');
      this.handleDisconnect();
    }, this.keepaliveTimeoutMs);
    this.keepaliveTimeoutId.unref?.();
    try {
      this.session!.ping(
        (err: Error | null, duration: number, payload: Buffer) => {
          this.keepaliveTrace('Received ping response');
          clearTimeout(this.keepaliveTimeoutId);
        }
      );
    } catch (e) {
      /* If we fail to send a ping, the connection is no longer functional, so
       * we should discard it. */
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.TRANSIENT_FAILURE
      );
    }
  }

  /**
   * Start sending a keepalive ping every `keepaliveTimeMs` milliseconds.
   */
  private startKeepalivePings() {
    this.keepaliveIntervalId = setInterval(() => {
      this.sendPing();
    }, this.keepaliveTimeMs);
    this.keepaliveIntervalId.unref?.();
    /* Don't send a ping immediately because whatever caused us to start
     * sending pings should also involve some network activity. */
  }

  /**
   * Stop keepalive pings when terminating a connection. This discards the
   * outstanding ping timeout, so it should not be called if the same
   * connection will still be used.
   */
  private stopKeepalivePings() {
    clearInterval(this.keepaliveIntervalId);
    clearTimeout(this.keepaliveTimeoutId);
  }

  /**
   * Create the HTTP/2 session for this subchannel and attach the event
   * handlers that drive connectivity state transitions.
   * @param proxyConnectionResult The result of the proxy negotiation; may
   *     carry an already-connected socket and the real target behind a proxy.
   */
  private createSession(proxyConnectionResult: ProxyConnectionResult) {
    if (proxyConnectionResult.realTarget) {
      this.remoteName = uriToString(proxyConnectionResult.realTarget);
      /* realTarget is a parsed URI object, so log its string form instead of
       * concatenating the object itself. */
      this.trace('creating HTTP/2 session through proxy to ' + this.remoteName);
    } else {
      this.remoteName = null;
      this.trace('creating HTTP/2 session');
    }
    const targetAuthority = getDefaultAuthority(
      proxyConnectionResult.realTarget ?? this.channelTarget
    );
    let connectionOptions: http2.SecureClientSessionOptions =
      this.credentials._getConnectionOptions() || {};
    connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
    if ('grpc-node.max_session_memory' in this.options) {
      connectionOptions.maxSessionMemory = this.options[
        'grpc-node.max_session_memory'
      ];
    } else {
      /* By default, set a very large max session memory limit, to effectively
       * disable enforcement of the limit. Some testing indicates that Node's
       * behavior degrades badly when this limit is reached, so we solve that
       * by disabling the check entirely. */
      connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
    }
    let addressScheme = 'http://';
    if ('secureContext' in connectionOptions) {
      addressScheme = 'https://';
      // If provided, the value of grpc.ssl_target_name_override should be used
      // to override the target hostname when checking server identity.
      // This option is used for testing only.
      if (this.options['grpc.ssl_target_name_override']) {
        const sslTargetNameOverride = this.options[
          'grpc.ssl_target_name_override'
        ]!;
        connectionOptions.checkServerIdentity = (
          host: string,
          cert: PeerCertificate
        ): Error | undefined => {
          return checkServerIdentity(sslTargetNameOverride, cert);
        };
        connectionOptions.servername = sslTargetNameOverride;
      } else {
        const authorityHostname =
          splitHostPort(targetAuthority)?.host ?? 'localhost';
        // We want to always set servername to support SNI
        connectionOptions.servername = authorityHostname;
      }
      if (proxyConnectionResult.socket) {
        /* This is part of the workaround for
         * https://github.com/nodejs/node/issues/32922. Without that bug,
         * proxyConnectionResult.socket would always be a plaintext socket and
         * this would say
         * connectionOptions.socket = proxyConnectionResult.socket; */
        connectionOptions.createConnection = (authority, option) => {
          return proxyConnectionResult.socket!;
        };
      }
    } else {
      /* In all but the most recent versions of Node, http2.connect does not use
       * the options when establishing plaintext connections, so we need to
       * establish that connection explicitly. */
      connectionOptions.createConnection = (authority, option) => {
        if (proxyConnectionResult.socket) {
          return proxyConnectionResult.socket;
        } else {
          /* No proxy socket was provided, so connect directly to the
           * subchannel address. */
          return net.connect(this.subchannelAddress);
        }
      };
    }

    connectionOptions = {
      ...connectionOptions,
      ...this.subchannelAddress,
    };

    /* http2.connect uses the options here:
     * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
     * The spread operator overrides earlier values with later ones, so any port
     * or host values in the options will be used rather than any values extracted
     * from the first argument. In addition, the path overrides the host and port,
     * as documented for plaintext connections here:
     * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
     * and for TLS connections here:
     * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
     * earlier versions of Node, http2.connect passes these options to
     * tls.connect but not net.connect, so in the insecure case we still need
     * to set the createConnection option above to create the connection
     * explicitly. We cannot do that in the TLS case because http2.connect
     * passes necessary additional options to tls.connect.
     * The first argument just needs to be parseable as a URL and the scheme
     * determines whether the connection will be established over TLS or not.
     */
    const session = http2.connect(
      addressScheme + targetAuthority,
      connectionOptions
    );
    this.session = session;
    this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled);
    if (this.channelzEnabled) {
      this.childrenTracker.refChild(this.channelzSocketRef);
    }
    session.unref();
    /* For all of these events, check if the session at the time of the event
     * is the same one currently attached to this subchannel, to ensure that
     * old events from previous connection attempts cannot cause invalid state
     * transitions. */
    session.once('connect', () => {
      if (this.session === session) {
        this.transitionToState(
          [ConnectivityState.CONNECTING],
          ConnectivityState.READY
        );
      }
    });
    session.once('close', () => {
      if (this.session === session) {
        this.trace('connection closed');
        this.transitionToState(
          [ConnectivityState.CONNECTING],
          ConnectivityState.TRANSIENT_FAILURE
        );
        /* Transitioning directly to IDLE here should be OK because we are not
         * doing any backoff, because a connection was established at some
         * point */
        this.transitionToState(
          [ConnectivityState.READY],
          ConnectivityState.IDLE
        );
      }
    });
    session.once(
      'goaway',
      (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
        if (this.session === session) {
          /* See the last paragraph of
           * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
          if (
            errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
            opaqueData.equals(tooManyPingsData)
          ) {
            this.keepaliveTimeMs = Math.min(
              2 * this.keepaliveTimeMs,
              KEEPALIVE_MAX_TIME_MS
            );
            logging.log(
              LogVerbosity.ERROR,
              `Connection to ${uriToString(this.channelTarget)} at ${
                this.subchannelAddressString
              } rejected by server because of excess pings. Increasing ping interval to ${
                this.keepaliveTimeMs
              } ms`
            );
          }
          this.trace(
            'connection closed by GOAWAY with code ' +
              errorCode
          );
          this.transitionToState(
            [ConnectivityState.CONNECTING, ConnectivityState.READY],
            ConnectivityState.IDLE
          );
        }
      }
    );
    session.once('error', (error) => {
      /* Do nothing here. Any error should also trigger a close event, which is
       * where we want to handle that. */
      this.trace(
        'connection closed with error ' +
          (error as Error).message
      );
    });
    if (logging.isTracerEnabled(TRACER_NAME)) {
      session.on('remoteSettings', (settings: http2.Settings) => {
        this.trace(
          'new settings received' +
            (this.session !== session ? ' on the old connection' : '') +
            ': ' +
            JSON.stringify(settings)
        );
      });
      session.on('localSettings', (settings: http2.Settings) => {
        this.trace(
          'local settings acknowledged by remote' +
            (this.session !== session ? ' on the old connection' : '') +
            ': ' +
            JSON.stringify(settings)
        );
      });
    }
  }

  /**
   * Begin a connection attempt: negotiate a proxied connection if one is
   * configured, then create the HTTP/2 session. A failure of the proxy step
   * moves the subchannel from CONNECTING to TRANSIENT_FAILURE.
   */
  private startConnectingInternal() {
    /* Pass connection options through to the proxy so that it's able to
     * upgrade its connection to support tls if needed.
     * This is a workaround for https://github.com/nodejs/node/issues/32922
     * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
    const connectionOptions: ConnectionOptions =
      this.credentials._getConnectionOptions() || {};

    if ('secureContext' in connectionOptions) {
      connectionOptions.ALPNProtocols = ['h2'];
      // If provided, the value of grpc.ssl_target_name_override should be used
      // to override the target hostname when checking server identity.
      // This option is used for testing only.
      if (this.options['grpc.ssl_target_name_override']) {
        const sslTargetNameOverride = this.options[
          'grpc.ssl_target_name_override'
        ]!;
        connectionOptions.checkServerIdentity = (
          host: string,
          cert: PeerCertificate
        ): Error | undefined => {
          return checkServerIdentity(sslTargetNameOverride, cert);
        };
        connectionOptions.servername = sslTargetNameOverride;
      } else {
        if ('grpc.http_connect_target' in this.options) {
          /* This is more or less how servername will be set in createSession
           * if a connection is successfully established through the proxy.
           * If the proxy is not used, these connectionOptions are discarded
           * anyway */
          const targetPath = getDefaultAuthority(
            parseUri(this.options['grpc.http_connect_target'] as string) ?? {
              path: 'localhost',
            }
          );
          const hostPort = splitHostPort(targetPath);
          connectionOptions.servername = hostPort?.host ?? targetPath;
        }
      }
    }

    getProxiedConnection(
      this.subchannelAddress,
      this.options,
      connectionOptions
    ).then(
      (result) => {
        this.createSession(result);
      },
      (reason) => {
        this.transitionToState(
          [ConnectivityState.CONNECTING],
          ConnectivityState.TRANSIENT_FAILURE
        );
      }
    );
  }

  /**
   * Handle loss of the connection: move from READY to TRANSIENT_FAILURE and
   * invoke every registered disconnect listener.
   */
  private handleDisconnect() {
    this.transitionToState(
      [ConnectivityState.READY],
      ConnectivityState.TRANSIENT_FAILURE);
    for (const listener of this.disconnectListeners.values()) {
      listener();
    }
  }

  /**
   * Initiate a state transition from any element of oldStates to the new
   * state. If the current connectivityState is not in oldStates, do nothing.
   * @param oldStates The set of states to transition from
   * @param newState The state to transition to
   * @returns True if the state changed, false otherwise
   */
  private transitionToState(
    oldStates: ConnectivityState[],
    newState: ConnectivityState
  ): boolean {
    if (oldStates.indexOf(this.connectivityState) === -1) {
      return false;
    }
    this.trace(
      ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
    }
    const previousState = this.connectivityState;
    this.connectivityState = newState;
    switch (newState) {
      case ConnectivityState.READY: {
        this.stopBackoff();
        const session = this.session!;
        session.socket.once('close', () => {
          if (this.session === session) {
            this.handleDisconnect();
          }
        });
        if (this.keepaliveWithoutCalls) {
          this.startKeepalivePings();
        }
        break;
      }
      case ConnectivityState.CONNECTING:
        this.startBackoff();
        this.startConnectingInternal();
        this.continueConnecting = false;
        break;
      case ConnectivityState.TRANSIENT_FAILURE:
        if (this.session) {
          this.session.close();
        }
        this.session = null;
        this.resetChannelzSocketInfo();
        this.stopKeepalivePings();
        /* If the backoff timer has already ended by the time we get to the
         * TRANSIENT_FAILURE state, we want to immediately transition out of
         * TRANSIENT_FAILURE as though the backoff timer is ending right now */
        if (!this.backoffTimeout.isRunning()) {
          process.nextTick(() => {
            this.handleBackoffTimer();
          });
        }
        break;
      case ConnectivityState.IDLE:
        if (this.session) {
          this.session.close();
        }
        this.session = null;
        this.resetChannelzSocketInfo();
        this.stopKeepalivePings();
        break;
      default:
        throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
    }
    /* We use a shallow copy of the stateListeners array in case a listener
     * is removed during this iteration */
    for (const listener of [...this.stateListeners]) {
      listener(this, previousState, newState);
    }
    return true;
  }

  /**
   * Check if the subchannel associated with zero calls and with zero channels.
   * If so, shut it down.
   */
  private checkBothRefcounts() {
    /* If no calls, channels, or subchannel pools have any more references to
     * this subchannel, we can be sure it will never be used again. */
    if (this.callRefcount === 0 && this.refcount === 0) {
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
      }
      this.transitionToState(
        [ConnectivityState.CONNECTING, ConnectivityState.READY],
        ConnectivityState.IDLE
      );
      if (this.channelzEnabled) {
        unregisterChannelzRef(this.channelzRef);
      }
    }
  }

  /**
   * Record that a call holds a reference to this subchannel. For the first
   * call this refs the session and backoff timer, and starts keepalive pings
   * unless they already run without calls.
   */
  callRef() {
    this.refTrace(
      'callRefcount ' +
        this.callRefcount +
        ' -> ' +
        (this.callRefcount + 1)
    );
    if (this.callRefcount === 0) {
      if (this.session) {
        this.session.ref();
      }
      this.backoffTimeout.ref();
      if (!this.keepaliveWithoutCalls) {
        this.startKeepalivePings();
      }
    }
    this.callRefcount += 1;
  }

  /**
   * Release a call's reference to this subchannel. When the last call goes
   * away this unrefs the session and backoff timer, stops scheduling new
   * keepalive pings (unless they run without calls) and checks whether the
   * subchannel can be shut down.
   */
  callUnref() {
    this.refTrace(
      'callRefcount ' +
        this.callRefcount +
        ' -> ' +
        (this.callRefcount - 1)
    );
    this.callRefcount -= 1;
    if (this.callRefcount === 0) {
      if (this.session) {
        this.session.unref();
      }
      this.backoffTimeout.unref();
      if (!this.keepaliveWithoutCalls) {
        /* Only the ping interval is cleared here; the outstanding ping
         * timeout is kept because the connection is still in use. */
        clearInterval(this.keepaliveIntervalId);
      }
      this.checkBothRefcounts();
    }
  }

  /**
   * Record that a channel or subchannel pool holds a reference to this
   * subchannel.
   */
  ref() {
    this.refTrace(
      'refcount ' +
        this.refcount +
        ' -> ' +
        (this.refcount + 1)
    );
    this.refcount += 1;
  }

  /**
   * Release a channel or subchannel pool reference and check whether the
   * subchannel can be shut down.
   */
  unref() {
    this.refTrace(
      'refcount ' +
        this.refcount +
        ' -> ' +
        (this.refcount - 1)
    );
    this.refcount -= 1;
    this.checkBothRefcounts();
  }

  /**
   * Release the reference only if exactly one reference is held.
   * @returns True if the reference was released, false otherwise
   */
  unrefIfOneRef(): boolean {
    if (this.refcount === 1) {
      this.unref();
      return true;
    }
    return false;
  }

  /**
   * Start a stream on the current session with the given `metadata` as headers
   * and then attach it to the `callStream`. Must only be called if the
   * subchannel's current connectivity state is READY.
   * @param metadata
   * @param callStream
   * @param extraFilters Filters passed through to
   *     `callStream.attachHttp2Stream`
   */
  startCallStream(
    metadata: Metadata,
    callStream: Http2CallStream,
    extraFilters: Filter[]
  ) {
    const headers = metadata.toHttp2Headers();
    headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
    headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
    headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
    headers[HTTP2_HEADER_METHOD] = 'POST';
    headers[HTTP2_HEADER_PATH] = callStream.getMethod();
    headers[HTTP2_HEADER_TE] = 'trailers';
    let http2Stream: http2.ClientHttp2Stream;
    /* In theory, if an error is thrown by session.request because session has
     * become unusable (e.g. because it has received a goaway), this subchannel
     * should soon see the corresponding close or goaway event anyway and leave
     * READY. But we have seen reports that this does not happen
     * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
     * so for defense in depth, we just discard the session when we see an
     * error here.
     */
    try {
      http2Stream = this.session!.request(headers);
    } catch (e) {
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.TRANSIENT_FAILURE
      );
      throw e;
    }
    let headersString = '';
    for (const header of Object.keys(headers)) {
      headersString += '\t\t' + header + ': ' + headers[header] + '\n';
    }
    logging.trace(
      LogVerbosity.DEBUG,
      'call_stream',
      'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
        '(' + this.channelzRef.id + ') ' +
        this.subchannelAddressString +
        ' with headers\n' +
        headersString
    );
    this.flowControlTrace(
      'local window size: ' +
        this.session!.state.localWindowSize +
        ' remote window size: ' +
        this.session!.state.remoteWindowSize
    );
    const streamSession = this.session;
    this.internalsTrace(
      'session.closed=' +
        streamSession!.closed +
        ' session.destroyed=' +
        streamSession!.destroyed +
        ' session.socket.destroyed=' +
        streamSession!.socket.destroyed);
    let statsTracker: SubchannelCallStatsTracker;
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      callStream.addStatusWatcher(status => {
        if (status.code === Status.OK) {
          this.callTracker.addCallSucceeded();
        } else {
          this.callTracker.addCallFailed();
        }
      });
      this.streamTracker.addCallStarted();
      callStream.addStreamEndWatcher(success => {
        /* Only count the stream against the socket statistics if the session
         * it ran on is still the current one. */
        if (streamSession === this.session) {
          if (success) {
            this.streamTracker.addCallSucceeded();
          } else {
            this.streamTracker.addCallFailed();
          }
        }
      });
      statsTracker = {
        addMessageSent: () => {
          this.messagesSent += 1;
          this.lastMessageSentTimestamp = new Date();
        },
        addMessageReceived: () => {
          this.messagesReceived += 1;
          /* Record the receive time as well, mirroring addMessageSent, so
           * that channelz reports lastMessageReceivedTimestamp. */
          this.lastMessageReceivedTimestamp = new Date();
        }
      }
    } else {
      statsTracker = {
        addMessageSent: () => {},
        addMessageReceived: () => {}
      }
    }
    callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
  }

  /**
   * If the subchannel is currently IDLE, start connecting and switch to the
   * CONNECTING state. If the subchannel is currently in TRANSIENT_FAILURE,
   * the next time it would transition to IDLE, start connecting again instead.
   * Otherwise, do nothing.
   */
  startConnecting() {
    /* First, try to transition from IDLE to connecting. If that doesn't happen
     * because the state is not currently IDLE, check if it is
     * TRANSIENT_FAILURE, and if so indicate that it should go back to
     * connecting after the backoff timer ends. Otherwise do nothing */
    if (
      !this.transitionToState(
        [ConnectivityState.IDLE],
        ConnectivityState.CONNECTING
      )
    ) {
      if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
        this.continueConnecting = true;
      }
    }
  }

  /**
   * Get the subchannel's current connectivity state.
   */
  getConnectivityState() {
    return this.connectivityState;
  }

  /**
   * Add a listener function to be called whenever the subchannel's
   * connectivity state changes.
   * @param listener
   */
  addConnectivityStateListener(listener: ConnectivityStateListener) {
    this.stateListeners.push(listener);
  }

  /**
   * Remove a listener previously added with `addConnectivityStateListener`
   * @param listener A reference to a function previously passed to
   *     `addConnectivityStateListener`
   */
  removeConnectivityStateListener(listener: ConnectivityStateListener) {
    const listenerIndex = this.stateListeners.indexOf(listener);
    if (listenerIndex > -1) {
      this.stateListeners.splice(listenerIndex, 1);
    }
  }

  /**
   * Add a listener function to be called when the connection is lost.
   * @param listener
   */
  addDisconnectListener(listener: () => void) {
    this.disconnectListeners.add(listener);
  }

  /**
   * Remove a listener previously added with `addDisconnectListener`
   * @param listener
   */
  removeDisconnectListener(listener: () => void) {
    this.disconnectListeners.delete(listener);
  }

  /**
   * Reset the backoff timeout, and immediately start connecting if in backoff.
   */
  resetBackoff() {
    this.backoffTimeout.reset();
    this.transitionToState(
      [ConnectivityState.TRANSIENT_FAILURE],
      ConnectivityState.CONNECTING
    );
  }

  /**
   * Get the string representation of the subchannel address.
   */
  getAddress(): string {
    return this.subchannelAddressString;
  }

  /**
   * Get the channelz reference registered for this subchannel.
   */
  getChannelzRef(): SubchannelRef {
    return this.channelzRef;
  }

  /**
   * Get the underlying subchannel, which for this class is itself.
   */
  getRealSubchannel(): this {
    return this;
  }
}