UNPKG

35.7 kBJavaScriptView Raw
/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16'use strict';
17const events = require('events');
18const util = require('util');
19const net = require('net');
20const dns = require('dns');
21
22const errors = require('./errors');
23const { Host, HostMap } = require('./host');
24const Metadata = require('./metadata');
25const EventDebouncer = require('./metadata/event-debouncer');
26const Connection = require('./connection');
27const requests = require('./requests');
28const utils = require('./utils');
29const types = require('./types');
30const promiseUtils = require('./promise-utils');
31const f = util.format;
32
// Queries used to retrieve the topology information from the node the control connection is attached to.
const selectPeers = "SELECT * FROM system.peers";
const selectLocal = "SELECT * FROM system.local WHERE key='local'";

// Delay (in milliseconds) applied before reacting to topology changes and "UP" status changes.
const newNodeDelay = 1000;

// Maximum time (in milliseconds) a query waits for the control connection to be reconnected.
const metadataQueryAbortTimeout = 2000;

// Values of the schema change type carried by SCHEMA_CHANGE events.
// Frozen because it is a shared, read-only lookup table.
const schemaChangeTypes = Object.freeze({
  created: 'CREATED',
  updated: 'UPDATED',
  dropped: 'DROPPED'
});

// Key and value looked up in the SUPPORTED response to detect a DBaaS product type.
const supportedProductTypeKey = 'PRODUCT_TYPE';
const supportedDbaas = 'DATASTAX_APOLLO';
44
/**
 * Represents a connection used by the driver to receive events and to check the status of the cluster.
 * <p>It uses an existing connection from the hosts' connection pool to maintain the driver metadata up-to-date.</p>
 */
class ControlConnection extends events.EventEmitter {

  /**
   * Creates a new instance of <code>ControlConnection</code>.
   * @param {Object} options
   * @param {ProfileManager} profileManager
   * @param {{borrowHostConnection: function, createConnection: function}} [context] An object containing methods to
   * allow dependency injection.
   */
  constructor(options, profileManager, context) {
    super();

    this.protocolVersion = null;
    this.hosts = new HostMap();
    this.setMaxListeners(0);
    this.log = utils.log;
    Object.defineProperty(this, "options", { value: options, enumerable: false, writable: false});

    /**
     * Cluster metadata that is going to be shared between the Client and ControlConnection
     */
    this.metadata = new Metadata(this.options, this);
    this.initialized = false;

    /**
     * Host used by the control connection
     * @type {Host|null}
     */
    this.host = null;

    /**
     * Connection used to retrieve metadata and subscribed to events
     * @type {Connection|null}
     */
    this.connection = null;

    this._addressTranslator = this.options.policies.addressResolution;
    this._reconnectionPolicy = this.options.policies.reconnection;
    this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
    this._isShuttingDown = false;

    // Reference to the encoder of the last valid connection
    this._encoder = null;
    this._debouncer = new EventDebouncer(options.refreshSchemaDelay, this.log.bind(this));
    this._profileManager = profileManager;
    this._triedHosts = null;
    this._resolvedContactPoints = new Map();
    this._contactPoints = new Set();

    // Timeout used for delayed handling of topology changes
    this._topologyChangeTimeout = null;
    // Timeout used for delayed handling of node status changes
    this._nodeStatusChangeTimeout = null;

    // Instance-level overrides of the default (prototype) implementations
    if (context && context.borrowHostConnection) {
      this._borrowHostConnection = context.borrowHostConnection;
    }

    if (context && context.createConnection) {
      this._createConnection = context.createConnection;
    }
  }

  /**
   * Stores the contact point information and what it resolved to.
   * @param {String|null} address The resolved address, or null when the name could not be resolved.
   * @param {String} port Port as text; when it can't be parsed, the port from the protocol options is used.
   * @param {String} name The contact point as provided by the user.
   * @param {Boolean} isIPv6
   */
  _addContactPoint(address, port, name, isIPv6) {
    if (address === null) {
      // Contact point could not be resolved, store that the resolution came back empty
      this._resolvedContactPoints.set(name, utils.emptyArray);
      return;
    }

    const portNumber = parseInt(port, 10) || this.options.protocolOptions.port;
    const endpoint = `${address}:${portNumber}`;
    this._contactPoints.add(endpoint);

    // Use RFC 3986 for IPv4 and IPv6
    const standardEndpoint = !isIPv6 ? endpoint : `[${address}]:${portNumber}`;

    let resolvedAddressedByName = this._resolvedContactPoints.get(name);

    // NODEJS-646
    //
    // We might have a frozen empty array if DNS resolution wasn't working when this name was
    // initially added, and if that's the case we can't add anything. Detect that case and
    // reset to a mutable array.
    if (resolvedAddressedByName === undefined || resolvedAddressedByName === utils.emptyArray) {
      resolvedAddressedByName = [];
      this._resolvedContactPoints.set(name, resolvedAddressedByName);
    }

    resolvedAddressedByName.push(standardEndpoint);
  }

