1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 | 'use strict';
|
17 | const events = require('events');
|
18 | const util = require('util');
|
19 | const net = require('net');
|
20 | const dns = require('dns');
|
21 |
|
22 | const errors = require('./errors');
|
23 | const { Host, HostMap } = require('./host');
|
24 | const Metadata = require('./metadata');
|
25 | const EventDebouncer = require('./metadata/event-debouncer');
|
26 | const Connection = require('./connection');
|
27 | const requests = require('./requests');
|
28 | const utils = require('./utils');
|
29 | const types = require('./types');
|
30 | const promiseUtils = require('./promise-utils');
|
31 | const f = util.format;
|
32 |
|
/** CQL query used to retrieve the information of the other nodes of the cluster. */
const selectPeers = 'SELECT * FROM system.peers';

/** CQL query used to retrieve the information of the node the control connection is connected to. */
const selectLocal = 'SELECT * FROM system.local WHERE key=\'local\'';

/**
 * Delay, in milliseconds, applied before reacting to a topology change event or to a node "up" status event
 * (see the timers created in the event handlers of ControlConnection).
 */
const newNodeDelay = 1000;

/** Maximum time, in milliseconds, that a query waits for the control connection to be re-established. */
const metadataQueryAbortTimeout = 2000;

/** Values of the schema change type carried by schema change events. */
const schemaChangeTypes = {
  created: 'CREATED',
  updated: 'UPDATED',
  dropped: 'DROPPED'
};

/** Key looked up in the supported options returned by the server to detect the product type. */
const supportedProductTypeKey = 'PRODUCT_TYPE';

/** Product type value for which the metadata is flagged as DBaaS. */
const supportedDbaas = 'DATASTAX_APOLLO';
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
/**
 * Maintains a single connection to one host of the cluster that is used to retrieve the topology
 * (system.local and system.peers), to keep the metadata up to date and to listen to server events
 * (topology, status and schema changes).
 *
 * Emits 'newConnection' with (err) when a reconnection attempt fails or the instance is being shut down,
 * and with (null, connection, host) once a new connection is established and the hosts were refreshed.
 */
class ControlConnection extends events.EventEmitter {

  /**
   * Creates a new instance of ControlConnection.
   * @param {Object} options The client options. Read: policies.addressResolution, policies.reconnection,
   * refreshSchemaDelay, contactPoints, sni, protocolOptions.port, isMetadataSyncEnabled and localDataCenter.
   * @param {Object} profileManager Used to obtain the distance of a host and the default load-balancing policy.
   * @param {{borrowHostConnection: Function, createConnection: Function}} [context] Optional object containing
   * replacements for the methods used to borrow and create connections (dependency injection).
   */
  constructor(options, profileManager, context) {
    super();

    this.protocolVersion = null;
    this.hosts = new HostMap();
    // A value of zero removes the limit on the number of listeners per event
    this.setMaxListeners(0);
    this.log = utils.log;
    Object.defineProperty(this, 'options', { value: options, enumerable: false, writable: false });

    // The metadata instance keeps a reference to this ControlConnection
    this.metadata = new Metadata(this.options, this);
    this.initialized = false;

    // Host and connection currently used by the ControlConnection, null while disconnected
    this.host = null;
    this.connection = null;

    this._addressTranslator = this.options.policies.addressResolution;
    this._reconnectionPolicy = this.options.policies.reconnection;
    this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
    this._isShuttingDown = false;

    // Reference to the encoder of the last valid connection
    this._encoder = null;
    this._debouncer = new EventDebouncer(options.refreshSchemaDelay, this.log.bind(this));
    this._profileManager = profileManager;
    // Map-like object of endpoint -> error for the hosts tried in the current connection attempt
    this._triedHosts = null;
    // Contact point name (as provided by the user) -> Array of resolved "address:port" endpoints
    this._resolvedContactPoints = new Map();
    // Set of "address:port" endpoints used to open the first connection
    this._contactPoints = new Set();

    // Timers used to delay the reaction to topology / status events
    this._topologyChangeTimeout = null;
    this._nodeStatusChangeTimeout = null;

    if (context && context.borrowHostConnection) {
      this._borrowHostConnection = context.borrowHostConnection;
    }

    if (context && context.createConnection) {
      this._createConnection = context.createConnection;
    }
  }

