UNPKG

34.8 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Subchannel = void 0;
20const http2 = require("http2");
21const tls_1 = require("tls");
22const connectivity_state_1 = require("./connectivity-state");
23const backoff_timeout_1 = require("./backoff-timeout");
24const resolver_1 = require("./resolver");
25const logging = require("./logging");
26const constants_1 = require("./constants");
27const http_proxy_1 = require("./http_proxy");
28const net = require("net");
29const uri_parser_1 = require("./uri-parser");
30const subchannel_address_1 = require("./subchannel-address");
31const channelz_1 = require("./channelz");
32const clientVersion = require('../../package.json').version;
/* Tracer name under which this module's debug output is logged. */
const TRACER_NAME = 'subchannel';
/* Connection/backoff tuning values. NOTE(review): none of the next five
 * constants are referenced by the code visible in this file; the backoff
 * behavior is delegated to BackoffTimeout. */
const MIN_CONNECT_TIMEOUT_MS = 20 * 1000;
const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120 * 1000;
const BACKOFF_JITTER = 0.2;
/* setInterval and setTimeout only accept signed 32 bit integers, so the
 * largest usable keepalive interval is the max signed 32 bit integer
 * (2^31 - 1). */
const KEEPALIVE_MAX_TIME_MS = 0x7fffffff;
/* Default time to wait for a keepalive ping acknowledgement. */
const KEEPALIVE_TIMEOUT_MS = 20 * 1000;
/* Names of the HTTP/2 headers that are set on every outgoing call. */
const {
    HTTP2_HEADER_AUTHORITY,
    HTTP2_HEADER_CONTENT_TYPE,
    HTTP2_HEADER_METHOD,
    HTTP2_HEADER_PATH,
    HTTP2_HEADER_TE,
    HTTP2_HEADER_USER_AGENT,
} = http2.constants;
/**
 * Pick a pseudo-random number from the half-open interval [min, max).
 * @param min Lower bound of the interval (inclusive)
 * @param max Upper bound of the interval (exclusive)
 * @returns `min` plus a random fraction of the span `max - min`
 */
function uniformRandom(min, max) {
    const span = max - min;
    return Math.random() * span + min;
}
/* Opaque GOAWAY payload that is compared against incoming GOAWAY debug data
 * to detect a server rejecting the connection because of excess pings. */