  /**
   * Parses a contact point (IP address or host name, optionally followed by a port), resolves host names and
   * stores the resulting endpoints.
   * @param {String} name The contact point as provided by the user.
   * @returns {Promise<void>}
   */
  async _parseContactPoint(name) {
    let addressOrName = name;
    let port = null;

    if (name.startsWith('[') && name.indexOf(']:') > 1) {
      // IPv6 host notation [ip]:port (RFC 3986 section 3.2.2)
      const index = name.lastIndexOf(']:');
      addressOrName = name.slice(1, index);
      port = name.slice(index + 2);
    } else if (name.indexOf(':') > 0) {
      // IPv4 or host name with port notation
      const parts = name.split(':');
      if (parts.length === 2) {
        addressOrName = parts[0];
        port = parts[1];
      }
    }

    if (net.isIP(addressOrName)) {
      this._addContactPoint(addressOrName, port, name, net.isIPv6(addressOrName));
      return;
    }

    const addresses = await this._resolveAll(addressOrName);
    if (addresses.length > 0) {
      addresses.forEach(addressInfo => this._addContactPoint(addressInfo.address, port, name, addressInfo.isIPv6));
    } else {
      // Store that we attempted resolving the name but was not found
      this._addContactPoint(null, null, name, false);
    }
  }

  /**
   * Initializes the control connection by establishing a Connection using a suitable protocol
   * version to be used and retrieving cluster metadata.
   * @returns {Promise<void>}
   */
  async init() {
    if (this.initialized) {
      // Prevent multiple serial initializations
      return;
    }

    if (!this.options.sni) {
      // Parse and resolve contact points
      await Promise.all(this.options.contactPoints.map(name => this._parseContactPoint(name)));
    } else {
      this.options.contactPoints.forEach(cp => this._contactPoints.add(cp));
      const address = this.options.sni.address;
      const separatorIndex = address.lastIndexOf(':');

      if (separatorIndex === -1) {
        throw new errors.DriverInternalError('The SNI endpoint address should contain ip/name and port');
      }

      const nameOrIp = address.slice(0, separatorIndex);
      this.options.sni.port = address.slice(separatorIndex + 1);
      this.options.sni.addressResolver = new utils.AddressResolver({ nameOrIp, dns });
      await this.options.sni.addressResolver.init();
    }

    if (this._contactPoints.size === 0) {
      throw new errors.NoHostAvailableError({}, 'No host could be resolved');
    }

    await this._initializeConnection();
  }

  /**
   * Subscribes to the host 'down' / 'ignore' events and to the connection 'socketClose' event, in order to
   * start a refresh of the control connection (once) when any of those occur.
   * @param {Host} host
   * @param {Connection} connection
   */
  _setHealthListeners(host, connection) {
    const self = this;
    let wasRefreshCalled = 0;

    function removeListeners() {
      host.removeListener('down', downOrIgnoredHandler);
      host.removeListener('ignore', downOrIgnoredHandler);
      connection.removeListener('socketClose', socketClosedHandler);
    }

    function startReconnecting(hostDown) {
      if (wasRefreshCalled++ !== 0) {
        // Prevent multiple calls to reconnect
        return;
      }

      removeListeners();

      if (self._isShuttingDown) {
        // Don't attempt to reconnect when the ControlConnection is being shutdown
        return;
      }

      if (hostDown) {
        self.log('warning',
          `Host ${host.address} used by the ControlConnection DOWN, ` +
          `connection to ${connection.endpointFriendlyName} will not longer be used`);
      } else {
        self.log('warning', `Connection to ${connection.endpointFriendlyName} used by the ControlConnection was closed`);
      }

      promiseUtils.toBackground(self._refresh());
    }

    function downOrIgnoredHandler() {
      startReconnecting(true);
    }

    function socketClosedHandler() {
      startReconnecting(false);
    }

    host.once('down', downOrIgnoredHandler);
    host.once('ignore', downOrIgnoredHandler);
    connection.once('socketClose', socketClosedHandler);
  }