  /**
   * Stores a resolved address both as a contact point and in the map of resolved contact points by name.
   * @param {String|null} address The resolved IP address, or null when the name could not be resolved.
   * @param {String|null} port The port, when not parseable the default protocolOptions.port is used.
   * @param {String} name The contact point as originally provided.
   * @param {Boolean} isIPv6 Determines whether the address is an IPv6 address.
   * @private
   */
  _addContactPoint(address, port, name, isIPv6) {
    if (address === null) {
      // Unresolved names are tracked with an empty array of addresses
      this._resolvedContactPoints.set(name, utils.emptyArray);
      return;
    }

    const portNumber = parseInt(port, 10) || this.options.protocolOptions.port;
    const endpoint = `${address}:${portNumber}`;
    this._contactPoints.add(endpoint);

    // The endpoint exposed in the resolved contact points uses brackets for IPv6 addresses
    const standardEndpoint = !isIPv6 ? endpoint : `[${address}]:${portNumber}`;

    let resolvedAddressedByName = this._resolvedContactPoints.get(name);

    // utils.emptyArray is a shared instance that must not be mutated: replace it with a fresh array
    if (resolvedAddressedByName === undefined || resolvedAddressedByName === utils.emptyArray) {
      resolvedAddressedByName = [];
      this._resolvedContactPoints.set(name, resolvedAddressedByName);
    }

    resolvedAddressedByName.push(standardEndpoint);
  }

  /**
   * Parses a contact point in the form "name", "name:port", "ipv4:port", "ipv6" or "[ipv6]:port", resolving
   * the name when it is not an IP address, and stores the resulting endpoints.
   * @param {String} name The contact point as provided by the user.
   * @returns {Promise<void>}
   * @private
   */
  async _parseContactPoint(name) {
    let addressOrName = name;
    let port = null;

    if (name.indexOf('[') === 0 && name.indexOf(']:') > 1) {
      // IPv6 address with port notation: [address]:port
      const index = name.lastIndexOf(']:');
      addressOrName = name.slice(1, index);
      port = name.slice(index + 2);
    } else if (name.indexOf(':') > 0) {
      // Only a single colon denotes "host:port", multiple colons denote a bare IPv6 address
      const parts = name.split(':');
      if (parts.length === 2) {
        addressOrName = parts[0];
        port = parts[1];
      }
    }

    if (net.isIP(addressOrName)) {
      this._addContactPoint(addressOrName, port, name, net.isIPv6(addressOrName));
      return;
    }

    const addresses = await this._resolveAll(addressOrName);
    if (addresses.length > 0) {
      addresses.forEach(addressInfo => this._addContactPoint(addressInfo.address, port, name, addressInfo.isIPv6));
    } else {
      // Track the contact point as unresolved
      this._addContactPoint(null, null, name, false);
    }
  }

  /**
   * Resolves the contact points (or the SNI endpoint), obtains the first connection and retrieves the
   * information of the cluster. It does nothing when the instance is already initialized.
   * @returns {Promise<void>}
   * @throws {DriverInternalError} When the SNI endpoint address does not contain a port.
   * @throws {NoHostAvailableError} When no contact point could be resolved or contacted.
   */
  async init() {
    if (this.initialized) {
      return;
    }

    if (!this.options.sni) {
      // Parse and resolve all the contact points in parallel
      await Promise.all(this.options.contactPoints.map(name => this._parseContactPoint(name)));
    } else {
      this.options.contactPoints.forEach(cp => this._contactPoints.add(cp));
      const address = this.options.sni.address;
      const separatorIndex = address.lastIndexOf(':');

      if (separatorIndex === -1) {
        throw new errors.DriverInternalError('The SNI endpoint address should contain ip/name and port');
      }

      const nameOrIp = address.slice(0, separatorIndex);
      this.options.sni.port = address.slice(separatorIndex + 1);
      this.options.sni.addressResolver = new utils.AddressResolver({ nameOrIp, dns });
      await this.options.sni.addressResolver.init();
    }

    if (this._contactPoints.size === 0) {
      throw new errors.NoHostAvailableError({}, 'No host could be resolved');
    }

    await this._initializeConnection();
  }

  /**
   * Subscribes (once) to the events that signal that the current host or connection can not longer be used,
   * triggering a refresh of the control connection in the background.
   * @param {Host} host The host used by the control connection.
   * @param {Connection} connection The connection used by the control connection.
   * @private
   */
  _setHealthListeners(host, connection) {
    // Several events can be emitted for the same failure, only the first one starts the reconnection
    let wasRefreshCalled = false;

    const removeListeners = () => {
      host.removeListener('down', downOrIgnoredHandler);
      host.removeListener('ignore', downOrIgnoredHandler);
      connection.removeListener('socketClose', socketClosedHandler);
    };

    const startReconnecting = (hostDown) => {
      if (wasRefreshCalled) {
        return;
      }

      wasRefreshCalled = true;
      removeListeners();

      if (this._isShuttingDown) {
        // Don't attempt to reconnect when the instance is being shut down
        return;
      }

      if (hostDown) {
        this.log('warning',
          `Host ${host.address} used by the ControlConnection DOWN, ` +
          `connection to ${connection.endpointFriendlyName} will not longer be used`);
      } else {
        this.log('warning', `Connection to ${connection.endpointFriendlyName} used by the ControlConnection was closed`);
      }

      promiseUtils.toBackground(this._refresh());
    };

    const downOrIgnoredHandler = () => {
      startReconnecting(true);
    };

    const socketClosedHandler = () => {
      startReconnecting(false);
    };

    host.once('down', downOrIgnoredHandler);
    host.once('ignore', downOrIgnoredHandler);
    connection.once('socketClose', socketClosedHandler);
  }

