UNPKG

37.5 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Subchannel = void 0;
20const http2 = require("http2");
21const tls_1 = require("tls");
22const connectivity_state_1 = require("./connectivity-state");
23const backoff_timeout_1 = require("./backoff-timeout");
24const resolver_1 = require("./resolver");
25const logging = require("./logging");
26const constants_1 = require("./constants");
27const http_proxy_1 = require("./http_proxy");
28const net = require("net");
29const uri_parser_1 = require("./uri-parser");
30const subchannel_address_1 = require("./subchannel-address");
31const channelz_1 = require("./channelz");
32const clientVersion = require('../../package.json').version;
/* Tracer names passed to logging.trace() for general subchannel events and
 * for HTTP/2 flow-control window reporting. */
const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
/* Connection/backoff tuning values. NOTE(review): none of these five are
 * referenced by the Subchannel class in this file; the backoff actually in
 * use is configured through the BackoffTimeout options in the constructor. */
const MIN_CONNECT_TIMEOUT_MS = 20000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
 * have a constant for the max signed 32 bit integer, so this is a simple way
 * to calculate it: (1 << 31) is the minimum signed 32 bit integer and its
 * bitwise complement is the maximum, 2147483647. Used as the default
 * keepalive interval and as the cap when the interval is doubled. */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
/* Default time to wait for a ping acknowledgement. */
const KEEPALIVE_TIMEOUT_MS = 20000;
/* Names of the HTTP/2 headers that are set on every outgoing call. */
const {
    HTTP2_HEADER_AUTHORITY,
    HTTP2_HEADER_CONTENT_TYPE,
    HTTP2_HEADER_METHOD,
    HTTP2_HEADER_PATH,
    HTTP2_HEADER_TE,
    HTTP2_HEADER_USER_AGENT,
} = http2.constants;
/**
 * Get a number uniformly at random in the range [min, max)
 * @param min Inclusive lower bound of the range
 * @param max Exclusive upper bound of the range
 * @returns `min` plus a random fraction of `(max - min)`; exactly `min` when
 * the two bounds are equal
 */
function uniformRandom(min, max) {
    return Math.random() * (max - min) + min;
}
/* Opaque data of the GOAWAY frame that, together with the ENHANCE_YOUR_CALM
 * error code, means the server rejected the connection for excess pings. */
const tooManyPingsData = Buffer.from('too_many_pings', 'ascii');
/**
 * A connection to a single backend address. Owns at most one HTTP/2 session,
 * drives the connectivity state machine (IDLE, CONNECTING, READY,
 * TRANSIENT_FAILURE), handles reconnect backoff and keepalive pings, and
 * reports statistics to channelz.
 */