  /**
   * Iterates through the hostIterator and Gets the following open connection.
   * <p>Hosts that are not up or that are ignored are skipped. It throws a NoHostAvailableError when the
   * iterator is exhausted.</p>
   * @param {Iterator<Host>} hostIterator
   * @returns {Connection!}
   */
  _borrowAConnection(hostIterator) {
    let connection = null;

    while (!connection) {
      const item = hostIterator.next();
      const host = item.value;

      if (item.done) {
        throw new errors.NoHostAvailableError(this._triedHosts);
      }

      // Only check distance once the load-balancing policies have been initialized
      const distance = this._profileManager.getDistance(host);
      if (!host.isUp() || distance === types.distance.ignored) {
        continue;
      }

      try {
        connection = this._borrowHostConnection(host);
      } catch (err) {
        this._triedHosts[host.address] = err;
      }
    }

    return connection;
  }

  /**
   * Iterates through the contact points and tries to open a connection.
   * <p>On success it sets the protocol version, the encoder and the connection of this instance. It throws a
   * NoHostAvailableError (containing the errors per contact point) when the iterator is exhausted.</p>
   * @param {Iterator<string>} contactPointsIterator
   * @returns {Promise<void>}
   */
  async _borrowFirstConnection(contactPointsIterator) {
    let connection = null;

    // The loop only ends with a valid connection or by throwing when there are no more contact points
    while (!connection) {
      const item = contactPointsIterator.next();
      const contactPoint = item.value;

      if (item.done) {
        throw new errors.NoHostAvailableError(this._triedHosts);
      }

      try {
        connection = await this._createConnection(contactPoint);
      } catch (err) {
        this._triedHosts[contactPoint] = err;
      }
    }

    this.protocolVersion = connection.protocolVersion;
    this._encoder = connection.encoder;
    this.connection = connection;
  }

  /** Default implementation for borrowing connections, that can be injected at constructor level */
  _borrowHostConnection(host) {
    // Borrow any open connection, regardless of the keyspace
    return host.borrowConnection();
  }

  /**
   * Default implementation for creating initial connections, that can be injected at constructor level
   * @param {String} contactPoint
   * @returns {Promise<Connection>}
   */
  async _createConnection(contactPoint) {
    const c = new Connection(contactPoint, null, this.options);

    try {
      await c.openAsync();
    } catch (err) {
      // Close in the background and surface the original error
      promiseUtils.toBackground(c.closeAsync());
      throw err;
    }

    return c;
  }

  /**
   * Gets the info from local and peer metadata, reloads the keyspaces metadata and rebuilds tokens.
   * <p>It throws an error when there's a failure or when reconnecting and there's no connection.</p>
   * @param {Boolean} initializing Determines whether this function was called in order to initialize the control
   * connection the first time
   * @param {Boolean} isReconnecting Determines whether the refresh is being done because the ControlConnection is
   * switching to use this connection to this host.
   * @returns {Promise<void>}
   */
  async _refreshHosts(initializing, isReconnecting) {
    // Get a reference to the current connection as it might change from external events
    const c = this.connection;

    if (!c) {
      if (isReconnecting) {
        throw new errors.DriverInternalError('Connection reference has been lost when reconnecting');
      }

      // it's possible that this was called as a result of a topology change, but the connection was lost
      // between scheduling time and now. This will be called again when there is a new connection.
      return;
    }

    this.log('info', 'Refreshing local and peers info');

    const rsLocal = await c.send(new requests.QueryRequest(selectLocal), null);
    this._setLocalInfo(initializing, isReconnecting, c, rsLocal);

    if (!this.host) {
      throw new errors.DriverInternalError('Information from system.local could not be retrieved');
    }

    const rsPeers = await c.send(new requests.QueryRequest(selectPeers), null);
    await this.setPeersInfo(initializing, rsPeers);

    if (!this.initialized) {
      // resolve protocol version from highest common version among hosts.
      const highestCommon = types.protocolVersion.getHighestCommon(c, this.hosts);
      const reconnect = highestCommon !== this.protocolVersion;

      // set protocol version on each host.
      this.protocolVersion = highestCommon;
      this.hosts.forEach(h => h.setProtocolVersion(this.protocolVersion));

      // if protocol version changed, reconnect the control connection with new version.
      if (reconnect) {
        this.log('info', `Reconnecting since the protocol version changed to 0x${highestCommon.toString(16)}`);
        c.decreaseVersion(this.protocolVersion);
        await c.closeAsync();

        try {
          await c.openAsync();
        } catch (err) {
          // Close in the background
          promiseUtils.toBackground(c.closeAsync());

          throw err;
        }
      }

      // To acquire metadata we need to specify the cassandra version
      this.metadata.setCassandraVersion(this.host.getCassandraVersion());
      this.metadata.buildTokens(this.hosts);

      if (!this.options.isMetadataSyncEnabled) {
        this.metadata.initialized = true;
        return;
      }

      await this.metadata.refreshKeyspacesInternal(false);
      this.metadata.initialized = true;
    }
  }

