UNPKG

37.3 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Subchannel = void 0;
20const http2 = require("http2");
21const tls_1 = require("tls");
22const connectivity_state_1 = require("./connectivity-state");
23const backoff_timeout_1 = require("./backoff-timeout");
24const resolver_1 = require("./resolver");
25const logging = require("./logging");
26const constants_1 = require("./constants");
27const http_proxy_1 = require("./http_proxy");
28const net = require("net");
29const uri_parser_1 = require("./uri-parser");
30const subchannel_address_1 = require("./subchannel-address");
31const channelz_1 = require("./channelz");
32const clientVersion = require('../../package.json').version;
33const TRACER_NAME = 'subchannel';
34const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
35const MIN_CONNECT_TIMEOUT_MS = 20000;
36const INITIAL_BACKOFF_MS = 1000;
37const BACKOFF_MULTIPLIER = 1.6;
38const MAX_BACKOFF_MS = 120000;
39const BACKOFF_JITTER = 0.2;
40/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
41 * have a constant for the max signed 32 bit integer, so this is a simple way
42 * to calculate it */
43const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
44const KEEPALIVE_TIMEOUT_MS = 20000;
45const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT, } = http2.constants;
46/**
47 * Get a number uniformly at random in the range [min, max)
48 * @param min
49 * @param max
50 */
51function uniformRandom(min, max) {
52 return Math.random() * (max - min) + min;
53}
54const tooManyPingsData = Buffer.from('too_many_pings', 'ascii');
55class Subchannel {
56 /**
57 * A class representing a connection to a single backend.
58 * @param channelTarget The target string for the channel as a whole
59 * @param subchannelAddress The address for the backend that this subchannel
60 * will connect to
61 * @param options The channel options, plus any specific subchannel options
62 * for this subchannel
63 * @param credentials The channel credentials used to establish this
64 * connection
65 */
66 constructor(channelTarget, subchannelAddress, options, credentials) {
67 this.channelTarget = channelTarget;
68 this.subchannelAddress = subchannelAddress;
69 this.options = options;
70 this.credentials = credentials;
71 /**
72 * The subchannel's current connectivity state. Invariant: `session` === `null`
73 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
74 */
75 this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
76 /**
77 * The underlying http2 session used to make requests.
78 */
79 this.session = null;
80 /**
81 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
82 * CONNECTING instead of IDLE when the backoff timeout ends.
83 */
84 this.continueConnecting = false;
85 /**
86 * A list of listener functions that will be called whenever the connectivity
87 * state changes. Will be modified by `addConnectivityStateListener` and
88 * `removeConnectivityStateListener`
89 */
90 this.stateListeners = [];
91 /**
92 * A list of listener functions that will be called when the underlying
93 * socket disconnects. Used for ending active calls with an UNAVAILABLE
94 * status.
95 */
96 this.disconnectListeners = [];
97 /**
98 * The amount of time in between sending pings
99 */
100 this.keepaliveTimeMs = KEEPALIVE_MAX_TIME_MS;
101 /**
102 * The amount of time to wait for an acknowledgement after sending a ping
103 */
104 this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS;
105 /**
106 * Indicates whether keepalive pings should be sent without any active calls
107 */
108 this.keepaliveWithoutCalls = false;
109 /**
110 * Tracks calls with references to this subchannel
111 */
112 this.callRefcount = 0;
113 /**
114 * Tracks channels and subchannel pools with references to this subchannel
115 */
116 this.refcount = 0;
117 // Channelz info
118 this.channelzEnabled = true;
119 this.callTracker = new channelz_1.ChannelzCallTracker();
120 this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
121 // Channelz socket info
122 this.channelzSocketRef = null;
123 /**
124 * Name of the remote server, if it is not the same as the subchannel
125 * address, i.e. if connecting through an HTTP CONNECT proxy.
126 */
127 this.remoteName = null;
128 this.streamTracker = new channelz_1.ChannelzCallTracker();
129 this.keepalivesSent = 0;
130 this.messagesSent = 0;
131 this.messagesReceived = 0;
132 this.lastMessageSentTimestamp = null;
133 this.lastMessageReceivedTimestamp = null;
134 // Build user-agent string.
135 this.userAgent = [
136 options['grpc.primary_user_agent'],
137 `grpc-node-js/${clientVersion}`,
138 options['grpc.secondary_user_agent'],
139 ]
140 .filter((e) => e)
141 .join(' '); // remove falsey values first
142 if ('grpc.keepalive_time_ms' in options) {
143 this.keepaliveTimeMs = options['grpc.keepalive_time_ms'];
144 }
145 if ('grpc.keepalive_timeout_ms' in options) {
146 this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms'];
147 }
148 if ('grpc.keepalive_permit_without_calls' in options) {
149 this.keepaliveWithoutCalls =
150 options['grpc.keepalive_permit_without_calls'] === 1;
151 }
152 else {
153 this.keepaliveWithoutCalls = false;
154 }
155 this.keepaliveIntervalId = setTimeout(() => { }, 0);
156 clearTimeout(this.keepaliveIntervalId);
157 this.keepaliveTimeoutId = setTimeout(() => { }, 0);
158 clearTimeout(this.keepaliveTimeoutId);
159 const backoffOptions = {
160 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
161 maxDelay: options['grpc.max_reconnect_backoff_ms'],
162 };
163 this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => {
164 this.handleBackoffTimer();
165 }, backoffOptions);
166 this.subchannelAddressString = subchannel_address_1.subchannelAddressToString(subchannelAddress);
167 if (options['grpc.enable_channelz'] === 0) {
168 this.channelzEnabled = false;
169 }
170 this.channelzTrace = new channelz_1.ChannelzTrace();
171 this.channelzRef = channelz_1.registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
172 if (this.channelzEnabled) {
173 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
174 }
175 this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
176 }
177 getChannelzInfo() {
178 return {
179 state: this.connectivityState,
180 trace: this.channelzTrace,
181 callTracker: this.callTracker,
182 children: this.childrenTracker.getChildLists(),
183 target: this.subchannelAddressString
184 };
185 }
186 getChannelzSocketInfo() {
187 var _a, _b, _c;
188 if (this.session === null) {
189 return null;
190 }
191 const sessionSocket = this.session.socket;
192 const remoteAddress = sessionSocket.remoteAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
193 const localAddress = sessionSocket.localAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
194 let tlsInfo;
195 if (this.session.encrypted) {
196 const tlsSocket = sessionSocket;
197 const cipherInfo = tlsSocket.getCipher();
198 const certificate = tlsSocket.getCertificate();
199 const peerCertificate = tlsSocket.getPeerCertificate();
200 tlsInfo = {
201 cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null,
202 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
203 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
204 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
205 };
206 }
207 else {
208 tlsInfo = null;
209 }
210 const socketInfo = {
211 remoteAddress: remoteAddress,
212 localAddress: localAddress,
213 security: tlsInfo,
214 remoteName: this.remoteName,
215 streamsStarted: this.streamTracker.callsStarted,
216 streamsSucceeded: this.streamTracker.callsSucceeded,
217 streamsFailed: this.streamTracker.callsFailed,
218 messagesSent: this.messagesSent,
219 messagesReceived: this.messagesReceived,
220 keepAlivesSent: this.keepalivesSent,
221 lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
222 lastRemoteStreamCreatedTimestamp: null,
223 lastMessageSentTimestamp: this.lastMessageSentTimestamp,
224 lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
225 localFlowControlWindow: (_b = this.session.state.localWindowSize) !== null && _b !== void 0 ? _b : null,
226 remoteFlowControlWindow: (_c = this.session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null
227 };
228 return socketInfo;
229 }
230 resetChannelzSocketInfo() {
231 if (!this.channelzEnabled) {
232 return;
233 }
234 if (this.channelzSocketRef) {
235 channelz_1.unregisterChannelzRef(this.channelzSocketRef);
236 this.childrenTracker.unrefChild(this.channelzSocketRef);
237 this.channelzSocketRef = null;
238 }
239 this.remoteName = null;
240 this.streamTracker = new channelz_1.ChannelzCallTracker();
241 this.keepalivesSent = 0;
242 this.messagesSent = 0;
243 this.messagesReceived = 0;
244 this.lastMessageSentTimestamp = null;
245 this.lastMessageReceivedTimestamp = null;
246 }
247 trace(text) {
248 logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
249 }
250 refTrace(text) {
251 logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
252 }
253 flowControlTrace(text) {
254 logging.trace(constants_1.LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
255 }
256 internalsTrace(text) {
257 logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
258 }
259 keepaliveTrace(text) {
260 logging.trace(constants_1.LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
261 }
262 handleBackoffTimer() {
263 if (this.continueConnecting) {
264 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
265 }
266 else {
267 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.IDLE);
268 }
269 }
270 /**
271 * Start a backoff timer with the current nextBackoff timeout
272 */
273 startBackoff() {
274 this.backoffTimeout.runOnce();
275 }
276 stopBackoff() {
277 this.backoffTimeout.stop();
278 this.backoffTimeout.reset();
279 }
280 sendPing() {
281 var _a, _b;
282 if (this.channelzEnabled) {
283 this.keepalivesSent += 1;
284 }
285 this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
286 this.keepaliveTimeoutId = setTimeout(() => {
287 this.keepaliveTrace('Ping timeout passed without response');
288 this.handleDisconnect();
289 }, this.keepaliveTimeoutMs);
290 (_b = (_a = this.keepaliveTimeoutId).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
291 this.session.ping((err, duration, payload) => {
292 this.keepaliveTrace('Received ping response');
293 clearTimeout(this.keepaliveTimeoutId);
294 });
295 }
296 startKeepalivePings() {
297 var _a, _b;
298 this.keepaliveIntervalId = setInterval(() => {
299 this.sendPing();
300 }, this.keepaliveTimeMs);
301 (_b = (_a = this.keepaliveIntervalId).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
302 /* Don't send a ping immediately because whatever caused us to start
303 * sending pings should also involve some network activity. */
304 }
305 /**
306 * Stop keepalive pings when terminating a connection. This discards the
307 * outstanding ping timeout, so it should not be called if the same
308 * connection will still be used.
309 */
310 stopKeepalivePings() {
311 clearInterval(this.keepaliveIntervalId);
312 clearTimeout(this.keepaliveTimeoutId);
313 }
314 createSession(proxyConnectionResult) {
315 var _a, _b, _c;
316 if (proxyConnectionResult.realTarget) {
317 this.remoteName = uri_parser_1.uriToString(proxyConnectionResult.realTarget);
318 this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget);
319 }
320 else {
321 this.remoteName = null;
322 this.trace('creating HTTP/2 session');
323 }
324 const targetAuthority = resolver_1.getDefaultAuthority((_a = proxyConnectionResult.realTarget) !== null && _a !== void 0 ? _a : this.channelTarget);
325 let connectionOptions = this.credentials._getConnectionOptions() || {};
326 connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
327 if ('grpc-node.max_session_memory' in this.options) {
328 connectionOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
329 }
330 else {
331 /* By default, set a very large max session memory limit, to effectively
332 * disable enforcement of the limit. Some testing indicates that Node's
333 * behavior degrades badly when this limit is reached, so we solve that
334 * by disabling the check entirely. */
335 connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
336 }
337 let addressScheme = 'http://';
338 if ('secureContext' in connectionOptions) {
339 addressScheme = 'https://';
340 // If provided, the value of grpc.ssl_target_name_override should be used
341 // to override the target hostname when checking server identity.
342 // This option is used for testing only.
343 if (this.options['grpc.ssl_target_name_override']) {
344 const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
345 connectionOptions.checkServerIdentity = (host, cert) => {
346 return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
347 };
348 connectionOptions.servername = sslTargetNameOverride;
349 }
350 else {
351 const authorityHostname = (_c = (_b = uri_parser_1.splitHostPort(targetAuthority)) === null || _b === void 0 ? void 0 : _b.host) !== null && _c !== void 0 ? _c : 'localhost';
352 // We want to always set servername to support SNI
353 connectionOptions.servername = authorityHostname;
354 }
355 if (proxyConnectionResult.socket) {
356 /* This is part of the workaround for
357 * https://github.com/nodejs/node/issues/32922. Without that bug,
358 * proxyConnectionResult.socket would always be a plaintext socket and
359 * this would say
360 * connectionOptions.socket = proxyConnectionResult.socket; */
361 connectionOptions.createConnection = (authority, option) => {
362 return proxyConnectionResult.socket;
363 };
364 }
365 }
366 else {
367 /* In all but the most recent versions of Node, http2.connect does not use
368 * the options when establishing plaintext connections, so we need to
369 * establish that connection explicitly. */
370 connectionOptions.createConnection = (authority, option) => {
371 if (proxyConnectionResult.socket) {
372 return proxyConnectionResult.socket;
373 }
374 else {
375 /* net.NetConnectOpts is declared in a way that is more restrictive
376 * than what net.connect will actually accept, so we use the type
377 * assertion to work around that. */
378 return net.connect(this.subchannelAddress);
379 }
380 };
381 }
382 connectionOptions = Object.assign(Object.assign({}, connectionOptions), this.subchannelAddress);
383 /* http2.connect uses the options here:
384 * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
385 * The spread operator overides earlier values with later ones, so any port
386 * or host values in the options will be used rather than any values extracted
387 * from the first argument. In addition, the path overrides the host and port,
388 * as documented for plaintext connections here:
389 * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
390 * and for TLS connections here:
391 * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
392 * earlier versions of Node, http2.connect passes these options to
393 * tls.connect but not net.connect, so in the insecure case we still need
394 * to set the createConnection option above to create the connection
395 * explicitly. We cannot do that in the TLS case because http2.connect
396 * passes necessary additional options to tls.connect.
397 * The first argument just needs to be parseable as a URL and the scheme
398 * determines whether the connection will be established over TLS or not.
399 */
400 const session = http2.connect(addressScheme + targetAuthority, connectionOptions);
401 this.session = session;
402 this.channelzSocketRef = channelz_1.registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo(), this.channelzEnabled);
403 if (this.channelzEnabled) {
404 this.childrenTracker.refChild(this.channelzSocketRef);
405 }
406 session.unref();
407 /* For all of these events, check if the session at the time of the event
408 * is the same one currently attached to this subchannel, to ensure that
409 * old events from previous connection attempts cannot cause invalid state
410 * transitions. */
411 session.once('connect', () => {
412 if (this.session === session) {
413 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.READY);
414 }
415 });
416 session.once('close', () => {
417 if (this.session === session) {
418 this.trace('connection closed');
419 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
420 /* Transitioning directly to IDLE here should be OK because we are not
421 * doing any backoff, because a connection was established at some
422 * point */
423 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
424 }
425 });
426 session.once('goaway', (errorCode, lastStreamID, opaqueData) => {
427 if (this.session === session) {
428 /* See the last paragraph of
429 * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
430 if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
431 opaqueData.equals(tooManyPingsData)) {
432 this.keepaliveTimeMs = Math.min(2 * this.keepaliveTimeMs, KEEPALIVE_MAX_TIME_MS);
433 logging.log(constants_1.LogVerbosity.ERROR, `Connection to ${uri_parser_1.uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms`);
434 }
435 this.trace('connection closed by GOAWAY with code ' +
436 errorCode);
437 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
438 }
439 });
440 session.once('error', (error) => {
441 /* Do nothing here. Any error should also trigger a close event, which is
442 * where we want to handle that. */
443 this.trace('connection closed with error ' +
444 error.message);
445 });
446 if (logging.isTracerEnabled(TRACER_NAME)) {
447 session.on('remoteSettings', (settings) => {
448 this.trace('new settings received' +
449 (this.session !== session ? ' on the old connection' : '') +
450 ': ' +
451 JSON.stringify(settings));
452 });
453 session.on('localSettings', (settings) => {
454 this.trace('local settings acknowledged by remote' +
455 (this.session !== session ? ' on the old connection' : '') +
456 ': ' +
457 JSON.stringify(settings));
458 });
459 }
460 }
461 startConnectingInternal() {
462 var _a, _b;
463 /* Pass connection options through to the proxy so that it's able to
464 * upgrade it's connection to support tls if needed.
465 * This is a workaround for https://github.com/nodejs/node/issues/32922
466 * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
467 const connectionOptions = this.credentials._getConnectionOptions() || {};
468 if ('secureContext' in connectionOptions) {
469 connectionOptions.ALPNProtocols = ['h2'];
470 // If provided, the value of grpc.ssl_target_name_override should be used
471 // to override the target hostname when checking server identity.
472 // This option is used for testing only.
473 if (this.options['grpc.ssl_target_name_override']) {
474 const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
475 connectionOptions.checkServerIdentity = (host, cert) => {
476 return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
477 };
478 connectionOptions.servername = sslTargetNameOverride;
479 }
480 else {
481 if ('grpc.http_connect_target' in this.options) {
482 /* This is more or less how servername will be set in createSession
483 * if a connection is successfully established through the proxy.
484 * If the proxy is not used, these connectionOptions are discarded
485 * anyway */
486 const targetPath = resolver_1.getDefaultAuthority((_a = uri_parser_1.parseUri(this.options['grpc.http_connect_target'])) !== null && _a !== void 0 ? _a : {
487 path: 'localhost',
488 });
489 const hostPort = uri_parser_1.splitHostPort(targetPath);
490 connectionOptions.servername = (_b = hostPort === null || hostPort === void 0 ? void 0 : hostPort.host) !== null && _b !== void 0 ? _b : targetPath;
491 }
492 }
493 }
494 http_proxy_1.getProxiedConnection(this.subchannelAddress, this.options, connectionOptions).then((result) => {
495 this.createSession(result);
496 }, (reason) => {
497 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
498 });
499 }
500 handleDisconnect() {
501 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
502 for (const listener of this.disconnectListeners) {
503 listener();
504 }
505 }
506 /**
507 * Initiate a state transition from any element of oldStates to the new
508 * state. If the current connectivityState is not in oldStates, do nothing.
509 * @param oldStates The set of states to transition from
510 * @param newState The state to transition to
511 * @returns True if the state changed, false otherwise
512 */
513 transitionToState(oldStates, newState) {
514 if (oldStates.indexOf(this.connectivityState) === -1) {
515 return false;
516 }
517 this.trace(connectivity_state_1.ConnectivityState[this.connectivityState] +
518 ' -> ' +
519 connectivity_state_1.ConnectivityState[newState]);
520 if (this.channelzEnabled) {
521 this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
522 }
523 const previousState = this.connectivityState;
524 this.connectivityState = newState;
525 switch (newState) {
526 case connectivity_state_1.ConnectivityState.READY:
527 this.stopBackoff();
528 const session = this.session;
529 session.socket.once('close', () => {
530 if (this.session === session) {
531 this.handleDisconnect();
532 }
533 });
534 if (this.keepaliveWithoutCalls) {
535 this.startKeepalivePings();
536 }
537 break;
538 case connectivity_state_1.ConnectivityState.CONNECTING:
539 this.startBackoff();
540 this.startConnectingInternal();
541 this.continueConnecting = false;
542 break;
543 case connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE:
544 if (this.session) {
545 this.session.close();
546 }
547 this.session = null;
548 this.resetChannelzSocketInfo();
549 this.stopKeepalivePings();
550 /* If the backoff timer has already ended by the time we get to the
551 * TRANSIENT_FAILURE state, we want to immediately transition out of
552 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
553 if (!this.backoffTimeout.isRunning()) {
554 process.nextTick(() => {
555 this.handleBackoffTimer();
556 });
557 }
558 break;
559 case connectivity_state_1.ConnectivityState.IDLE:
560 if (this.session) {
561 this.session.close();
562 }
563 this.session = null;
564 this.resetChannelzSocketInfo();
565 this.stopKeepalivePings();
566 break;
567 default:
568 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
569 }
570 /* We use a shallow copy of the stateListeners array in case a listener
571 * is removed during this iteration */
572 for (const listener of [...this.stateListeners]) {
573 listener(this, previousState, newState);
574 }
575 return true;
576 }
577 /**
578 * Check if the subchannel associated with zero calls and with zero channels.
579 * If so, shut it down.
580 */
581 checkBothRefcounts() {
582 /* If no calls, channels, or subchannel pools have any more references to
583 * this subchannel, we can be sure it will never be used again. */
584 if (this.callRefcount === 0 && this.refcount === 0) {
585 if (this.channelzEnabled) {
586 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
587 }
588 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
589 if (this.channelzEnabled) {
590 channelz_1.unregisterChannelzRef(this.channelzRef);
591 }
592 }
593 }
594 callRef() {
595 this.refTrace('callRefcount ' +
596 this.callRefcount +
597 ' -> ' +
598 (this.callRefcount + 1));
599 if (this.callRefcount === 0) {
600 if (this.session) {
601 this.session.ref();
602 }
603 this.backoffTimeout.ref();
604 if (!this.keepaliveWithoutCalls) {
605 this.startKeepalivePings();
606 }
607 }
608 this.callRefcount += 1;
609 }
610 callUnref() {
611 this.refTrace('callRefcount ' +
612 this.callRefcount +
613 ' -> ' +
614 (this.callRefcount - 1));
615 this.callRefcount -= 1;
616 if (this.callRefcount === 0) {
617 if (this.session) {
618 this.session.unref();
619 }
620 this.backoffTimeout.unref();
621 if (!this.keepaliveWithoutCalls) {
622 clearInterval(this.keepaliveIntervalId);
623 }
624 this.checkBothRefcounts();
625 }
626 }
627 ref() {
628 this.refTrace('refcount ' +
629 this.refcount +
630 ' -> ' +
631 (this.refcount + 1));
632 this.refcount += 1;
633 }
634 unref() {
635 this.refTrace('refcount ' +
636 this.refcount +
637 ' -> ' +
638 (this.refcount - 1));
639 this.refcount -= 1;
640 this.checkBothRefcounts();
641 }
642 unrefIfOneRef() {
643 if (this.refcount === 1) {
644 this.unref();
645 return true;
646 }
647 return false;
648 }
649 /**
650 * Start a stream on the current session with the given `metadata` as headers
651 * and then attach it to the `callStream`. Must only be called if the
652 * subchannel's current connectivity state is READY.
653 * @param metadata
654 * @param callStream
655 */
656 startCallStream(metadata, callStream, extraFilters) {
657 const headers = metadata.toHttp2Headers();
658 headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
659 headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
660 headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
661 headers[HTTP2_HEADER_METHOD] = 'POST';
662 headers[HTTP2_HEADER_PATH] = callStream.getMethod();
663 headers[HTTP2_HEADER_TE] = 'trailers';
664 let http2Stream;
665 /* In theory, if an error is thrown by session.request because session has
666 * become unusable (e.g. because it has received a goaway), this subchannel
667 * should soon see the corresponding close or goaway event anyway and leave
668 * READY. But we have seen reports that this does not happen
669 * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
670 * so for defense in depth, we just discard the session when we see an
671 * error here.
672 */
673 try {
674 http2Stream = this.session.request(headers);
675 }
676 catch (e) {
677 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
678 throw e;
679 }
680 let headersString = '';
681 for (const header of Object.keys(headers)) {
682 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
683 }
684 logging.trace(constants_1.LogVerbosity.DEBUG, 'call_stream', 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
685 '(' + this.channelzRef.id + ') ' +
686 this.subchannelAddressString +
687 ' with headers\n' +
688 headersString);
689 this.flowControlTrace('local window size: ' +
690 this.session.state.localWindowSize +
691 ' remote window size: ' +
692 this.session.state.remoteWindowSize);
693 const streamSession = this.session;
694 this.internalsTrace('session.closed=' +
695 streamSession.closed +
696 ' session.destroyed=' +
697 streamSession.destroyed +
698 ' session.socket.destroyed=' +
699 streamSession.socket.destroyed);
700 let statsTracker;
701 if (this.channelzEnabled) {
702 this.callTracker.addCallStarted();
703 callStream.addStatusWatcher(status => {
704 if (status.code === constants_1.Status.OK) {
705 this.callTracker.addCallSucceeded();
706 }
707 else {
708 this.callTracker.addCallFailed();
709 }
710 });
711 this.streamTracker.addCallStarted();
712 callStream.addStreamEndWatcher(success => {
713 if (streamSession === this.session) {
714 if (success) {
715 this.streamTracker.addCallSucceeded();
716 }
717 else {
718 this.streamTracker.addCallFailed();
719 }
720 }
721 });
722 statsTracker = {
723 addMessageSent: () => {
724 this.messagesSent += 1;
725 this.lastMessageSentTimestamp = new Date();
726 },
727 addMessageReceived: () => {
728 this.messagesReceived += 1;
729 }
730 };
731 }
732 else {
733 statsTracker = {
734 addMessageSent: () => { },
735 addMessageReceived: () => { }
736 };
737 }
738 callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
739 }
740 /**
741 * If the subchannel is currently IDLE, start connecting and switch to the
742 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
743 * the next time it would transition to IDLE, start connecting again instead.
744 * Otherwise, do nothing.
745 */
746 startConnecting() {
747 /* First, try to transition from IDLE to connecting. If that doesn't happen
748 * because the state is not currently IDLE, check if it is
749 * TRANSIENT_FAILURE, and if so indicate that it should go back to
750 * connecting after the backoff timer ends. Otherwise do nothing */
751 if (!this.transitionToState([connectivity_state_1.ConnectivityState.IDLE], connectivity_state_1.ConnectivityState.CONNECTING)) {
752 if (this.connectivityState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
753 this.continueConnecting = true;
754 }
755 }
756 }
757 /**
758 * Get the subchannel's current connectivity state.
759 */
760 getConnectivityState() {
761 return this.connectivityState;
762 }
763 /**
764 * Add a listener function to be called whenever the subchannel's
765 * connectivity state changes.
766 * @param listener
767 */
768 addConnectivityStateListener(listener) {
769 this.stateListeners.push(listener);
770 }
771 /**
772 * Remove a listener previously added with `addConnectivityStateListener`
773 * @param listener A reference to a function previously passed to
774 * `addConnectivityStateListener`
775 */
776 removeConnectivityStateListener(listener) {
777 const listenerIndex = this.stateListeners.indexOf(listener);
778 if (listenerIndex > -1) {
779 this.stateListeners.splice(listenerIndex, 1);
780 }
781 }
782 addDisconnectListener(listener) {
783 this.disconnectListeners.push(listener);
784 }
785 removeDisconnectListener(listener) {
786 const listenerIndex = this.disconnectListeners.indexOf(listener);
787 if (listenerIndex > -1) {
788 this.disconnectListeners.splice(listenerIndex, 1);
789 }
790 }
791 /**
792 * Reset the backoff timeout, and immediately start connecting if in backoff.
793 */
794 resetBackoff() {
795 this.backoffTimeout.reset();
796 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
797 }
798 getAddress() {
799 return this.subchannelAddressString;
800 }
801 getChannelzRef() {
802 return this.channelzRef;
803 }
804 getRealSubchannel() {
805 return this;
806 }
807}
808exports.Subchannel = Subchannel;
809//# sourceMappingURL=subchannel.js.map
\No newline at end of file