  /**
   * Iterates through the hosts and returns the first connection that could be borrowed from a host that is
   * up and not ignored. Errors are recorded in the tried hosts.
   * @param {Iterator<Host>} hostIterator
   * @returns {Connection}
   * @throws {NoHostAvailableError} When the iterator is exhausted.
   * @private
   */
  _borrowAConnection(hostIterator) {
    let connection = null;

    while (!connection) {
      const item = hostIterator.next();
      const host = item.value;

      if (item.done) {
        throw new errors.NoHostAvailableError(this._triedHosts);
      }

      // Obtaining the distance is performed before checking the status of the host
      const distance = this._profileManager.getDistance(host);
      if (!host.isUp() || distance === types.distance.ignored) {
        continue;
      }

      try {
        connection = this._borrowHostConnection(host);
      } catch (err) {
        this._triedHosts[host.address] = err;
      }
    }

    return connection;
  }

  /**
   * Iterates through the contact points and creates a connection to the first one that can be contacted,
   * setting the protocol version, the encoder and the connection of this instance.
   * @param {Iterator<String>} contactPointsIterator
   * @returns {Promise<void>}
   * @throws {NoHostAvailableError} When the iterator is exhausted.
   * @private
   */
  async _borrowFirstConnection(contactPointsIterator) {
    let connection = null;

    // The loop only ends with a valid connection or by throwing once the contact points are exhausted
    while (!connection) {
      const item = contactPointsIterator.next();
      const contactPoint = item.value;

      if (item.done) {
        throw new errors.NoHostAvailableError(this._triedHosts);
      }

      try {
        connection = await this._createConnection(contactPoint);
      } catch (err) {
        this._triedHosts[contactPoint] = err;
      }
    }

    this.protocolVersion = connection.protocolVersion;
    this._encoder = connection.encoder;
    this.connection = connection;
  }

  /**
   * Borrows a connection from the pool of the host. It can be replaced via the constructor context.
   * @param {Host} host
   * @returns {Connection}
   * @private
   */
  _borrowHostConnection(host) {
    return host.borrowConnection();
  }

  /**
   * Creates and opens a connection to the endpoint. It can be replaced via the constructor context.
   * @param {String} contactPoint
   * @returns {Promise<Connection>}
   * @private
   */
  async _createConnection(contactPoint) {
    const c = new Connection(contactPoint, null, this.options);

    try {
      await c.openAsync();
    } catch (err) {
      // Release the resources of the failed connection without waiting for it
      promiseUtils.toBackground(c.closeAsync());
      throw err;
    }

    return c;
  }

  /**
   * Retrieves the local and peers information using the current connection and updates the hosts.
   * The first time (while not initialized) it also settles the protocol version to the highest common version,
   * reopening the connection when it changed, and builds the tokens and keyspaces metadata.
   * @param {Boolean} initializing Determines whether this method was called as part of the initialization.
   * @param {Boolean} isReconnecting Determines whether the current host must be set and whether the lack of a
   * connection must be treated as an error.
   * @returns {Promise<void>}
   * @private
   */
  async _refreshHosts(initializing, isReconnecting) {
    // Keep a reference: this.connection may change while awaiting
    const c = this.connection;

    if (!c) {
      if (isReconnecting) {
        throw new errors.DriverInternalError('Connection reference has been lost when reconnecting');
      }

      // Not connected and not reconnecting: there is nothing to refresh with
      return;
    }

    this.log('info', 'Refreshing local and peers info');

    const rsLocal = await c.send(new requests.QueryRequest(selectLocal), null);
    this._setLocalInfo(initializing, isReconnecting, c, rsLocal);

    if (!this.host) {
      throw new errors.DriverInternalError('Information from system.local could not be retrieved');
    }

    const rsPeers = await c.send(new requests.QueryRequest(selectPeers), null);
    await this.setPeersInfo(initializing, rsPeers);

    if (!this.initialized) {
      // Settle the protocol version according to the versions of the hosts
      const highestCommon = types.protocolVersion.getHighestCommon(c, this.hosts);
      const reconnect = highestCommon !== this.protocolVersion;

      this.protocolVersion = highestCommon;
      this.hosts.forEach(h => h.setProtocolVersion(this.protocolVersion));

      if (reconnect) {
        // The connection must be reopened using the settled protocol version
        this.log('info', `Reconnecting since the protocol version changed to 0x${highestCommon.toString(16)}`);
        c.decreaseVersion(this.protocolVersion);
        await c.closeAsync();

        try {
          await c.openAsync();
        } catch (err) {
          // Release the resources of the failed connection without waiting for it
          promiseUtils.toBackground(c.closeAsync());

          throw err;
        }
      }

      this.metadata.setCassandraVersion(this.host.getCassandraVersion());
      this.metadata.buildTokens(this.hosts);

      if (!this.options.isMetadataSyncEnabled) {
        this.metadata.initialized = true;
        return;
      }

      await this.metadata.refreshKeyspacesInternal(false);
      this.metadata.initialized = true;
    }
  }