  /**
   * Sets the connection of this instance by borrowing one from the hosts in the iterator.
   * <p>When SNI is not used and no connection could be borrowed, the contact points are resolved again and the
   * connection is initialized from them.</p>
   * @param {Iterator<Host>} hostIterator
   * @returns {Promise<void>}
   */
  async _refreshControlConnection(hostIterator) {

    if (this.options.sni) {
      this.connection = this._borrowAConnection(hostIterator);
    }
    else {
      try { this.connection = this._borrowAConnection(hostIterator); }
      catch(err) {

        /* NODEJS-632: refresh nodes before getting hosts for reconnect since some hostnames may have
         * shifted during the flight. */
        this.log("info", "ControlConnection could not reconnect using existing connections. Refreshing contact points and retrying");
        this._contactPoints.clear();
        this._resolvedContactPoints.clear();
        await Promise.all(this.options.contactPoints.map(name => this._parseContactPoint(name)));
        const refreshedContactPoints = Array.from(this._contactPoints).join(',');
        this.log('info', `Refreshed contact points: ${refreshedContactPoints}`);
        await this._initializeConnection();
      }
    }
  }

  /**
   * Acquires a new connection and refreshes topology and keyspace metadata.
   * <p>When it fails obtaining a connection and there aren't any more hosts, it schedules reconnection.</p>
   * <p>When it fails obtaining the metadata, it marks connection and/or host unusable and retries using the same
   * iterator from query plan / host list</p>
   * @param {Iterator<Host>} [hostIterator]
   * @returns {Promise<void>}
   */
  async _refresh(hostIterator) {
    if (this._isShuttingDown) {
      this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');
      return;
    }

    // Reset host and connection
    this.host = null;
    this.connection = null;

    try {
      if (!hostIterator) {
        this.log('info', 'Trying to acquire a connection to a new host');
        this._triedHosts = {};
        hostIterator = await promiseUtils.newQueryPlan(this._profileManager.getDefaultLoadBalancing(), null, null);
      }

      await this._refreshControlConnection(hostIterator);
    } catch (err) {
      // There was a failure obtaining a connection or during metadata retrieval
      this.log('error', 'ControlConnection failed to acquire a connection', err);

      if (!this._isShuttingDown) {
        const delay = this._reconnectionSchedule.next().value;
        this.log('warning', `ControlConnection could not reconnect, scheduling reconnection in ${delay}ms`);
        // Run the scheduled refresh in the background: nobody awaits the timer callback
        setTimeout(() => promiseUtils.toBackground(this._refresh()), delay);
        this.emit('newConnection', err);
      }

      return;
    }

    this.log('info',`ControlConnection connected to ${this.connection.endpointFriendlyName}`);

    try {
      await this._refreshHosts(false, true);

      await this._registerToConnectionEvents();
    } catch (err) {
      this.log('error', 'ControlConnection failed to retrieve topology and keyspaces information', err);
      this._triedHosts[this.connection.endpoint] = err;

      if (err.isSocketError && this.host) {
        this.host.removeFromPool(this.connection);
      }

      // Retry the whole thing with the same query plan
      return await this._refresh(hostIterator);
    }

    this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
    this._setHealthListeners(this.host, this.connection);
    this.emit('newConnection', null, this.connection, this.host);

    this.log('info', `ControlConnection connected to ${this.connection.endpointFriendlyName} and up to date`);
  }