class Subchannel {
    /**
     * A class representing a connection to a single backend.
     * @param channelTarget The target string for the channel as a whole
     * @param subchannelAddress The address for the backend that this subchannel
     *     will connect to
     * @param options The channel options, plus any specific subchannel options
     *     for this subchannel
     * @param credentials The channel credentials used to establish this
     *     connection
     */
    constructor(channelTarget, subchannelAddress, options, credentials) {
        this.channelTarget = channelTarget;
        this.subchannelAddress = subchannelAddress;
        this.options = options;
        this.credentials = credentials;
        /**
         * The subchannel's current connectivity state. Invariant: `session` === `null`
         * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
         */
        this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
        /**
         * The underlying http2 session used to make requests.
         */
        this.session = null;
        /**
         * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
         * CONNECTING instead of IDLE when the backoff timeout ends.
         */
        this.continueConnecting = false;
        /**
         * A list of listener functions that will be called whenever the connectivity
         * state changes. Will be modified by `addConnectivityStateListener` and
         * `removeConnectivityStateListener`
         */
        this.stateListeners = [];
        /**
         * A list of listener functions that will be called when the underlying
         * socket disconnects. Used for ending active calls with an UNAVAILABLE
         * status.
         */
        this.disconnectListeners = new Set();
        /**
         * The amount of time in between sending pings
         */
        this.keepaliveTimeMs = KEEPALIVE_MAX_TIME_MS;
        /**
         * The amount of time to wait for an acknowledgement after sending a ping
         */
        this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS;
        /**
         * Indicates whether keepalive pings should be sent without any active calls
         */
        this.keepaliveWithoutCalls = false;
        /**
         * Tracks calls with references to this subchannel
         */
        this.callRefcount = 0;
        /**
         * Tracks channels and subchannel pools with references to this subchannel
         */
        this.refcount = 0;
        // Channelz info
        this.channelzEnabled = true;
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
        // Channelz socket info
        this.channelzSocketRef = null;
        /**
         * Name of the remote server, if it is not the same as the subchannel
         * address, i.e. if connecting through an HTTP CONNECT proxy.
         */
        this.remoteName = null;
        this.streamTracker = new channelz_1.ChannelzCallTracker();
        this.keepalivesSent = 0;
        this.messagesSent = 0;
        this.messagesReceived = 0;
        this.lastMessageSentTimestamp = null;
        this.lastMessageReceivedTimestamp = null;
        // Build user-agent string.
        this.userAgent = [
            options['grpc.primary_user_agent'],
            `grpc-node-js/${clientVersion}`,
            options['grpc.secondary_user_agent'],
        ]
            .filter((e) => e) // remove falsey values first
            .join(' ');
        if ('grpc.keepalive_time_ms' in options) {
            this.keepaliveTimeMs = options['grpc.keepalive_time_ms'];
        }
        if ('grpc.keepalive_timeout_ms' in options) {
            this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms'];
        }
        if ('grpc.keepalive_permit_without_calls' in options) {
            this.keepaliveWithoutCalls =
                options['grpc.keepalive_permit_without_calls'] === 1;
        }
        else {
            this.keepaliveWithoutCalls = false;
        }
        /* Create and immediately cancel placeholder timers so that both timer
         * fields always hold a timer handle that is safe to pass to
         * clearTimeout/clearInterval. */
        this.keepaliveIntervalId = setTimeout(() => { }, 0);
        clearTimeout(this.keepaliveIntervalId);
        this.keepaliveTimeoutId = setTimeout(() => { }, 0);
        clearTimeout(this.keepaliveTimeoutId);
        const backoffOptions = {
            initialDelay: options['grpc.initial_reconnect_backoff_ms'],
            maxDelay: options['grpc.max_reconnect_backoff_ms'],
        };
        this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => {
            this.handleBackoffTimer();
        }, backoffOptions);
        this.subchannelAddressString = subchannel_address_1.subchannelAddressToString(subchannelAddress);
        if (options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        this.channelzTrace = new channelz_1.ChannelzTrace();
        this.channelzRef = channelz_1.registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
        }
        this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
    }
    /**
     * Build the channelz snapshot for this subchannel. Registered as the
     * info callback for `channelzRef` in the constructor.
     */
    getChannelzInfo() {
        return {
            state: this.connectivityState,
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            children: this.childrenTracker.getChildLists(),
            target: this.subchannelAddressString
        };
    }
    /**
     * Build the channelz snapshot for the socket of the current session.
     * @returns The socket info object, or `null` when there is no session
     */
    getChannelzSocketInfo() {
        if (this.session === null) {
            return null;
        }
        const sessionSocket = this.session.socket;
        const remoteAddress = sessionSocket.remoteAddress
            ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort)
            : null;
        const localAddress = sessionSocket.localAddress
            ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort)
            : null;
        let tlsInfo;
        if (this.session.encrypted) {
            const tlsSocket = sessionSocket;
            const cipherInfo = tlsSocket.getCipher();
            const certificate = tlsSocket.getCertificate();
            const peerCertificate = tlsSocket.getPeerCertificate();
            const standardName = cipherInfo.standardName;
            tlsInfo = {
                cipherSuiteStandardName: standardName !== null && standardName !== undefined ? standardName : null,
                // Only report the other name when no standard name exists
                cipherSuiteOtherName: standardName ? null : cipherInfo.name,
                localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
                remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
            };
        }
        else {
            tlsInfo = null;
        }
        const sessionState = this.session.state;
        const localWindowSize = sessionState.localWindowSize;
        const remoteWindowSize = sessionState.remoteWindowSize;
        const socketInfo = {
            remoteAddress: remoteAddress,
            localAddress: localAddress,
            security: tlsInfo,
            remoteName: this.remoteName,
            streamsStarted: this.streamTracker.callsStarted,
            streamsSucceeded: this.streamTracker.callsSucceeded,
            streamsFailed: this.streamTracker.callsFailed,
            messagesSent: this.messagesSent,
            messagesReceived: this.messagesReceived,
            keepAlivesSent: this.keepalivesSent,
            lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
            lastRemoteStreamCreatedTimestamp: null,
            lastMessageSentTimestamp: this.lastMessageSentTimestamp,
            lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
            localFlowControlWindow: localWindowSize !== null && localWindowSize !== undefined ? localWindowSize : null,
            remoteFlowControlWindow: remoteWindowSize !== null && remoteWindowSize !== undefined ? remoteWindowSize : null
        };
        return socketInfo;
    }
    /**
     * Unregister the channelz socket entry of the connection that is going
     * away and zero all per-socket statistics. Does nothing when channelz is
     * disabled.
     */
    resetChannelzSocketInfo() {
        if (!this.channelzEnabled) {
            return;
        }
        if (this.channelzSocketRef) {
            channelz_1.unregisterChannelzRef(this.channelzSocketRef);
            this.childrenTracker.unrefChild(this.channelzSocketRef);
            this.channelzSocketRef = null;
        }
        this.remoteName = null;
        this.streamTracker = new channelz_1.ChannelzCallTracker();
        this.keepalivesSent = 0;
        this.messagesSent = 0;
        this.messagesReceived = 0;
        this.lastMessageSentTimestamp = null;
        this.lastMessageReceivedTimestamp = null;
    }
    /* The trace helpers below all prefix the message with the channelz id and
     * the address of this subchannel; they differ only in the tracer name. */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
    }
    refTrace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
    }
    flowControlTrace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
    }
    internalsTrace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
    }
    keepaliveTrace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
    }
    /**
     * Leave TRANSIENT_FAILURE once the backoff period is over: go to
     * CONNECTING if a connection attempt was requested in the meantime
     * (`continueConnecting`), otherwise go to IDLE. No-op in any other state.
     */
    handleBackoffTimer() {
        if (this.continueConnecting) {
            this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
        }
        else {
            this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.IDLE);
        }
    }
    /**
     * Start a backoff timer with the current nextBackoff timeout
     */
    startBackoff() {
        this.backoffTimeout.runOnce();
    }
    /**
     * Cancel the backoff timer and reset the backoff delay.
     */
    stopBackoff() {
        this.backoffTimeout.stop();
        this.backoffTimeout.reset();
    }
    /**
     * Send one keepalive ping on the current session and arm a timeout that
     * treats the connection as disconnected if no response arrives within
     * `keepaliveTimeoutMs`.
     */
    sendPing() {
        if (this.channelzEnabled) {
            this.keepalivesSent += 1;
        }
        this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
        this.keepaliveTimeoutId = setTimeout(() => {
            this.keepaliveTrace('Ping timeout passed without response');
            this.handleDisconnect();
        }, this.keepaliveTimeoutMs);
        // Do not let the pending timeout keep the process alive
        const timeoutHandle = this.keepaliveTimeoutId;
        if (timeoutHandle.unref !== null && timeoutHandle.unref !== undefined) {
            timeoutHandle.unref();
        }
        try {
            this.session.ping((err, duration, payload) => {
                this.keepaliveTrace('Received ping response');
                clearTimeout(this.keepaliveTimeoutId);
            });
        }
        catch (e) {
            /* If we fail to send a ping, the connection is no longer functional, so
             * we should discard it. */
            this.keepaliveTrace('Failed to send ping: ' + (e instanceof Error ? e.message : String(e)));
            this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
        }
    }
    /**
     * Start sending a ping every `keepaliveTimeMs` milliseconds.
     */
    startKeepalivePings() {
        this.keepaliveIntervalId = setInterval(() => {
            this.sendPing();
        }, this.keepaliveTimeMs);
        // Do not let the ping interval keep the process alive
        const intervalHandle = this.keepaliveIntervalId;
        if (intervalHandle.unref !== null && intervalHandle.unref !== undefined) {
            intervalHandle.unref();
        }
        /* Don't send a ping immediately because whatever caused us to start
         * sending pings should also involve some network activity. */
    }
    /**
     * Stop keepalive pings when terminating a connection. This discards the
     * outstanding ping timeout, so it should not be called if the same
     * connection will still be used.
     */
    stopKeepalivePings() {
        clearInterval(this.keepaliveIntervalId);
        clearTimeout(this.keepaliveTimeoutId);
    }
    /**
     * Create the HTTP/2 session for this subchannel and attach the event
     * handlers that drive the connectivity state machine.
     * @param proxyConnectionResult Result of the proxy connection step. Its
     *     `socket` (if any) is reused for the session, and its `realTarget`
     *     (if any) is the target that is reached through the proxy.
     */
    createSession(proxyConnectionResult) {
        const realTarget = proxyConnectionResult.realTarget;
        if (realTarget) {
            this.remoteName = uri_parser_1.uriToString(realTarget);
            /* Log the string form: realTarget itself is the value that had to be
             * passed through uriToString above. */
            this.trace('creating HTTP/2 session through proxy to ' + this.remoteName);
        }
        else {
            this.remoteName = null;
            this.trace('creating HTTP/2 session');
        }
        const targetAuthority = resolver_1.getDefaultAuthority(realTarget !== null && realTarget !== undefined ? realTarget : this.channelTarget);
        let connectionOptions = this.credentials._getConnectionOptions() || {};
        connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
        if ('grpc-node.max_session_memory' in this.options) {
            connectionOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
        }
        else {
            /* By default, set a very large max session memory limit, to effectively
             * disable enforcement of the limit. Some testing indicates that Node's
             * behavior degrades badly when this limit is reached, so we solve that
             * by disabling the check entirely. */
            connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
        }
        let addressScheme = 'http://';
        if ('secureContext' in connectionOptions) {
            addressScheme = 'https://';
            // If provided, the value of grpc.ssl_target_name_override should be used
            // to override the target hostname when checking server identity.
            // This option is used for testing only.
            if (this.options['grpc.ssl_target_name_override']) {
                const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
                connectionOptions.checkServerIdentity = (host, cert) => {
                    return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
                };
                connectionOptions.servername = sslTargetNameOverride;
            }
            else {
                const authorityHostPort = uri_parser_1.splitHostPort(targetAuthority);
                const authorityHost = authorityHostPort === null || authorityHostPort === undefined
                    ? undefined
                    : authorityHostPort.host;
                const authorityHostname = authorityHost !== null && authorityHost !== undefined
                    ? authorityHost
                    : 'localhost';
                // We want to always set servername to support SNI
                connectionOptions.servername = authorityHostname;
            }
            if (proxyConnectionResult.socket) {
                /* This is part of the workaround for
                 * https://github.com/nodejs/node/issues/32922. Without that bug,
                 * proxyConnectionResult.socket would always be a plaintext socket and
                 * this would say
                 * connectionOptions.socket = proxyConnectionResult.socket; */
                connectionOptions.createConnection = (authority, option) => {
                    return proxyConnectionResult.socket;
                };
            }
        }
        else {
            /* In all but the most recent versions of Node, http2.connect does not use
             * the options when establishing plaintext connections, so we need to
             * establish that connection explicitly. */
            connectionOptions.createConnection = (authority, option) => {
                if (proxyConnectionResult.socket) {
                    return proxyConnectionResult.socket;
                }
                else {
                    /* net.NetConnectOpts is declared in a way that is more restrictive
                     * than what net.connect will actually accept, so we use the type
                     * assertion to work around that. */
                    return net.connect(this.subchannelAddress);
                }
            };
        }
        connectionOptions = Object.assign(Object.assign({}, connectionOptions), this.subchannelAddress);
        /* http2.connect uses the options here:
         * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
         * The spread operator overides earlier values with later ones, so any port
         * or host values in the options will be used rather than any values extracted
         * from the first argument. In addition, the path overrides the host and port,
         * as documented for plaintext connections here:
         * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
         * and for TLS connections here:
         * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
         * earlier versions of Node, http2.connect passes these options to
         * tls.connect but not net.connect, so in the insecure case we still need
         * to set the createConnection option above to create the connection
         * explicitly. We cannot do that in the TLS case because http2.connect
         * passes necessary additional options to tls.connect.
         * The first argument just needs to be parseable as a URL and the scheme
         * determines whether the connection will be established over TLS or not.
         */
        const session = http2.connect(addressScheme + targetAuthority, connectionOptions);
        this.session = session;
        this.channelzSocketRef = channelz_1.registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo(), this.channelzEnabled);
        if (this.channelzEnabled) {
            this.childrenTracker.refChild(this.channelzSocketRef);
        }
        session.unref();
        /* For all of these events, check if the session at the time of the event
         * is the same one currently attached to this subchannel, to ensure that
         * old events from previous connection attempts cannot cause invalid state
         * transitions. */
        session.once('connect', () => {
            if (this.session === session) {
                this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.READY);
            }
        });
        session.once('close', () => {
            if (this.session === session) {
                this.trace('connection closed');
                this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
                /* Transitioning directly to IDLE here should be OK because we are not
                 * doing any backoff, because a connection was established at some
                 * point */
                this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
            }
        });
        session.once('goaway', (errorCode, lastStreamID, opaqueData) => {
            if (this.session === session) {
                /* See the last paragraph of
                 * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive
                 * opaqueData is only passed when the GOAWAY frame carried additional
                 * data, so it has to be checked before it is compared. */
                if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
                    opaqueData &&
                    opaqueData.equals(tooManyPingsData)) {
                    this.keepaliveTimeMs = Math.min(2 * this.keepaliveTimeMs, KEEPALIVE_MAX_TIME_MS);
                    logging.log(constants_1.LogVerbosity.ERROR, `Connection to ${uri_parser_1.uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms`);
                }
                this.trace('connection closed by GOAWAY with code ' +
                    errorCode);
                this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
            }
        });
        session.once('error', (error) => {
            /* Do nothing here. Any error should also trigger a close event, which is
             * where we want to handle that. */
            this.trace('connection closed with error ' +
                error.message);
        });
        if (logging.isTracerEnabled(TRACER_NAME)) {
            session.on('remoteSettings', (settings) => {
                this.trace('new settings received' +
                    (this.session !== session ? ' on the old connection' : '') +
                    ': ' +
                    JSON.stringify(settings));
            });
            session.on('localSettings', (settings) => {
                this.trace('local settings acknowledged by remote' +
                    (this.session !== session ? ' on the old connection' : '') +
                    ': ' +
                    JSON.stringify(settings));
            });
        }
    }
    /**
     * Begin a connection attempt: establish the (possibly proxied) transport
     * connection, then create the HTTP/2 session on top of it. A failure of
     * the proxy step moves the subchannel from CONNECTING to
     * TRANSIENT_FAILURE.
     */
    startConnectingInternal() {
        /* Pass connection options through to the proxy so that it's able to
         * upgrade it's connection to support tls if needed.
         * This is a workaround for https://github.com/nodejs/node/issues/32922
         * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
        const connectionOptions = this.credentials._getConnectionOptions() || {};
        if ('secureContext' in connectionOptions) {
            connectionOptions.ALPNProtocols = ['h2'];
            // If provided, the value of grpc.ssl_target_name_override should be used
            // to override the target hostname when checking server identity.
            // This option is used for testing only.
            if (this.options['grpc.ssl_target_name_override']) {
                const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
                connectionOptions.checkServerIdentity = (host, cert) => {
                    return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
                };
                connectionOptions.servername = sslTargetNameOverride;
            }
            else if ('grpc.http_connect_target' in this.options) {
                /* This is more or less how servername will be set in createSession
                 * if a connection is successfully established through the proxy.
                 * If the proxy is not used, these connectionOptions are discarded
                 * anyway */
                const parsedTarget = uri_parser_1.parseUri(this.options['grpc.http_connect_target']);
                const targetPath = resolver_1.getDefaultAuthority(parsedTarget !== null && parsedTarget !== undefined
                    ? parsedTarget
                    : { path: 'localhost' });
                const hostPort = uri_parser_1.splitHostPort(targetPath);
                const host = hostPort === null || hostPort === undefined ? undefined : hostPort.host;
                connectionOptions.servername = host !== null && host !== undefined ? host : targetPath;
            }
        }
        http_proxy_1.getProxiedConnection(this.subchannelAddress, this.options, connectionOptions).then((result) => {
            this.createSession(result);
        }, (reason) => {
            this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
        });
    }
    /**
     * Handle loss of the connection: move from READY to TRANSIENT_FAILURE
     * (if currently READY) and then notify every disconnect listener.
     */
    handleDisconnect() {
        this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
        for (const listener of this.disconnectListeners.values()) {
            listener();
        }
    }
    /**
     * Initiate a state transition from any element of oldStates to the new
     * state. If the current connectivityState is not in oldStates, do nothing.
     * @param oldStates The set of states to transition from
     * @param newState The state to transition to
     * @returns True if the state changed, false otherwise
     */
    transitionToState(oldStates, newState) {
        if (!oldStates.includes(this.connectivityState)) {
            return false;
        }
        this.trace(connectivity_state_1.ConnectivityState[this.connectivityState] +
            ' -> ' +
            connectivity_state_1.ConnectivityState[newState]);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
        }
        const previousState = this.connectivityState;
        this.connectivityState = newState;
        switch (newState) {
            case connectivity_state_1.ConnectivityState.READY: {
                this.stopBackoff();
                /* Capture the session so that a close of this socket is ignored
                 * once the subchannel has moved on to a different session. */
                const session = this.session;
                session.socket.once('close', () => {
                    if (this.session === session) {
                        this.handleDisconnect();
                    }
                });
                if (this.keepaliveWithoutCalls) {
                    this.startKeepalivePings();
                }
                break;
            }
            case connectivity_state_1.ConnectivityState.CONNECTING:
                this.startBackoff();
                this.startConnectingInternal();
                this.continueConnecting = false;
                break;
            case connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE:
                if (this.session) {
                    this.session.close();
                }
                this.session = null;
                this.resetChannelzSocketInfo();
                this.stopKeepalivePings();
                /* If the backoff timer has already ended by the time we get to the
                 * TRANSIENT_FAILURE state, we want to immediately transition out of
                 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
                if (!this.backoffTimeout.isRunning()) {
                    process.nextTick(() => {
                        this.handleBackoffTimer();
                    });
                }
                break;
            case connectivity_state_1.ConnectivityState.IDLE:
                if (this.session) {
                    this.session.close();
                }
                this.session = null;
                this.resetChannelzSocketInfo();
                this.stopKeepalivePings();
                break;
            default:
                throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
        }
        /* We use a shallow copy of the stateListeners array in case a listener
         * is removed during this iteration */
        for (const listener of [...this.stateListeners]) {
            listener(this, previousState, newState);
        }
        return true;
    }
    /**
     * Check if the subchannel associated with zero calls and with zero channels.
     * If so, shut it down.
     */
    checkBothRefcounts() {
        /* If no calls, channels, or subchannel pools have any more references to
         * this subchannel, we can be sure it will never be used again. */
        if (this.callRefcount === 0 && this.refcount === 0) {
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
            }
            this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
            if (this.channelzEnabled) {
                channelz_1.unregisterChannelzRef(this.channelzRef);
            }
        }
    }
    /**
     * Register one more active call. The first call refs the session and the
     * backoff timer, and starts keepalive pings unless they are already
     * configured to run without calls.
     */
    callRef() {
        this.refTrace('callRefcount ' +
            this.callRefcount +
            ' -> ' +
            (this.callRefcount + 1));
        if (this.callRefcount === 0) {
            if (this.session) {
                this.session.ref();
            }
            this.backoffTimeout.ref();
            if (!this.keepaliveWithoutCalls) {
                this.startKeepalivePings();
            }
        }
        this.callRefcount += 1;
    }
    /**
     * Release one active call. When the last call is released, the session
     * and backoff timer are unref'd, call-bound keepalive pings stop, and the
     * subchannel is shut down if nothing else references it.
     */
    callUnref() {
        this.refTrace('callRefcount ' +
            this.callRefcount +
            ' -> ' +
            (this.callRefcount - 1));
        this.callRefcount -= 1;
        if (this.callRefcount === 0) {
            if (this.session) {
                this.session.unref();
            }
            this.backoffTimeout.unref();
            if (!this.keepaliveWithoutCalls) {
                clearInterval(this.keepaliveIntervalId);
            }
            this.checkBothRefcounts();
        }
    }
    /**
     * Add a channel / subchannel pool reference to this subchannel.
     */
    ref() {
        this.refTrace('refcount ' +
            this.refcount +
            ' -> ' +
            (this.refcount + 1));
        this.refcount += 1;
    }
    /**
     * Drop a channel / subchannel pool reference and shut the subchannel
     * down if no references of either kind remain.
     */
    unref() {
        this.refTrace('refcount ' +
            this.refcount +
            ' -> ' +
            (this.refcount - 1));
        this.refcount -= 1;
        this.checkBothRefcounts();
    }
    /**
     * Drop the reference only if it is the single remaining one.
     * @returns True if the reference was dropped, false otherwise
     */
    unrefIfOneRef() {
        if (this.refcount === 1) {
            this.unref();
            return true;
        }
        return false;
    }
    /**
     * Start a stream on the current session with the given `metadata` as headers
     * and then attach it to the `callStream`. Must only be called if the
     * subchannel's current connectivity state is READY.
     * @param metadata
     * @param callStream
     * @param extraFilters Passed through unchanged to
     *     `callStream.attachHttp2Stream`
     * @throws Rethrows any error thrown by `session.request`, after moving
     *     the subchannel from READY to TRANSIENT_FAILURE
     */
    startCallStream(metadata, callStream, extraFilters) {
        const headers = metadata.toHttp2Headers();
        headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
        headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
        headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
        headers[HTTP2_HEADER_METHOD] = 'POST';
        headers[HTTP2_HEADER_PATH] = callStream.getMethod();
        headers[HTTP2_HEADER_TE] = 'trailers';
        let http2Stream;
        /* In theory, if an error is thrown by session.request because session has
         * become unusable (e.g. because it has received a goaway), this subchannel
         * should soon see the corresponding close or goaway event anyway and leave
         * READY. But we have seen reports that this does not happen
         * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
         * so for defense in depth, we just discard the session when we see an
         * error here.
         */
        try {
            http2Stream = this.session.request(headers);
        }
        catch (e) {
            this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
            throw e;
        }
        let headersString = '';
        for (const header of Object.keys(headers)) {
            headersString += '\t\t' + header + ': ' + headers[header] + '\n';
        }
        logging.trace(constants_1.LogVerbosity.DEBUG, 'call_stream', 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' +
            '(' + this.channelzRef.id + ') ' +
            this.subchannelAddressString +
            ' with headers\n' +
            headersString);
        this.flowControlTrace('local window size: ' +
            this.session.state.localWindowSize +
            ' remote window size: ' +
            this.session.state.remoteWindowSize);
        const streamSession = this.session;
        this.internalsTrace('session.closed=' +
            streamSession.closed +
            ' session.destroyed=' +
            streamSession.destroyed +
            ' session.socket.destroyed=' +
            streamSession.socket.destroyed);
        let statsTracker;
        if (this.channelzEnabled) {
            this.callTracker.addCallStarted();
            callStream.addStatusWatcher(status => {
                if (status.code === constants_1.Status.OK) {
                    this.callTracker.addCallSucceeded();
                }
                else {
                    this.callTracker.addCallFailed();
                }
            });
            this.streamTracker.addCallStarted();
            callStream.addStreamEndWatcher(success => {
                /* Only count the stream if it ended on the session that is still
                 * current; the per-socket statistics are reset on reconnect. */
                if (streamSession === this.session) {
                    if (success) {
                        this.streamTracker.addCallSucceeded();
                    }
                    else {
                        this.streamTracker.addCallFailed();
                    }
                }
            });
            statsTracker = {
                addMessageSent: () => {
                    this.messagesSent += 1;
                    this.lastMessageSentTimestamp = new Date();
                },
                addMessageReceived: () => {
                    this.messagesReceived += 1;
                    // Keep the channelz "last message received" timestamp current
                    this.lastMessageReceivedTimestamp = new Date();
                }
            };
        }
        else {
            statsTracker = {
                addMessageSent: () => { },
                addMessageReceived: () => { }
            };
        }
        callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
    }
    /**
     * If the subchannel is currently IDLE, start connecting and switch to the
     * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
     * the next time it would transition to IDLE, start connecting again instead.
     * Otherwise, do nothing.
     */
    startConnecting() {
        /* First, try to transition from IDLE to connecting. If that doesn't happen
         * because the state is not currently IDLE, check if it is
         * TRANSIENT_FAILURE, and if so indicate that it should go back to
         * connecting after the backoff timer ends. Otherwise do nothing */
        if (!this.transitionToState([connectivity_state_1.ConnectivityState.IDLE], connectivity_state_1.ConnectivityState.CONNECTING)) {
            if (this.connectivityState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
                this.continueConnecting = true;
            }
        }
    }
    /**
     * Get the subchannel's current connectivity state.
     */
    getConnectivityState() {
        return this.connectivityState;
    }
    /**
     * Add a listener function to be called whenever the subchannel's
     * connectivity state changes.
     * @param listener Called with `(subchannel, previousState, newState)`
     */
    addConnectivityStateListener(listener) {
        this.stateListeners.push(listener);
    }
    /**
     * Remove a listener previously added with `addConnectivityStateListener`
     * @param listener A reference to a function previously passed to
     *     `addConnectivityStateListener`
     */
    removeConnectivityStateListener(listener) {
        const listenerIndex = this.stateListeners.indexOf(listener);
        if (listenerIndex > -1) {
            this.stateListeners.splice(listenerIndex, 1);
        }
    }
    /**
     * Add a listener to be called, without arguments, when the connection is
     * handled as disconnected.
     */
    addDisconnectListener(listener) {
        this.disconnectListeners.add(listener);
    }
    /**
     * Remove a listener previously added with `addDisconnectListener`.
     */
    removeDisconnectListener(listener) {
        this.disconnectListeners.delete(listener);
    }
    /**
     * Reset the backoff timeout, and immediately start connecting if in backoff.
     */
    resetBackoff() {
        this.backoffTimeout.reset();
        this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
    }
    /**
     * Get the string form of the address this subchannel connects to.
     */
    getAddress() {
        return this.subchannelAddressString;
    }
    /**
     * Get the channelz reference registered for this subchannel.
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Get the underlying subchannel, which for this class is the object
     * itself.
     */
    getRealSubchannel() {
        return this;
    }
}
812exports.Subchannel = Subchannel;
813//# sourceMappingURL=subchannel.js.map
\No newline at end of file