  /**
   * Acquires a connection for the ControlConnection from the existing hosts. When not using SNI and no
   * existing connection can be borrowed, the contact points are resolved again and the connection is
   * initialized from scratch.
   * @param {Iterator<Host>} hostIterator
   * @returns {Promise<void>}
   * @private
   */
  async _refreshControlConnection(hostIterator) {
    if (this.options.sni) {
      this.connection = this._borrowAConnection(hostIterator);
      return;
    }

    try {
      this.connection = this._borrowAConnection(hostIterator);
    } catch (err) {
      // No existing connection could be used: the addresses behind the contact points may have changed
      this.log('info', 'ControlConnection could not reconnect using existing connections. Refreshing contact points and retrying');
      this._contactPoints.clear();
      this._resolvedContactPoints.clear();
      await Promise.all(this.options.contactPoints.map(name => this._parseContactPoint(name)));
      const refreshedContactPoints = Array.from(this._contactPoints).join(',');
      this.log('info', `Refreshed contact points: ${refreshedContactPoints}`);
      await this._initializeConnection();
    }
  }

  /**
   * Acquires a new connection and refreshes the topology information using it.
   * When no connection can be acquired, it schedules a new attempt according to the reconnection schedule
   * and emits 'newConnection' with the error. When the information can not be retrieved with the acquired
   * connection, it continues with the next host of the iterator.
   * @param {Iterator<Host>} [hostIterator] When not provided, a new query plan is obtained from the default
   * load-balancing policy.
   * @returns {Promise<void>}
   * @private
   */
  async _refresh(hostIterator) {
    if (this._isShuttingDown) {
      this.log('info', 'The ControlConnection will not be refreshed as the Client is being shutdown');
      return;
    }

    // Reset the state: there is no host or connection in use while refreshing
    this.host = null;
    this.connection = null;

    try {
      if (!hostIterator) {
        this.log('info', 'Trying to acquire a connection to a new host');
        this._triedHosts = {};
        hostIterator = await promiseUtils.newQueryPlan(this._profileManager.getDefaultLoadBalancing(), null, null);
      }

      await this._refreshControlConnection(hostIterator);
    } catch (err) {
      // There was a failure obtaining a connection or the query plan
      this.log('error', 'ControlConnection failed to acquire a connection', err);

      if (!this._isShuttingDown) {
        const delay = this._reconnectionSchedule.next().value;
        this.log('warning', `ControlConnection could not reconnect, scheduling reconnection in ${delay}ms`);
        // Run in the background so a rejection of the retry never becomes an unhandled rejection
        setTimeout(() => promiseUtils.toBackground(this._refresh()), delay);
        this.emit('newConnection', err);
      }

      return;
    }

    this.log('info',`ControlConnection connected to ${this.connection.endpointFriendlyName}`);

    try {
      await this._refreshHosts(false, true);

      await this._registerToConnectionEvents();
    } catch (err) {
      this.log('error', 'ControlConnection failed to retrieve topology and keyspaces information', err);
      this._triedHosts[this.connection.endpoint] = err;

      if (err.isSocketError && this.host) {
        this.host.removeFromPool(this.connection);
      }

      // Retry with the next host of the same iterator
      return await this._refresh(hostIterator);
    }

    // A connection was established: start a new reconnection schedule for the next failure
    this._reconnectionSchedule = this._reconnectionPolicy.newSchedule();
    this._setHealthListeners(this.host, this.connection);
    this.emit('newConnection', null, this.connection, this.host);

    this.log('info', `ControlConnection connected to ${this.connection.endpointFriendlyName} and up to date`);
  }