  /**
   * Acquires a connection and refreshes topology and keyspace metadata for the first time.
   * <p>Contact points are tried in random order until one of them yields a connection from which the metadata
   * can be retrieved. It throws when there are no more contact points to try.</p>
   * @returns {Promise<void>}
   */
  async _initializeConnection() {
    this.log('info', 'Getting first connection');

    // Reset host and connection
    this.host = null;
    this.connection = null;
    this._triedHosts = {};

    // Randomize order of contact points resolved.
    const contactPointsIterator = utils.shuffleArray(Array.from(this._contactPoints))[Symbol.iterator]();

    while (true) {
      // Throws when the iterator is exhausted, which is the way out of the loop on failure
      await this._borrowFirstConnection(contactPointsIterator);

      this.log('info', `ControlConnection using protocol version 0x${
        this.protocolVersion.toString(16)}, connected to ${this.connection.endpointFriendlyName}`);

      try {
        await this._getSupportedOptions();
        await this._refreshHosts(true, true);
        await this._registerToConnectionEvents();

        // We have a valid connection, leave the loop
        break;

      } catch (err) {
        this.log('error', 'ControlConnection failed to retrieve topology and keyspaces information', err);
        this._triedHosts[this.connection.endpoint] = err;
      }
    }

    // The healthy connection used to initialize should be part of the Host pool
    this.host.pool.addExistingConnection(this.connection);

    this.initialized = true;
    this._setHealthListeners(this.host, this.connection);
    this.log('info', `ControlConnection connected to ${this.connection.endpointFriendlyName}`);
  }

  /**
   * Sends an OPTIONS request on the current connection and marks the metadata product type as DBaaS when the
   * supported options say so.
   * @returns {Promise<void>}
   */
  async _getSupportedOptions() {
    const response = await this.connection.send(requests.options, null);

    // response.supported is a string multi map, decoded as an Object.
    const productType = response.supported && response.supported[supportedProductTypeKey];
    if (Array.isArray(productType) && productType[0] === supportedDbaas) {
      this.metadata.setProductTypeAsDbaas();
    }
  }

  /**
   * Adds the event handlers to the current connection and sends the REGISTER request for topology, status and
   * schema change events.
   * @returns {Promise<void>}
   */
  async _registerToConnectionEvents() {
    this.connection.on('nodeTopologyChange', this._nodeTopologyChangeHandler.bind(this));
    this.connection.on('nodeStatusChange', this._nodeStatusChangeHandler.bind(this));
    this.connection.on('nodeSchemaChange', this._nodeSchemaChangeHandler.bind(this));
    const request = new requests.RegisterRequest(['TOPOLOGY_CHANGE', 'STATUS_CHANGE', 'SCHEMA_CHANGE']);
    await this.connection.send(request, null);
  }

  /**
   * Handles a TOPOLOGY_CHANGE event
   */
  _nodeTopologyChangeHandler(event) {
    this.log('info', 'Received topology change', event);

    // all hosts information needs to be refreshed as tokens might have changed
    clearTimeout(this._topologyChangeTimeout);

    // Use an additional timer to make sure that the refresh hosts is executed only AFTER the delay
    // In this case, the event debouncer doesn't help because it could not honor the sliding delay (ie: processNow)
    this._topologyChangeTimeout = setTimeout(() =>
      promiseUtils.toBackground(this._scheduleRefreshHosts()), newNodeDelay);
  }

  /**
   * Handles a STATUS_CHANGE event
   */
  _nodeStatusChangeHandler(event) {
    const self = this;
    const addressToTranslate = event.inet.address.toString();
    const port = this.options.protocolOptions.port;
    this._addressTranslator.translate(addressToTranslate, port, function translateCallback(endPoint) {
      const host = self.hosts.get(endPoint);
      if (!host) {
        self.log('warning', `Received status change event but host was not found: ${addressToTranslate}`);
        return;
      }
      const distance = self._profileManager.getDistance(host);
      if (event.up) {
        if (distance === types.distance.ignored) {
          return host.setUp(true);
        }
        clearTimeout(self._nodeStatusChangeTimeout);
        // Waits a couple of seconds before marking it as UP
        self._nodeStatusChangeTimeout = setTimeout(() => host.checkIsUp(), newNodeDelay);
        return;
      }
      // marked as down
      if (distance === types.distance.ignored) {
        return host.setDown();
      }
      self.log('warning', `Received status change to DOWN for host ${host.address}`);
    });
  }

  /**
   * Handles a SCHEMA_CHANGE event
   */
  _nodeSchemaChangeHandler(event) {
    this.log('info', 'Schema change', event);
    if (!this.options.isMetadataSyncEnabled) {
      return;
    }

    promiseUtils.toBackground(this.handleSchemaChange(event, false));
  }