const tooManyPingsData = Buffer.from('too_many_pings', 'ascii');
54class Subchannel {
55 /**
56 * A class representing a connection to a single backend.
57 * @param channelTarget The target string for the channel as a whole
58 * @param subchannelAddress The address for the backend that this subchannel
59 * will connect to
60 * @param options The channel options, plus any specific subchannel options
61 * for this subchannel
62 * @param credentials The channel credentials used to establish this
63 * connection
64 */
65 constructor(channelTarget, subchannelAddress, options, credentials) {
66 this.channelTarget = channelTarget;
67 this.subchannelAddress = subchannelAddress;
68 this.options = options;
69 this.credentials = credentials;
70 /**
71 * The subchannel's current connectivity state. Invariant: `session` === `null`
72 * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
73 */
74 this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
75 /**
76 * The underlying http2 session used to make requests.
77 */
78 this.session = null;
79 /**
80 * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
81 * CONNECTING instead of IDLE when the backoff timeout ends.
82 */
83 this.continueConnecting = false;
84 /**
85 * A list of listener functions that will be called whenever the connectivity
86 * state changes. Will be modified by `addConnectivityStateListener` and
87 * `removeConnectivityStateListener`
88 */
89 this.stateListeners = [];
90 /**
91 * A list of listener functions that will be called when the underlying
92 * socket disconnects. Used for ending active calls with an UNAVAILABLE
93 * status.
94 */
95 this.disconnectListeners = [];
96 /**
97 * The amount of time in between sending pings
98 */
99 this.keepaliveTimeMs = KEEPALIVE_MAX_TIME_MS;
100 /**
101 * The amount of time to wait for an acknowledgement after sending a ping
102 */
103 this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS;
104 /**
105 * Indicates whether keepalive pings should be sent without any active calls
106 */
107 this.keepaliveWithoutCalls = false;
108 /**
109 * Tracks calls with references to this subchannel
110 */
111 this.callRefcount = 0;
112 /**
113 * Tracks channels and subchannel pools with references to this subchannel
114 */
115 this.refcount = 0;
116 // Channelz info
117 this.channelzEnabled = true;
118 this.callTracker = new channelz_1.ChannelzCallTracker();
119 this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
120 // Channelz socket info
121 this.channelzSocketRef = null;
122 /**
123 * Name of the remote server, if it is not the same as the subchannel
124 * address, i.e. if connecting through an HTTP CONNECT proxy.
125 */
126 this.remoteName = null;
127 this.streamTracker = new channelz_1.ChannelzCallTracker();
128 this.keepalivesSent = 0;
129 this.messagesSent = 0;
130 this.messagesReceived = 0;
131 this.lastMessageSentTimestamp = null;
132 this.lastMessageReceivedTimestamp = null;
133 // Build user-agent string.
134 this.userAgent = [
135 options['grpc.primary_user_agent'],
136 `grpc-node-js/${clientVersion}`,
137 options['grpc.secondary_user_agent'],
138 ]
139 .filter((e) => e)
140 .join(' '); // remove falsey values first
141 if ('grpc.keepalive_time_ms' in options) {
142 this.keepaliveTimeMs = options['grpc.keepalive_time_ms'];
143 }
144 if ('grpc.keepalive_timeout_ms' in options) {
145 this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms'];
146 }
147 if ('grpc.keepalive_permit_without_calls' in options) {
148 this.keepaliveWithoutCalls =
149 options['grpc.keepalive_permit_without_calls'] === 1;
150 }
151 else {
152 this.keepaliveWithoutCalls = false;
153 }
154 this.keepaliveIntervalId = setTimeout(() => { }, 0);
155 clearTimeout(this.keepaliveIntervalId);
156 this.keepaliveTimeoutId = setTimeout(() => { }, 0);
157 clearTimeout(this.keepaliveTimeoutId);
158 const backoffOptions = {
159 initialDelay: options['grpc.initial_reconnect_backoff_ms'],
160 maxDelay: options['grpc.max_reconnect_backoff_ms'],
161 };
162 this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => {
163 this.handleBackoffTimer();
164 }, backoffOptions);
165 this.subchannelAddressString = subchannel_address_1.subchannelAddressToString(subchannelAddress);
166 if (options['grpc.enable_channelz'] === 0) {
167 this.channelzEnabled = false;
168 }
169 this.channelzTrace = new channelz_1.ChannelzTrace();
170 if (this.channelzEnabled) {
171 this.channelzRef = channelz_1.registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
172 this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
173 }
174 else {
175 // Dummy channelz ref that will never be used
176 this.channelzRef = {
177 kind: 'subchannel',
178 id: -1,
179 name: ''
180 };
181 }
182 this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
183 }
184 getChannelzInfo() {
185 return {
186 state: this.connectivityState,
187 trace: this.channelzTrace,
188 callTracker: this.callTracker,
189 children: this.childrenTracker.getChildLists(),
190 target: this.subchannelAddressString
191 };
192 }
193 getChannelzSocketInfo() {
194 var _a, _b, _c;
195 if (this.session === null) {
196 return null;
197 }
198 const sessionSocket = this.session.socket;
199 const remoteAddress = sessionSocket.remoteAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
200 const localAddress = sessionSocket.localAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
201 let tlsInfo;
202 if (this.session.encrypted) {
203 const tlsSocket = sessionSocket;
204 const cipherInfo = tlsSocket.getCipher();
205 const certificate = tlsSocket.getCertificate();
206 const peerCertificate = tlsSocket.getPeerCertificate();
207 tlsInfo = {
208 cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null,
209 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
210 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
211 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
212 };
213 }
214 else {
215 tlsInfo = null;
216 }
217 const socketInfo = {
218 remoteAddress: remoteAddress,
219 localAddress: localAddress,
220 security: tlsInfo,
221 remoteName: this.remoteName,
222 streamsStarted: this.streamTracker.callsStarted,
223 streamsSucceeded: this.streamTracker.callsSucceeded,
224 streamsFailed: this.streamTracker.callsFailed,
225 messagesSent: this.messagesSent,
226 messagesReceived: this.messagesReceived,
227 keepAlivesSent: this.keepalivesSent,
228 lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
229 lastRemoteStreamCreatedTimestamp: null,
230 lastMessageSentTimestamp: this.lastMessageSentTimestamp,
231 lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
232 localFlowControlWindow: (_b = this.session.state.localWindowSize) !== null && _b !== void 0 ? _b : null,
233 remoteFlowControlWindow: (_c = this.session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null
234 };
235 return socketInfo;
236 }
237 resetChannelzSocketInfo() {
238 if (!this.channelzEnabled) {
239 return;
240 }
241 if (this.channelzSocketRef) {
242 channelz_1.unregisterChannelzRef(this.channelzSocketRef);
243 this.childrenTracker.unrefChild(this.channelzSocketRef);
244 this.channelzSocketRef = null;
245 }
246 this.remoteName = null;
247 this.streamTracker = new channelz_1.ChannelzCallTracker();
248 this.keepalivesSent = 0;
249 this.messagesSent = 0;
250 this.messagesReceived = 0;
251 this.lastMessageSentTimestamp = null;
252 this.lastMessageReceivedTimestamp = null;
253 }
254 trace(text) {
255 logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
256 }
257 refTrace(text) {
258 logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
259 }
260 handleBackoffTimer() {
261 if (this.continueConnecting) {
262 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
263 }
264 else {
265 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.IDLE);
266 }
267 }
268 /**
269 * Start a backoff timer with the current nextBackoff timeout
270 */
271 startBackoff() {
272 this.backoffTimeout.runOnce();
273 }
274 stopBackoff() {
275 this.backoffTimeout.stop();
276 this.backoffTimeout.reset();
277 }
278 sendPing() {
279 var _a, _b;
280 if (this.channelzEnabled) {
281 this.keepalivesSent += 1;
282 }
283 logging.trace(constants_1.LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' +
284 'Sending ping');
285 this.keepaliveTimeoutId = setTimeout(() => {
286 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
287 }, this.keepaliveTimeoutMs);
288 (_b = (_a = this.keepaliveTimeoutId).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
289 this.session.ping((err, duration, payload) => {
290 clearTimeout(this.keepaliveTimeoutId);
291 });
292 }
293 startKeepalivePings() {
294 var _a, _b;
295 this.keepaliveIntervalId = setInterval(() => {
296 this.sendPing();
297 }, this.keepaliveTimeMs);
298 (_b = (_a = this.keepaliveIntervalId).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
299 /* Don't send a ping immediately because whatever caused us to start
300 * sending pings should also involve some network activity. */
301 }
302 stopKeepalivePings() {
303 clearInterval(this.keepaliveIntervalId);
304 clearTimeout(this.keepaliveTimeoutId);
305 }
306 createSession(proxyConnectionResult) {
307 var _a, _b, _c;
308 if (proxyConnectionResult.realTarget) {
309 this.remoteName = uri_parser_1.uriToString(proxyConnectionResult.realTarget);
310 this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget);
311 }
312 else {
313 this.remoteName = null;
314 this.trace('creating HTTP/2 session');
315 }
316 const targetAuthority = resolver_1.getDefaultAuthority((_a = proxyConnectionResult.realTarget) !== null && _a !== void 0 ? _a : this.channelTarget);
317 let connectionOptions = this.credentials._getConnectionOptions() || {};
318 connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
319 if ('grpc-node.max_session_memory' in this.options) {
320 connectionOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
321 }
322 let addressScheme = 'http://';
323 if ('secureContext' in connectionOptions) {
324 addressScheme = 'https://';
325 // If provided, the value of grpc.ssl_target_name_override should be used
326 // to override the target hostname when checking server identity.
327 // This option is used for testing only.
328 if (this.options['grpc.ssl_target_name_override']) {
329 const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
330 connectionOptions.checkServerIdentity = (host, cert) => {
331 return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
332 };
333 connectionOptions.servername = sslTargetNameOverride;
334 }
335 else {
336 const authorityHostname = (_c = (_b = uri_parser_1.splitHostPort(targetAuthority)) === null || _b === void 0 ? void 0 : _b.host) !== null && _c !== void 0 ? _c : 'localhost';
337 // We want to always set servername to support SNI
338 connectionOptions.servername = authorityHostname;
339 }
340 if (proxyConnectionResult.socket) {
341 /* This is part of the workaround for
342 * https://github.com/nodejs/node/issues/32922. Without that bug,
343 * proxyConnectionResult.socket would always be a plaintext socket and
344 * this would say
345 * connectionOptions.socket = proxyConnectionResult.socket; */
346 connectionOptions.createConnection = (authority, option) => {
347 return proxyConnectionResult.socket;
348 };
349 }
350 }
351 else {
352 /* In all but the most recent versions of Node, http2.connect does not use
353 * the options when establishing plaintext connections, so we need to
354 * establish that connection explicitly. */
355 connectionOptions.createConnection = (authority, option) => {
356 if (proxyConnectionResult.socket) {
357 return proxyConnectionResult.socket;
358 }
359 else {
360 /* net.NetConnectOpts is declared in a way that is more restrictive
361 * than what net.connect will actually accept, so we use the type
362 * assertion to work around that. */
363 return net.connect(this.subchannelAddress);
364 }
365 };
366 }
367 connectionOptions = Object.assign(Object.assign({}, connectionOptions), this.subchannelAddress);
368 /* http2.connect uses the options here:
369 * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
370 * The spread operator overides earlier values with later ones, so any port
371 * or host values in the options will be used rather than any values extracted
372 * from the first argument. In addition, the path overrides the host and port,
373 * as documented for plaintext connections here:
374 * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
375 * and for TLS connections here:
376 * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
377 * earlier versions of Node, http2.connect passes these options to
378 * tls.connect but not net.connect, so in the insecure case we still need
379 * to set the createConnection option above to create the connection
380 * explicitly. We cannot do that in the TLS case because http2.connect
381 * passes necessary additional options to tls.connect.
382 * The first argument just needs to be parseable as a URL and the scheme
383 * determines whether the connection will be established over TLS or not.
384 */
385 const session = http2.connect(addressScheme + targetAuthority, connectionOptions);
386 this.session = session;
387 if (this.channelzEnabled) {
388 this.channelzSocketRef = channelz_1.registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo());
389 this.childrenTracker.refChild(this.channelzSocketRef);
390 }
391 session.unref();
392 /* For all of these events, check if the session at the time of the event
393 * is the same one currently attached to this subchannel, to ensure that
394 * old events from previous connection attempts cannot cause invalid state
395 * transitions. */
396 session.once('connect', () => {
397 if (this.session === session) {
398 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.READY);
399 }
400 });
401 session.once('close', () => {
402 if (this.session === session) {
403 this.trace('connection closed');
404 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
405 /* Transitioning directly to IDLE here should be OK because we are not
406 * doing any backoff, because a connection was established at some
407 * point */
408 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
409 }
410 });
411 session.once('goaway', (errorCode, lastStreamID, opaqueData) => {
412 if (this.session === session) {
413 /* See the last paragraph of
414 * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
415 if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
416 opaqueData.equals(tooManyPingsData)) {
417 this.keepaliveTimeMs = Math.min(2 * this.keepaliveTimeMs, KEEPALIVE_MAX_TIME_MS);
418 logging.log(constants_1.LogVerbosity.ERROR, `Connection to ${uri_parser_1.uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms`);
419 }
420 this.trace('connection closed by GOAWAY with code ' +
421 errorCode);
422 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE);
423 }
424 });
425 session.once('error', (error) => {
426 /* Do nothing here. Any error should also trigger a close event, which is
427 * where we want to handle that. */
428 this.trace('connection closed with error ' +
429 error.message);
430 });
431 }
432 startConnectingInternal() {
433 var _a, _b;
434 /* Pass connection options through to the proxy so that it's able to
435 * upgrade it's connection to support tls if needed.
436 * This is a workaround for https://github.com/nodejs/node/issues/32922
437 * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
438 const connectionOptions = this.credentials._getConnectionOptions() || {};
439 if ('secureContext' in connectionOptions) {
440 connectionOptions.ALPNProtocols = ['h2'];
441 // If provided, the value of grpc.ssl_target_name_override should be used
442 // to override the target hostname when checking server identity.
443 // This option is used for testing only.
444 if (this.options['grpc.ssl_target_name_override']) {
445 const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'];
446 connectionOptions.checkServerIdentity = (host, cert) => {
447 return tls_1.checkServerIdentity(sslTargetNameOverride, cert);
448 };
449 connectionOptions.servername = sslTargetNameOverride;
450 }
451 else {
452 if ('grpc.http_connect_target' in this.options) {
453 /* This is more or less how servername will be set in createSession
454 * if a connection is successfully established through the proxy.
455 * If the proxy is not used, these connectionOptions are discarded
456 * anyway */
457 const targetPath = resolver_1.getDefaultAuthority((_a = uri_parser_1.parseUri(this.options['grpc.http_connect_target'])) !== null && _a !== void 0 ? _a : {
458 path: 'localhost',
459 });
460 const hostPort = uri_parser_1.splitHostPort(targetPath);
461 connectionOptions.servername = (_b = hostPort === null || hostPort === void 0 ? void 0 : hostPort.host) !== null && _b !== void 0 ? _b : targetPath;
462 }
463 }
464 }
465 http_proxy_1.getProxiedConnection(this.subchannelAddress, this.options, connectionOptions).then((result) => {
466 this.createSession(result);
467 }, (reason) => {
468 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
469 });
470 }
471 /**
472 * Initiate a state transition from any element of oldStates to the new
473 * state. If the current connectivityState is not in oldStates, do nothing.
474 * @param oldStates The set of states to transition from
475 * @param newState The state to transition to
476 * @returns True if the state changed, false otherwise
477 */
478 transitionToState(oldStates, newState) {
479 if (oldStates.indexOf(this.connectivityState) === -1) {
480 return false;
481 }
482 this.trace(connectivity_state_1.ConnectivityState[this.connectivityState] +
483 ' -> ' +
484 connectivity_state_1.ConnectivityState[newState]);
485 if (this.channelzEnabled) {
486 this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
487 }
488 const previousState = this.connectivityState;
489 this.connectivityState = newState;
490 switch (newState) {
491 case connectivity_state_1.ConnectivityState.READY:
492 this.stopBackoff();
493 this.session.socket.once('close', () => {
494 for (const listener of this.disconnectListeners) {
495 listener();
496 }
497 });
498 if (this.keepaliveWithoutCalls) {
499 this.startKeepalivePings();
500 }
501 break;
502 case connectivity_state_1.ConnectivityState.CONNECTING:
503 this.startBackoff();
504 this.startConnectingInternal();
505 this.continueConnecting = false;
506 break;
507 case connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE:
508 if (this.session) {
509 this.session.close();
510 }
511 this.session = null;
512 this.resetChannelzSocketInfo();
513 this.stopKeepalivePings();
514 /* If the backoff timer has already ended by the time we get to the
515 * TRANSIENT_FAILURE state, we want to immediately transition out of
516 * TRANSIENT_FAILURE as though the backoff timer is ending right now */
517 if (!this.backoffTimeout.isRunning()) {
518 process.nextTick(() => {
519 this.handleBackoffTimer();
520 });
521 }
522 break;
523 case connectivity_state_1.ConnectivityState.IDLE:
524 if (this.session) {
525 this.session.close();
526 }
527 this.session = null;
528 this.resetChannelzSocketInfo();
529 this.stopKeepalivePings();
530 break;
531 default:
532 throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
533 }
534 /* We use a shallow copy of the stateListeners array in case a listener
535 * is removed during this iteration */
536 for (const listener of [...this.stateListeners]) {
537 listener(this, previousState, newState);
538 }
539 return true;
540 }
541 /**
542 * Check if the subchannel associated with zero calls and with zero channels.
543 * If so, shut it down.
544 */
545 checkBothRefcounts() {
546 /* If no calls, channels, or subchannel pools have any more references to
547 * this subchannel, we can be sure it will never be used again. */
548 if (this.callRefcount === 0 && this.refcount === 0) {
549 if (this.channelzEnabled) {
550 this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
551 }
552 this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
553 if (this.channelzEnabled) {
554 channelz_1.unregisterChannelzRef(this.channelzRef);
555 }
556 }
557 }
558 callRef() {
559 this.refTrace('callRefcount ' +
560 this.callRefcount +
561 ' -> ' +
562 (this.callRefcount + 1));
563 if (this.callRefcount === 0) {
564 if (this.session) {
565 this.session.ref();
566 }
567 this.backoffTimeout.ref();
568 if (!this.keepaliveWithoutCalls) {
569 this.startKeepalivePings();
570 }
571 }
572 this.callRefcount += 1;
573 }
574 callUnref() {
575 this.refTrace('callRefcount ' +
576 this.callRefcount +
577 ' -> ' +
578 (this.callRefcount - 1));
579 this.callRefcount -= 1;
580 if (this.callRefcount === 0) {
581 if (this.session) {
582 this.session.unref();
583 }
584 this.backoffTimeout.unref();
585 if (!this.keepaliveWithoutCalls) {
586 this.stopKeepalivePings();
587 }
588 this.checkBothRefcounts();
589 }
590 }
591 ref() {
592 this.refTrace('refcount ' +
593 this.refcount +
594 ' -> ' +
595 (this.refcount + 1));
596 this.refcount += 1;
597 }
598 unref() {
599 this.refTrace('refcount ' +
600 this.refcount +
601 ' -> ' +
602 (this.refcount - 1));
603 this.refcount -= 1;
604 this.checkBothRefcounts();
605 }
606 unrefIfOneRef() {
607 if (this.refcount === 1) {
608 this.unref();
609 return true;
610 }
611 return false;
612 }
613 /**
614 * Start a stream on the current session with the given `metadata` as headers
615 * and then attach it to the `callStream`. Must only be called if the
616 * subchannel's current connectivity state is READY.
617 * @param metadata
618 * @param callStream
619 */
620 startCallStream(metadata, callStream, extraFilters) {
621 const headers = metadata.toHttp2Headers();
622 headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
623 headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
624 headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
625 headers[HTTP2_HEADER_METHOD] = 'POST';
626 headers[HTTP2_HEADER_PATH] = callStream.getMethod();
627 headers[HTTP2_HEADER_TE] = 'trailers';
628 let http2Stream;
629 /* In theory, if an error is thrown by session.request because session has
630 * become unusable (e.g. because it has received a goaway), this subchannel
631 * should soon see the corresponding close or goaway event anyway and leave
632 * READY. But we have seen reports that this does not happen
633 * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
634 * so for defense in depth, we just discard the session when we see an
635 * error here.
636 */
637 try {
638 http2Stream = this.session.request(headers);
639 }
640 catch (e) {
641 this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE);
642 throw e;
643 }
644 let headersString = '';
645 for (const header of Object.keys(headers)) {
646 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
647 }
648 logging.trace(constants_1.LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' +
649 '(' + this.channelzRef.id + ') ' +
650 this.subchannelAddressString +
651 ' with headers\n' +
652 headersString);
653 const streamSession = this.session;
654 let statsTracker;
655 if (this.channelzEnabled) {
656 this.callTracker.addCallStarted();
657 callStream.addStatusWatcher(status => {
658 if (status.code === constants_1.Status.OK) {
659 this.callTracker.addCallSucceeded();
660 }
661 else {
662 this.callTracker.addCallFailed();
663 }
664 });
665 this.streamTracker.addCallStarted();
666 callStream.addStreamEndWatcher(success => {
667 if (streamSession === this.session) {
668 if (success) {
669 this.streamTracker.addCallSucceeded();
670 }
671 else {
672 this.streamTracker.addCallFailed();
673 }
674 }
675 });
676 statsTracker = {
677 addMessageSent: () => {
678 this.messagesSent += 1;
679 this.lastMessageSentTimestamp = new Date();
680 },
681 addMessageReceived: () => {
682 this.messagesReceived += 1;
683 }
684 };
685 }
686 else {
687 statsTracker = {
688 addMessageSent: () => { },
689 addMessageReceived: () => { }
690 };
691 }
692 callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
693 }
694 /**
695 * If the subchannel is currently IDLE, start connecting and switch to the
696 * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
697 * the next time it would transition to IDLE, start connecting again instead.
698 * Otherwise, do nothing.
699 */
700 startConnecting() {
701 /* First, try to transition from IDLE to connecting. If that doesn't happen
702 * because the state is not currently IDLE, check if it is
703 * TRANSIENT_FAILURE, and if so indicate that it should go back to
704 * connecting after the backoff timer ends. Otherwise do nothing */
705 if (!this.transitionToState([connectivity_state_1.ConnectivityState.IDLE], connectivity_state_1.ConnectivityState.CONNECTING)) {
706 if (this.connectivityState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
707 this.continueConnecting = true;
708 }
709 }
710 }
711 /**
712 * Get the subchannel's current connectivity state.
713 */
714 getConnectivityState() {
715 return this.connectivityState;
716 }
717 /**
718 * Add a listener function to be called whenever the subchannel's
719 * connectivity state changes.
720 * @param listener
721 */
722 addConnectivityStateListener(listener) {
723 this.stateListeners.push(listener);
724 }
725 /**
726 * Remove a listener previously added with `addConnectivityStateListener`
727 * @param listener A reference to a function previously passed to
728 * `addConnectivityStateListener`
729 */
730 removeConnectivityStateListener(listener) {
731 const listenerIndex = this.stateListeners.indexOf(listener);
732 if (listenerIndex > -1) {
733 this.stateListeners.splice(listenerIndex, 1);
734 }
735 }
736 addDisconnectListener(listener) {
737 this.disconnectListeners.push(listener);
738 }
739 removeDisconnectListener(listener) {
740 const listenerIndex = this.disconnectListeners.indexOf(listener);
741 if (listenerIndex > -1) {
742 this.disconnectListeners.splice(listenerIndex, 1);
743 }
744 }
745 /**
746 * Reset the backoff timeout, and immediately start connecting if in backoff.
747 */
748 resetBackoff() {
749 this.backoffTimeout.reset();
750 this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING);
751 }
752 getAddress() {
753 return this.subchannelAddressString;
754 }
755 getChannelzRef() {
756 return this.channelzRef;
757 }
758}
759exports.Subchannel = Subchannel;
760//# sourceMappingURL=subchannel.js.map
\No newline at end of file