  /**
   * Acquires the first connection using the contact points (in random order) and retrieves the topology
   * and metadata with it, trying the next contact point when the retrieval fails.
   * @returns {Promise<void>}
   * @throws {NoHostAvailableError} When all the contact points were tried without success.
   * @private
   */
  async _initializeConnection() {
    this.log('info', 'Getting first connection');

    // Reset the state
    this.host = null;
    this.connection = null;
    this._triedHosts = {};

    // Randomize the order of the contact points
    const contactPointsIterator = utils.shuffleArray(Array.from(this._contactPoints))[Symbol.iterator]();

    // The loop ends when _borrowFirstConnection() throws (contact points exhausted) or the retrieval succeeds
    while (true) {
      await this._borrowFirstConnection(contactPointsIterator);

      const version = this.protocolVersion.toString(16);
      this.log('info',
        `ControlConnection using protocol version 0x${version}, connected to ${this.connection.endpointFriendlyName}`);

      try {
        await this._getSupportedOptions();
        await this._refreshHosts(true, true);
        await this._registerToConnectionEvents();

        // Topology and metadata were retrieved successfully
        break;

      } catch (err) {
        // NOTE(review): the failed connection is not closed before trying the next contact point - confirm
        // whether it should be released here
        this.log('error', 'ControlConnection failed to retrieve topology and keyspaces information', err);
        this._triedHosts[this.connection.endpoint] = err;
      }
    }

    // The connection was created outside of the pool of the host: hand it over
    this.host.pool.addExistingConnection(this.connection);

    this.initialized = true;
    this._setHealthListeners(this.host, this.connection);
    this.log('info', `ControlConnection connected to ${this.connection.endpointFriendlyName}`);
  }

  /**
   * Sends an OPTIONS request and flags the metadata as DBaaS when the product type reported by the
   * server matches.
   * @returns {Promise<void>}
   * @private
   */
  async _getSupportedOptions() {
    const response = await this.connection.send(requests.options, null);

    // The product type is expected to be a list of values, only the first one is considered
    const productType = response.supported && response.supported[supportedProductTypeKey];
    if (Array.isArray(productType) && productType[0] === supportedDbaas) {
      this.metadata.setProductTypeAsDbaas();
    }
  }

  /**
   * Subscribes to the events emitted by the current connection and sends the REGISTER request for
   * topology, status and schema changes.
   * @returns {Promise<void>}
   * @private
   */
  async _registerToConnectionEvents() {
    this.connection.on('nodeTopologyChange', this._nodeTopologyChangeHandler.bind(this));
    this.connection.on('nodeStatusChange', this._nodeStatusChangeHandler.bind(this));
    this.connection.on('nodeSchemaChange', this._nodeSchemaChangeHandler.bind(this));
    const request = new requests.RegisterRequest(['TOPOLOGY_CHANGE', 'STATUS_CHANGE', 'SCHEMA_CHANGE']);
    await this.connection.send(request, null);
  }

  /**
   * Handles a TOPOLOGY_CHANGE event by scheduling a refresh of the hosts after a delay.
   * @param {Object} event
   * @private
   */
  _nodeTopologyChangeHandler(event) {
    this.log('info', 'Received topology change', event);

    // Only the last event received within the delay triggers the refresh
    clearTimeout(this._topologyChangeTimeout);

    this._topologyChangeTimeout = setTimeout(() =>
      promiseUtils.toBackground(this._scheduleRefreshHosts()), newNodeDelay);
  }

  /**
   * Handles a STATUS_CHANGE event: translates the address of the event and updates or checks the status of
   * the matching host.
   * @param {{inet: {address: Object}, up: Boolean}} event
   * @private
   */
  _nodeStatusChangeHandler(event) {
    const addressToTranslate = event.inet.address.toString();
    const port = this.options.protocolOptions.port;

    this._addressTranslator.translate(addressToTranslate, port, (endPoint) => {
      const host = this.hosts.get(endPoint);
      if (!host) {
        this.log('warning', `Received status change event but host was not found: ${addressToTranslate}`);
        return;
      }

      const distance = this._profileManager.getDistance(host);
      if (event.up) {
        if (distance === types.distance.ignored) {
          // Ignored hosts are directly marked as up
          return host.setUp(true);
        }

        clearTimeout(this._nodeStatusChangeTimeout);

        // Delay the check of the host status
        this._nodeStatusChangeTimeout = setTimeout(() => host.checkIsUp(), newNodeDelay);
        return;
      }

      if (distance === types.distance.ignored) {
        // Ignored hosts are directly marked as down
        return host.setDown();
      }

      // For the rest of the hosts the event is only logged
      this.log('warning', `Received status change to DOWN for host ${host.address}`);
    });
  }

  /**
   * Handles a SCHEMA_CHANGE event in the background, only when the metadata synchronization is enabled.
   * @param {Object} event
   * @private
   */
  _nodeSchemaChangeHandler(event) {
    this.log('info', 'Schema change', event);
    if (!this.options.isMetadataSyncEnabled) {
      return;
    }

    promiseUtils.toBackground(this.handleSchemaChange(event, false));
  }