  /**
   * Schedules metadata refresh and callbacks when is refreshed.
   * @param {{keyspace: string, isKeyspace: boolean, schemaChangeType, table, udt, functionName, aggregate}} event
   * @param {Boolean} processNow
   * @returns {Promise<void>}
   */
  handleSchemaChange(event, processNow) {
    const self = this;
    let handler, cqlObject;

    if (event.isKeyspace) {
      if (event.schemaChangeType === schemaChangeTypes.dropped) {
        handler = function removeKeyspace() {
          // if on the same event queue there is a creation, this handler is not going to be executed
          // it is safe to remove the keyspace metadata
          delete self.metadata.keyspaces[event.keyspace];
        };

        return this._scheduleObjectRefresh(handler, event.keyspace, null, processNow);
      }

      return this._scheduleKeyspaceRefresh(event.keyspace, processNow);
    }

    const ksInfo = this.metadata.keyspaces[event.keyspace];
    if (!ksInfo) {
      // it hasn't been loaded and it is not part of the metadata, don't mind
      return Promise.resolve();
    }

    if (event.table) {
      cqlObject = event.table;
      handler = function clearTableState() {
        delete ksInfo.tables[event.table];
        delete ksInfo.views[event.table];
      };
    }
    else if (event.udt) {
      cqlObject = event.udt;
      handler = function clearUdtState() {
        delete ksInfo.udts[event.udt];
      };
    }
    else if (event.functionName) {
      cqlObject = event.functionName;
      handler = function clearFunctionState() {
        delete ksInfo.functions[event.functionName];
      };
    }
    else if (event.aggregate) {
      cqlObject = event.aggregate;
      handler = function clearKeyspaceState() {
        delete ksInfo.aggregates[event.aggregate];
      };
    }

    if (!handler) {
      // Forward compatibility
      return Promise.resolve();
    }

    // It's a cql object change clean the internal cache
    return this._scheduleObjectRefresh(handler, event.keyspace, cqlObject, processNow);
  }

  /**
   * @param {Function} handler
   * @param {String} keyspace
   * @param {String} cqlObject
   * @param {Boolean} processNow
   * @returns {Promise<void>}
   */
  _scheduleObjectRefresh(handler, keyspace, cqlObject, processNow) {
    return this._debouncer.eventReceived({ handler, keyspace, cqlObject }, processNow);
  }

  /**
   * @param {String} keyspace
   * @param {Boolean} processNow
   * @returns {Promise<void>}
   */
  _scheduleKeyspaceRefresh(keyspace, processNow) {
    return this._debouncer.eventReceived({
      handler: () => this.metadata.refreshKeyspace(keyspace),
      keyspace
    }, processNow);
  }

  /** @returns {Promise<void>} */
  _scheduleRefreshHosts() {
    return this._debouncer.eventReceived({
      handler: () => this._refreshHosts(false, false),
      all: true
    }, false);
  }

  /**
   * Sets the information for the host used by the control connection.
   * @param {Boolean} initializing
   * @param {Boolean} setCurrentHost Determines if the host retrieved must be set as the current host
   * @param {Connection} c
   * @param result
   */
  _setLocalInfo(initializing, setCurrentHost, c, result) {
    if (!result || !result.rows || !result.rows.length) {
      this.log('warning', 'No local info could be obtained');
      return;
    }

    const row = result.rows[0];

    let localHost;

    // Note that with SNI enabled, we can trust that rpc_address will contain a valid value.
    const endpoint = !this.options.sni
      ? c.endpoint
      : `${row['rpc_address']}:${this.options.protocolOptions.port}`;

    if (initializing) {
      localHost = new Host(endpoint, this.protocolVersion, this.options, this.metadata);
      this.hosts.set(endpoint, localHost);
      this.log('info', `Adding host ${endpoint}`);
    } else {
      localHost = this.hosts.get(endpoint);

      if (!localHost) {
        this.log('error', 'Localhost could not be found');
        return;
      }
    }

    localHost.datacenter = row['data_center'];
    localHost.rack = row['rack'];
    localHost.tokens = row['tokens'];
    localHost.hostId = row['host_id'];
    localHost.cassandraVersion = row['release_version'];
    setDseParameters(localHost, row);
    this.metadata.setPartitioner(row['partitioner']);
    this.log('info', 'Local info retrieved');

    if (setCurrentHost) {
      // Set the host as the one being used by the ControlConnection.
      this.host = localHost;
    }
  }

  /**
   * Adds or updates the hosts from the peers result and removes the hosts that are no longer listed.
   * @param {Boolean} initializing Determines whether this function was called in order to initialize the control
   * connection the first time.
   * @param {ResultSet} result
   * @returns {Promise<void>}
   */
  async setPeersInfo(initializing, result) {
    if (!result || !result.rows) {
      return;
    }

    // A map of peers, could useful for in case there are discrepancies
    const peers = {};
    const port = this.options.protocolOptions.port;
    const foundDataCenters = new Set();

    if (this.host && this.host.datacenter) {
      foundDataCenters.add(this.host.datacenter);
    }

    for (const row of result.rows) {
      const endpoint = await this.getAddressForPeerHost(row, port);

      if (!endpoint) {
        continue;
      }

      peers[endpoint] = true;
      let host = this.hosts.get(endpoint);
      const isNewHost = !host;

      if (isNewHost) {
        host = new Host(endpoint, this.protocolVersion, this.options, this.metadata);
        this.log('info', `Adding host ${endpoint}`);
      }

      host.datacenter = row['data_center'];
      host.rack = row['rack'];
      host.tokens = row['tokens'];
      host.hostId = row['host_id'];
      host.cassandraVersion = row['release_version'];
      setDseParameters(host, row);

      if (host.datacenter) {
        foundDataCenters.add(host.datacenter);
      }

      if (isNewHost) {
        // Add it to the map (and trigger events) after all the properties
        // were set to avoid race conditions
        this.hosts.set(endpoint, host);

        if (!initializing) {
          // Set the distance at Host level, that way the connection pool is created with the correct settings
          this._profileManager.getDistance(host);

          // When we are not initializing, we start with the node set as DOWN
          host.setDown();
        }
      }
    }

    // Is there a difference in number between peers + local != hosts
    if (this.hosts.length > result.rows.length + 1) {
      // There are hosts in the current state that don't belong (nodes removed or wrong contactPoints)
      this.log('info', 'Removing nodes from the pool');
      const toRemove = [];

      this.hosts.forEach(h => {
        //It is not a peer and it is not local host
        if (!peers[h.address] && h !== this.host) {
          this.log('info', 'Removing host ' + h.address);
          toRemove.push(h.address);
          h.shutdown(true);
        }
      });

      this.hosts.removeMultiple(toRemove);
    }

    if (initializing && this.options.localDataCenter) {
      const localDc = this.options.localDataCenter;

      if (!foundDataCenters.has(localDc)) {
        throw new errors.ArgumentError(`localDataCenter was configured as '${
          localDc}', but only found hosts in data centers: [${Array.from(foundDataCenters).join(', ')}]`);
      }
    }

    this.log('info', 'Peers info retrieved');
  }

  /**
   * Gets the address from a peers row and translates the address.
   * <p>The promise is resolved with null when the row does not contain a rpc_address.</p>
   * @param {Object|Row} row
   * @param {Number} defaultPort
   * @returns {Promise<string>}
   */
  getAddressForPeerHost(row, defaultPort) {
    // Adapts the callback-based address translator to a promise
    return new Promise(resolve => {
      let address = row['rpc_address'];
      const peer = row['peer'];
      const bindAllAddress = '0.0.0.0';

      if (!address) {
        this.log('error', f('No rpc_address found for host %s in %s\'s peers system table. %s will be ignored.',
          peer, this.host.address, peer));
        return resolve(null);
      }

      if (address.toString() === bindAllAddress) {
        this.log('warning', f('Found host with 0.0.0.0 as rpc_address, using listen_address (%s) to contact it instead.' +
          ' If this is incorrect you should avoid the use of 0.0.0.0 server side.', peer));
        address = peer;
      }

      this._addressTranslator.translate(address.toString(), defaultPort, resolve);
    });
  }

  /**
   * Uses the DNS protocol to resolve a IPv4 and IPv6 addresses (A and AAAA records) for the hostname.
   * It returns an Array of addresses that can be empty and logs the error.
   * @private
   * @param name
   * @returns {Promise<Array<{address: string, isIPv6: boolean}>>}
   */
  async _resolveAll(name) {
    const addresses = [];
    const resolve4 = util.promisify(dns.resolve4);
    const resolve6 = util.promisify(dns.resolve6);
    const lookup = util.promisify(dns.lookup);

    // Ignore errors for resolve calls
    const ipv4Promise = resolve4(name).catch(() => {}).then(r => r || utils.emptyArray);
    const ipv6Promise = resolve6(name).catch(() => {}).then(r => r || utils.emptyArray);

    let arr;
    arr = await ipv4Promise;
    arr.forEach(address => addresses.push({address, isIPv6: false}));

    arr = await ipv6Promise;
    arr.forEach(address => addresses.push({address, isIPv6: true}));

    if (addresses.length === 0) {
      // In case dns.resolve*() methods don't yield a valid address for the host name
      // Use system call getaddrinfo() that might resolve according to host system definitions
      try {
        arr = await lookup(name, { all: true });
        arr.forEach(({address, family}) => addresses.push({address, isIPv6: family === 6}));
      } catch (err) {
        this.log('error', `Host with name ${name} could not be resolved`, err);
      }
    }

    return addresses;
  }