  /**
   * Schedules the metadata change that corresponds to a schema change event: refreshing or removing a
   * keyspace, or clearing the cached state of a table/view, UDT, function or aggregate.
   * @param {{keyspace: String, isKeyspace: Boolean, schemaChangeType: String, table: String, udt: String,
   * functionName: String, aggregate: String}} event
   * @param {Boolean} processNow Passed through to the debouncer to process the event immediately.
   * @returns {Promise}
   */
  handleSchemaChange(event, processNow) {
    let handler, cqlObject;

    if (event.isKeyspace) {
      if (event.schemaChangeType === schemaChangeTypes.dropped) {
        handler = () => {
          // Remove the keyspace from the cached metadata
          delete this.metadata.keyspaces[event.keyspace];
        };

        return this._scheduleObjectRefresh(handler, event.keyspace, null, processNow);
      }

      return this._scheduleKeyspaceRefresh(event.keyspace, processNow);
    }

    const ksInfo = this.metadata.keyspaces[event.keyspace];
    if (!ksInfo) {
      // The keyspace is not in the cached metadata: there is no state to clear
      return Promise.resolve();
    }

    if (event.table) {
      cqlObject = event.table;
      handler = function clearTableState() {
        delete ksInfo.tables[event.table];
        delete ksInfo.views[event.table];
      };
    }
    else if (event.udt) {
      cqlObject = event.udt;
      handler = function clearUdtState() {
        delete ksInfo.udts[event.udt];
      };
    }
    else if (event.functionName) {
      cqlObject = event.functionName;
      handler = function clearFunctionState() {
        delete ksInfo.functions[event.functionName];
      };
    }
    else if (event.aggregate) {
      cqlObject = event.aggregate;
      handler = function clearKeyspaceState() {
        delete ksInfo.aggregates[event.aggregate];
      };
    }

    if (!handler) {
      // The event does not reference a known type of schema object
      return Promise.resolve();
    }

    return this._scheduleObjectRefresh(handler, event.keyspace, cqlObject, processNow);
  }

  /**
   * Passes the handler of a change on a schema object to the event debouncer.
   * @param {Function} handler
   * @param {String} keyspace
   * @param {String|null} cqlObject
   * @param {Boolean} processNow
   * @returns {Promise}
   * @private
   */
  _scheduleObjectRefresh(handler, keyspace, cqlObject, processNow) {
    return this._debouncer.eventReceived({ handler, keyspace, cqlObject }, processNow);
  }

  /**
   * Passes a refresh of the keyspace metadata to the event debouncer.
   * @param {String} keyspace
   * @param {Boolean} processNow
   * @returns {Promise}
   * @private
   */
  _scheduleKeyspaceRefresh(keyspace, processNow) {
    return this._debouncer.eventReceived({
      handler: () => this.metadata.refreshKeyspace(keyspace),
      keyspace
    }, processNow);
  }

  /**
   * Passes a refresh of the hosts to the event debouncer.
   * @returns {Promise}
   * @private
   */
  _scheduleRefreshHosts() {
    return this._debouncer.eventReceived({
      handler: () => this._refreshHosts(false, false),
      all: true
    }, false);
  }

  /**
   * Sets the information of the host the connection belongs to, based on the first row of system.local.
   * @param {Boolean} initializing When true, a new Host instance is created and added to the host map,
   * otherwise the existing instance is looked up.
   * @param {Boolean} setCurrentHost Determines whether the host must be set as the current host.
   * @param {Connection} c The connection used to retrieve the information.
   * @param {{rows: Array}} result
   * @private
   */
  _setLocalInfo(initializing, setCurrentHost, c, result) {
    if (!result || !result.rows || !result.rows.length) {
      this.log('warning', 'No local info could be obtained');
      return;
    }

    const row = result.rows[0];

    let localHost;

    // When using SNI, the endpoint of the connection is not the address of the node
    const endpoint = !this.options.sni
      ? c.endpoint
      : `${row['rpc_address']}:${this.options.protocolOptions.port}`;

    if (initializing) {
      localHost = new Host(endpoint, this.protocolVersion, this.options, this.metadata);
      this.hosts.set(endpoint, localHost);
      this.log('info', `Adding host ${endpoint}`);
    } else {
      localHost = this.hosts.get(endpoint);

      if (!localHost) {
        this.log('error', 'Localhost could not be found');
        return;
      }
    }

    localHost.datacenter = row['data_center'];
    localHost.rack = row['rack'];
    localHost.tokens = row['tokens'];
    localHost.hostId = row['host_id'];
    localHost.cassandraVersion = row['release_version'];
    setDseParameters(localHost, row);
    this.metadata.setPartitioner(row['partitioner']);
    this.log('info', 'Local info retrieved');

    if (setCurrentHost) {
      // Set the host as the one being used by the ControlConnection
      this.host = localHost;
    }
  }