  /**
   * Waits for a connection to be available. If timeout expires before getting a connection it callbacks in error.
   * @returns {Promise<void>}
   */
  _waitForReconnection() {
    return new Promise((resolve, reject) => {
      const callback = promiseUtils.getCallback(resolve, reject);

      // eslint-disable-next-line prefer-const
      let timeout;

      function newConnectionListener(err) {
        clearTimeout(timeout);
        callback(err);
      }

      this.once('newConnection', newConnectionListener);

      timeout = setTimeout(() => {
        this.removeListener('newConnection', newConnectionListener);
        callback(new errors.OperationTimedOutError('A connection could not be acquired before timeout.'));
      }, metadataQueryAbortTimeout);
    });
  }

  /**
   * Executes a query using the active connection
   * @param {String|Request} cqlQuery
   * @param {Boolean} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not
   * connected at the moment. Default: true.
   */
  async query(cqlQuery, waitReconnect = true) {
    const queryOnConnection = async () => {
      if (!this.connection || this._isShuttingDown) {
        throw new errors.NoHostAvailableError({}, 'ControlConnection is not connected at the time');
      }

      const request = typeof cqlQuery === 'string' ? new requests.QueryRequest(cqlQuery, null, null) : cqlQuery;
      return await this.connection.send(request, null);
    };

    if (!this.connection && waitReconnect) {
      // Wait until its reconnected (or timer elapses)
      await this._waitForReconnection();
    }

    return await queryOnConnection();
  }

  /** @returns {Encoder} The encoder used by the current connection */
  getEncoder() {
    if (!this._encoder) {
      throw new errors.DriverInternalError('Encoder is not defined');
    }
    return this._encoder;
  }

  /**
   * Cancels all timers and shuts down synchronously.
   */
  shutdown() {
    this._isShuttingDown = true;
    this._debouncer.shutdown();
    // Emit a "newConnection" event with Error, as it may clear timeouts that were waiting new connections
    this.emit('newConnection', new errors.DriverError('ControlConnection is being shutdown'));
    // Cancel timers
    clearTimeout(this._topologyChangeTimeout);
    clearTimeout(this._nodeStatusChangeTimeout);
  }

  /**
   * Resets the Connection to its initial state.
   * <p>The returned promise is rejected when shutting down any of the hosts fails; the internal flags are
   * restored in that case as well.</p>
   * @returns {Promise<void>}
   */
  async reset() {
    // Reset the internal state of the ControlConnection for future initialization attempts
    const currentHosts = this.hosts.clear();

    // Set the shutting down flag temporarily to avoid reconnects.
    this._isShuttingDown = true;

    try {
      // Shutdown all the hosts that were part of the previous state
      await Promise.all(currentHosts.map(h => h.shutdown()));
    } finally {
      // Always restore the flags, otherwise a failed shutdown would leave the instance
      // in "shutting down" mode for good, preventing future reconnections
      this.initialized = false;
      this._isShuttingDown = false;
    }
  }

  /**
   * Gets a Map containing the original contact points and the addresses that each one resolved to.
   */
  getResolvedContactPoints() {
    return this._resolvedContactPoints;
  }

  /**
   * Gets the local IP address to which the control connection socket is bound to.
   * @returns {String|undefined}
   */
  getLocalAddress() {
    if (!this.connection) {
      return undefined;
    }

    return this.connection.getLocalAddress();
  }

  /**
   * Gets the address and port of host the control connection is connected to.
   * @returns {String|undefined}
   */
  getEndpoint() {
    if (!this.connection) {
      return undefined;
    }

    return this.connection.endpoint;
  }
}
1050
/**
 * Parses the DSE workload and assigns it to a host.
 * <p>The <code>workloads</code> column is used as-is when it is defined; otherwise the single
 * <code>workload</code> column is wrapped in an Array; otherwise the shared empty Array is used.
 * The DSE version is only assigned when the <code>dse_version</code> column is defined.</p>
 * @param {Host} host
 * @param {Row} row
 * @private
 */
function setDseParameters(host, row) {
  if (row['workloads'] !== undefined) {
    host.workloads = row['workloads'];
  }
  else if (row['workload']) {
    host.workloads = [ row['workload'] ];
  }
  else {
    host.workloads = utils.emptyArray;
  }

  if (row['dse_version'] !== undefined) {
    host.dseVersion = row['dse_version'];
  }
}
1072
1073module.exports = ControlConnection;