  /**
   * Adds, updates and removes hosts based on the rows of system.peers.
   * @param {Boolean} initializing Determines whether this method was called as part of the initialization.
   * @param {{rows: Array}} result
   * @returns {Promise<void>}
   * @throws {ArgumentError} While initializing, when options.localDataCenter does not match any of the
   * data centers found.
   */
  async setPeersInfo(initializing, result) {
    if (!result || !result.rows) {
      return;
    }

    // Endpoints found in the peers table, used to detect removed hosts
    const peers = {};
    const port = this.options.protocolOptions.port;
    const foundDataCenters = new Set();

    if (this.host && this.host.datacenter) {
      foundDataCenters.add(this.host.datacenter);
    }

    for (const row of result.rows) {
      const endpoint = await this.getAddressForPeerHost(row, port);

      if (!endpoint) {
        continue;
      }

      peers[endpoint] = true;
      let host = this.hosts.get(endpoint);
      const isNewHost = !host;

      if (isNewHost) {
        host = new Host(endpoint, this.protocolVersion, this.options, this.metadata);
        this.log('info', `Adding host ${endpoint}`);
      }

      host.datacenter = row['data_center'];
      host.rack = row['rack'];
      host.tokens = row['tokens'];
      host.hostId = row['host_id'];
      host.cassandraVersion = row['release_version'];
      setDseParameters(host, row);

      if (host.datacenter) {
        foundDataCenters.add(host.datacenter);
      }

      if (isNewHost) {
        // Add it to the map once all the properties were set
        this.hosts.set(endpoint, host);

        if (!initializing) {
          // Obtain the distance for the new host before changing its status
          this._profileManager.getDistance(host);

          // Hosts added after the initialization start marked as down
          host.setDown();
        }
      }
    }

    // The host map contains the peers plus the local host: a greater length means there are hosts to remove
    if (this.hosts.length > result.rows.length + 1) {
      this.log('info', 'Removing nodes from the pool');
      const toRemove = [];

      this.hosts.forEach(h => {
        // The host used by the ControlConnection is not listed in its own peers table
        if (!peers[h.address] && h !== this.host) {
          this.log('info', 'Removing host ' + h.address);
          toRemove.push(h.address);
          h.shutdown(true);
        }
      });

      this.hosts.removeMultiple(toRemove);
    }

    if (initializing && this.options.localDataCenter) {
      const localDc = this.options.localDataCenter;

      if (!foundDataCenters.has(localDc)) {
        const dataCenters = Array.from(foundDataCenters).join(', ');
        throw new errors.ArgumentError(
          `localDataCenter was configured as '${localDc}', but only found hosts in data centers: [${dataCenters}]`);
      }
    }

    this.log('info', 'Peers info retrieved');
  }

  /**
   * Gets the translated address of a peer, based on its rpc_address (or its peer address when the
   * rpc_address is 0.0.0.0).
   * @param {Object} row A row of system.peers.
   * @param {Number} defaultPort
   * @returns {Promise<String|null>} The value provided by the address translator, or null when the row
   * has no rpc_address.
   */
  getAddressForPeerHost(row, defaultPort) {
    return new Promise(resolve => {
      let address = row['rpc_address'];
      const peer = row['peer'];
      const bindAllAddress = '0.0.0.0';

      if (!address) {
        // The current host can be unset while a refresh is in progress: don't fail while logging
        const localAddress = this.host ? this.host.address : undefined;
        this.log('error', f('No rpc_address found for host %s in %s\'s peers system table. %s will be ignored.',
          peer, localAddress, peer));
        return resolve(null);
      }

      if (address.toString() === bindAllAddress) {
        this.log('warning', f('Found host with 0.0.0.0 as rpc_address, using listen_address (%s) to contact it instead.' +
          ' If this is incorrect you should avoid the use of 0.0.0.0 server side.', peer));
        address = peer;
      }

      this._addressTranslator.translate(address.toString(), defaultPort, resolve);
    });
  }

  /**
   * Resolves the IPv4 and IPv6 addresses of a host name, falling back to dns.lookup() when the name
   * servers returned no address.
   * @param {String} name
   * @returns {Promise<Array<{address: String, isIPv6: Boolean}>>} The addresses found, empty when the name
   * could not be resolved.
   * @private
   */
  async _resolveAll(name) {
    const addresses = [];
    const resolve4 = util.promisify(dns.resolve4);
    const resolve6 = util.promisify(dns.resolve6);
    const lookup = util.promisify(dns.lookup);

    // Start both resolutions in parallel, a failed resolution is treated as "no addresses"
    const ipv4Promise = resolve4(name).catch(() => {}).then(r => r || utils.emptyArray);
    const ipv6Promise = resolve6(name).catch(() => {}).then(r => r || utils.emptyArray);

    let arr;
    arr = await ipv4Promise;
    arr.forEach(address => addresses.push({address, isIPv6: false}));

    arr = await ipv6Promise;
    arr.forEach(address => addresses.push({address, isIPv6: true}));

    if (addresses.length === 0) {
      // Fallback to the lookup performed by the operating system
      try {
        arr = await lookup(name, { all: true });
        arr.forEach(({address, family}) => addresses.push({address, isIPv6: family === 6}));
      } catch (err) {
        this.log('error', `Host with name ${name} could not be resolved`, err);
      }
    }

    return addresses;
  }

  /**
   * Waits for the next 'newConnection' event, up to the metadata query abort timeout.
   * @returns {Promise<void>} Rejected when the event carries an error or when the timeout elapses.
   * @private
   */
  _waitForReconnection() {
    return new Promise((resolve, reject) => {
      const callback = promiseUtils.getCallback(resolve, reject);

      // eslint-disable-next-line prefer-const
      let timeout;

      function newConnectionListener(err) {
        clearTimeout(timeout);
        callback(err);
      }

      this.once('newConnection', newConnectionListener);

      timeout = setTimeout(() => {
        this.removeListener('newConnection', newConnectionListener);
        callback(new errors.OperationTimedOutError('A connection could not be acquired before timeout.'));
      }, metadataQueryAbortTimeout);
    });
  }

  /**
   * Executes a query using the connection of the ControlConnection.
   * @param {String|Object} cqlQuery The query text or a request instance.
   * @param {Boolean} [waitReconnect] Determines whether it should wait for the reconnection when there is
   * no connection at the moment. Defaults to true.
   * @returns {Promise}
   * @throws {NoHostAvailableError} When there is no connection or the instance is being shut down.
   */
  async query(cqlQuery, waitReconnect = true) {
    const queryOnConnection = async () => {
      if (!this.connection || this._isShuttingDown) {
        throw new errors.NoHostAvailableError({}, 'ControlConnection is not connected at the time');
      }

      const request = typeof cqlQuery === 'string' ? new requests.QueryRequest(cqlQuery, null, null) : cqlQuery;
      return await this.connection.send(request, null);
    };

    if (!this.connection && waitReconnect) {
      // Wait until it is reconnected (or the wait times out)
      await this._waitForReconnection();
    }

    return await queryOnConnection();
  }

  /**
   * Gets the encoder of the last valid connection.
   * @returns {Object}
   * @throws {DriverInternalError} When no encoder was set yet.
   */
  getEncoder() {
    if (!this._encoder) {
      throw new errors.DriverInternalError('Encoder is not defined');
    }
    return this._encoder;
  }

  /**
   * Marks the instance as shutting down, stops the debouncer and the pending timers.
   */
  shutdown() {
    this._isShuttingDown = true;
    this._debouncer.shutdown();

    // Emit the event with an error to release the callers waiting for a reconnection
    this.emit('newConnection', new errors.DriverError('ControlConnection is being shutdown'));

    clearTimeout(this._topologyChangeTimeout);
    clearTimeout(this._nodeStatusChangeTimeout);
  }

  /**
   * Clears the host map, shuts down the hosts and marks the instance as not initialized, to allow a new
   * initialization attempt.
   * @returns {Promise<void>}
   */
  async reset() {
    // Clear the host map and obtain the hosts it contained
    const currentHosts = this.hosts.clear();

    // Prevent reconnection attempts while the hosts are being shut down
    this._isShuttingDown = true;

    await Promise.all(currentHosts.map(h => h.shutdown()));

    this.initialized = false;
    this._isShuttingDown = false;
  }

  /**
   * Gets the map of contact point names (as provided) to the array of resolved endpoints.
   * @returns {Map<String, Array<String>>}
   */
  getResolvedContactPoints() {
    return this._resolvedContactPoints;
  }

  /**
   * Gets the local address of the current connection.
   * @returns {String|undefined} undefined when there is no connection.
   */
  getLocalAddress() {
    if (!this.connection) {
      return undefined;
    }

    return this.connection.getLocalAddress();
  }

  /**
   * Gets the endpoint of the current connection.
   * @returns {String|undefined} undefined when there is no connection.
   */
  getEndpoint() {
    if (!this.connection) {
      return undefined;
    }

    return this.connection.endpoint;
  }
}
|
1050 |
|
1051 |
|
1052 |
|
1053 |
|
1054 |
|
1055 |
|
1056 |
|
/**
 * Copies the workload and DSE version columns of a system.local / system.peers row into the host.
 * The workloads are taken from the "workloads" column when defined, otherwise from the single "workload"
 * column, defaulting to the shared empty array. The DSE version is only set when the column is defined.
 * @param {Host} host The host to update.
 * @param {Object} row The row containing the columns.
 * @private
 */
function setDseParameters(host, row) {
  const declaredWorkloads = row['workloads'];

  if (declaredWorkloads !== undefined) {
    host.workloads = declaredWorkloads;
  } else {
    const singleWorkload = row['workload'];
    host.workloads = singleWorkload ? [ singleWorkload ] : utils.emptyArray;
  }

  const dseVersion = row['dse_version'];

  if (dseVersion !== undefined) {
    host.dseVersion = dseVersion;
  }
}
|
1072 |
|
1073 | module.exports = ControlConnection;
|