1 | /*
|
2 | * Copyright DataStax, Inc.
|
3 | *
|
4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | * you may not use this file except in compliance with the License.
|
6 | * You may obtain a copy of the License at
|
7 | *
|
8 | * http://www.apache.org/licenses/LICENSE-2.0
|
9 | *
|
10 | * Unless required by applicable law or agreed to in writing, software
|
11 | * distributed under the License is distributed on an "AS IS" BASIS,
|
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 | * See the License for the specific language governing permissions and
|
14 | * limitations under the License.
|
15 | */
|
16 |
|
17 | ;
|
18 |
|
19 | const events = require('events');
|
20 |
|
21 | const utils = require('./utils');
|
22 | const types = require('./types');
|
23 | const HostConnectionPool = require('./host-connection-pool');
|
24 | const PrepareHandler = require('./prepare-handler');
|
25 | const promiseUtils = require('./promise-utils');
|
26 |
|
// Period, in milliseconds, of the per-host timer that stores the pool's response counter
// (as returned by `pool.getAndResetResponseCounter()`) into `Host#_healthResponseCounter`.
const healthResponseCountInterval = 200;
|
28 |
|
/**
 * Represents a Cassandra node.
 * <p>Instances emit <code>'up'</code>, <code>'down'</code> and <code>'ignore'</code> events.</p>
 * @extends EventEmitter
 */
class Host extends events.EventEmitter {

  /**
   * Creates a new Host instance.
   * @param {String} address Ip address and port number of the node, separated by `:`.
   * @param {Number} protocolVersion Protocol version handed over to the connection pool of this host.
   * @param {Object} options Options object, exposed as the read-only and non-enumerable `options` property.
   * @param {Object} metadata Metadata instance, used to retrieve the prepared queries via `getAllPrepared()`.
   */
  constructor(address, protocolVersion, options, metadata) {
    super();

    /**
     * Gets ip address and port number of the node separated by `:`.
     * @type {String}
     */
    this.address = address;

    // Timestamp of the moment the host was marked as DOWN; zero while the host is considered UP
    this.setDownAt = 0;
    this.log = utils.log;

    /**
     * Gets the timestamp of the moment when the Host was marked as UP.
     * @type {Number|null}
     * @ignore
     * @internal
     */
    this.isUpSince = null;

    Object.defineProperty(this, 'options', {
      value: options,
      enumerable: false,
      writable: false
    });

    /**
     * The host pool.
     * @internal
     * @ignore
     * @type {HostConnectionPool}
     */
    Object.defineProperty(this, 'pool', {
      value: new HostConnectionPool(this, protocolVersion),
      enumerable: false
    });

    // React to pool events: a finished connection attempt and a connection leaving the pool
    this.pool.on('open', openError => promiseUtils.toBackground(this._onNewConnectionOpen(openError)));
    this.pool.on('remove', () => this._checkPoolState());

    /**
     * Gets string containing the Cassandra version.
     * @type {String}
     */
    this.cassandraVersion = null;

    /**
     * Gets data center name of the node.
     * @type {String}
     */
    this.datacenter = null;

    /**
     * Gets rack name of the node.
     * @type {String}
     */
    this.rack = null;

    /**
     * Gets the tokens assigned to the node.
     * @type {Array}
     */
    this.tokens = null;

    /**
     * Gets the id of the host.
     * <p>This identifier is used by the server for internal communication / gossip.</p>
     * @type {Uuid}
     */
    this.hostId = null;

    /**
     * Gets string containing the DSE version or null if not set.
     * @type {String}
     */
    this.dseVersion = null;

    /**
     * Gets the DSE Workloads the host is running.
     * <p>
     *   This is based on the "workload" or "workloads" columns in {@code system.local} and {@code system.peers}.
     * </p>
     * <p>
     *   Workload labels may vary depending on the DSE version in use; e.g. DSE 5.1 may report two distinct
     *   workloads: <code>Search</code> and <code>Analytics</code>, while DSE 5.0 would report a single
     *   <code>SearchAnalytics</code> workload instead. The driver simply returns the workload labels as reported
     *   by DSE, without any form of pre-processing.
     * </p>
     * <p>When the information is unavailable, this property returns an empty array.</p>
     * @type {Array<string>}
     */
    this.workloads = utils.emptyArray;

    // Distance that was last assigned via setDistance(); hosts start as ignored
    this._distance = types.distance.ignored;
    // Response count captured by the health timer on its last tick
    this._healthResponseCounter = 0;

    // Keep these private members non-enumerable so they don't show up when inspecting the instance
    Object.defineProperty(this, '_metadata', {
      value: metadata,
      enumerable: false
    });
    Object.defineProperty(this, '_healthResponseCountTimer', {
      value: null,
      enumerable: false,
      writable: true
    });

    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
    this.reconnectionDelay = 0;
  }

  /**
   * Marks this host as not available for query coordination. It's a no-op when the host is already marked as
   * DOWN or when the pool is closing.
   * @internal
   * @ignore
   */
  setDown() {
    // Several failure signals can arrive for the same host: only the first one counts.
    // Also, there is nothing to report while the pool is being closed / shut down.
    if (this.setDownAt !== 0 || this.pool.isClosing()) {
      return;
    }

    this.setDownAt = Date.now();

    // When connections are expected (according to the distance), losing the host deserves a warning
    const expectsConnections = this.pool.coreConnectionsLength > 0;
    const level = expectsConnections ? 'warning' : 'info';
    const message = expectsConnections
      ? `Host ${this.address} considered as DOWN. Reconnection delay ${this.reconnectionDelay}ms.`
      : `Host ${this.address} considered as DOWN.`;
    this.log(level, message);

    this.emit('down');
    this._checkPoolState();
  }

  /**
   * Marks this host as available for querying. It's a no-op when the host is already marked as UP.
   * @param {Boolean} [clearReconnection] When truthy, the pending new connection attempt of the pool is cleared.
   * @internal
   * @ignore
   */
  setUp(clearReconnection) {
    if (!this.setDownAt) {
      // Already UP
      return;
    }

    this.log('info', `Setting host ${this.address} as UP`);
    this.setDownAt = 0;
    this.isUpSince = Date.now();

    // The host went from DOWN to UP: start over with a fresh reconnection schedule
    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();

    if (clearReconnection) {
      this.pool.clearNewConnectionAttempt();
    }

    this.emit('up');
  }

  /**
   * When the host is DOWN, resets the reconnection schedule and delay and asks the pool to attempt a new
   * connection immediately.
   * @internal
   * @ignore
   */
  checkIsUp() {
    if (this.isUp()) {
      return;
    }

    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
    this.reconnectionDelay = 0;
    this.pool.attemptNewConnectionImmediate();
  }

  /**
   * Stops the health timer and shuts down the connection pool.
   * @param {Boolean} [waitForPending] When true, it waits for in-flight operations to finish before closing the
   * connections. In that case the draining occurs in the background and the returned promise is already resolved.
   * @returns {Promise<void>}
   * @internal
   * @ignore
   */
  shutdown(waitForPending) {
    if (this._healthResponseCountTimer) {
      clearInterval(this._healthResponseCountTimer);
    }

    if (!waitForPending) {
      return this.pool.shutdown();
    }

    // Graceful draining and shutdown continues in the background
    this.pool.drainAndShutdown();
    return Promise.resolve();
  }

  /**
   * Determines if the node is UP now (seen as UP by the driver).
   * @returns {boolean}
   */
  isUp() {
    return !this.setDownAt;
  }

  /**
   * Determines if the host can be considered as UP: either it is UP or the reconnection delay has elapsed since
   * it was marked as DOWN.
   * Deprecated: Use {@link Host#isUp()} instead.
   * @returns {boolean}
   */
  canBeConsideredAsUp() {
    if (!this.setDownAt) {
      return true;
    }

    const elapsedSinceDown = Date.now() - this.setDownAt;
    return elapsedSinceDown >= this.reconnectionDelay;
  }

  /**
   * Sets the distance of the host relative to the client using the load balancing policy.
   * <p>
   *   It always updates the expected amount of core connections of the pool. Only when the distance changed, it
   *   restarts the health timer and either shuts the pool down (ignored) or checks whether the host is back up.
   * </p>
   * @param {Number} distance
   * @returns {Number} The distance that was set.
   * @internal
   * @ignore
   */
  setDistance(distance) {
    const formerDistance = this._distance;
    this._distance = distance || types.distance.local;

    const corePerHost = this.options.pooling.coreConnectionsPerHost;
    this.pool.coreConnectionsLength = corePerHost ? (corePerHost[this._distance] || 0) : 1;

    if (this._distance === formerDistance) {
      return this._distance;
    }

    if (this._healthResponseCountTimer) {
      clearInterval(this._healthResponseCountTimer);
    }

    if (this._distance === types.distance.ignored) {
      // The host was local/remote and from now on it must be ignored
      this.emit('ignore');
      this.pool.drainAndShutdown();
      return this._distance;
    }

    if (!this.isUp()) {
      this.checkIsUp();
    }

    // Start a new health check timer
    this._healthResponseCountTimer = setInterval(() => {
      this._healthResponseCounter = this.pool.getAndResetResponseCounter();
    }, healthResponseCountInterval);

    return this._distance;
  }

  /**
   * Changes the protocol version of a given host.
   * @param {Number} value
   * @internal
   * @ignore
   */
  setProtocolVersion(value) {
    this.pool.protocolVersion = value;
  }

  /**
   * Gets the least busy connection from the pool.
   * @param {Connection} [previousConnection] When provided, the pool should attempt to obtain a different connection.
   * @returns {Connection!}
   * @throws {Error}
   * @throws {BusyConnectionError}
   * @internal
   * @ignore
   */
  borrowConnection(previousConnection) {
    return this.pool.borrowConnection(previousConnection);
  }

  /**
   * Creates all the connections in the pool.
   * @param {string} keyspace
   * @internal
   * @ignore
   */
  warmupPool(keyspace) {
    return this.pool.warmup(keyspace);
  }

  /**
   * Starts creating the pool in the background.
   * @internal
   * @ignore
   */
  initializePool() {
    this.pool.increaseSize();
  }

  /**
   * Gets any connection that is already opened or null if not found.
   * @returns {Connection}
   * @internal
   * @ignore
   */
  getActiveConnection() {
    if (this.isUp() && this.pool.connections.length) {
      return this.pool.connections[0];
    }

    return null;
  }

  /**
   * Internal method to get the amount of responses dequeued in the last interval (between 200ms and 400ms) on all
   * connections to the host.
   * @returns {Number}
   * @internal
   * @ignore
   */
  getResponseCount() {
    // Count stored by the last timer tick plus what the pool accumulated since then
    const lastIntervalCount = this._healthResponseCounter;
    const currentCount = this.pool.responseCounter;
    return lastIntervalCount + currentCount;
  }

  /**
   * Checks the health of a connection in the pool: the connection is removed from the pool once its amount of
   * timed out operations exceeds <code>socketOptions.defunctReadTimeoutThreshold</code>.
   * @param {Connection} connection
   * @internal
   * @ignore
   */
  checkHealth(connection) {
    if (connection.timedOutOperations <= this.options.socketOptions.defunctReadTimeoutThreshold) {
      // Still within the threshold
      return;
    }

    this.removeFromPool(connection);
  }

  /**
   * Removes the connection from the pool and validates the pool state afterwards.
   * @param {Connection} connection
   * @internal
   * @ignore
   */
  removeFromPool(connection) {
    this.pool.remove(connection);
    this._checkPoolState();
  }

  /**
   * Internal method that gets the amount of in-flight requests on all connections to the host.
   * @internal
   * @ignore
   */
  getInFlight() {
    return this.pool.getInFlight();
  }

  /**
   * Validates the internal state of the connection pool.
   * If the pool size is smaller than expected, schedule a new connection attempt.
   * If the amount of connections is 0 for not ignored hosts, the host must be down.
   * @private
   */
  _checkPoolState() {
    const pool = this.pool;

    if (pool.isClosing()) {
      return;
    }

    // The pool needs to grow / reconnect, unless an attempt is already scheduled
    if (pool.connections.length < pool.coreConnectionsLength && !pool.hasScheduledNewConnection()) {
      this.reconnectionDelay = this.reconnectionSchedule.next().value;
      pool.scheduleNewConnectionAttempt(this.reconnectionDelay);
    }

    const connectionsExpected = this._distance !== types.distance.ignored && pool.coreConnectionsLength > 0;

    if (connectionsExpected && pool.connections.length === 0) {
      // Mark as DOWN, if it's UP
      this.setDown();
    }
  }

  /**
   * Executed after a scheduled new connection attempt finished.
   * <p>
   *   On failure, it only validates the pool state. On success, it optionally re-prepares all queries (when the
   *   host was DOWN and <code>rePrepareOnUp</code> is set), marks the host as UP and lets the pool grow.
   * </p>
   * @param {Error} [err] The error of the connection attempt, if any.
   * @private
   */
  async _onNewConnectionOpen(err) {
    if (err) {
      this._checkPoolState();
      return;
    }

    if (!this.isUp() && this.options.rePrepareOnUp) {
      this.log('info', `Re-preparing all queries on host ${this.address} before setting it as UP`);
      const preparedQueries = this._metadata.getAllPrepared();

      try {
        await PrepareHandler.prepareAllQueries(this, preparedQueries);
      }
      catch (prepareError) {
        // A failure to re-prepare is logged, the host is set as UP regardless
        this.log('warning', `Failed re-preparing on host ${this.address}: ${prepareError}`, prepareError);
      }
    }

    this.setUp();
    this.pool.increaseSize();
  }

  /**
   * Returns an array containing the Cassandra Version as an Array of Numbers having the major version in the first
   * position.
   * @returns {Array.<Number>}
   */
  getCassandraVersion() {
    const version = this.cassandraVersion;

    if (!version) {
      return utils.emptyArray;
    }

    // Discard anything after the first dash, e.g. "-SNAPSHOT"
    const [ numericPart ] = version.split('-');
    return numericPart.split('.').map(part => parseInt(part, 10));
  }

  /**
   * Gets the DSE version of the host as an Array, containing the major version in the first position.
   * In case the cluster is not a DSE cluster, it returns an empty Array.
   * @returns {Array}
   */
  getDseVersion() {
    const version = this.dseVersion;

    if (!version) {
      return utils.emptyArray;
    }

    // Discard anything after the first dash
    const [ numericPart ] = version.split('-');
    return numericPart.split('.').map(part => parseInt(part, 10));
  }
}
|
444 |
|
/**
 * Represents an associative-array of {@link Host hosts} that can be iterated.
 * It creates an internal copy when adding or removing, making it safe to iterate using the values()
 * method within async operations.
 * @extends events.EventEmitter
 * @constructor
 */
class HostMap extends events.EventEmitter {
  constructor() {
    super();

    // Hosts by key; replaced by a new Map instance when a key is added or removed
    this._items = new Map();
    // Cached, frozen array of hosts; set to null whenever the map changes
    this._values = null;

    Object.defineProperty(this, 'length', { get: () => this.values().length, enumerable: true });

    /**
     * Emitted when a host is added to the map
     * @event HostMap#add
     */
    /**
     * Emitted when a host is removed from the map
     * @event HostMap#remove
     */
  }

  /**
   * Executes a provided function once per map element.
   * @param {Function} callback Function invoked with the host as first argument and the key as second argument.
   */
  forEach(callback) {
    // Iterate over the Map instance that was current when the iteration started
    const items = this._items;
    for (const [ key, value ] of items) {
      callback(value, key);
    }
  }

  /**
   * Gets a {@link Host host} by key or undefined if not found.
   * @param {String} key
   * @returns {Host}
   */
  get(key) {
    return this._items.get(key);
  }

  /**
   * Returns an array of host addresses.
   * @returns {Array.<String>}
   */
  keys() {
    return Array.from(this._items.keys());
  }

  /**
   * Removes an item from the map. It's a no-op when the key is not found.
   * @param {String} key The key of the host
   * @fires HostMap#remove
   */
  remove(key) {
    const value = this._items.get(key);
    if (value === undefined) {
      return;
    }

    // Clear cache
    this._values = null;

    // Copy the values
    const copy = new Map(this._items);
    copy.delete(key);

    this._items = copy;
    this.emit('remove', value);
  }

  /**
   * Removes multiple hosts from the map. Keys that are not found are skipped.
   * @param {Array.<String>} keys
   * @fires HostMap#remove
   */
  removeMultiple(keys) {
    // Clear value cache
    this._values = null;

    // Copy the values
    const copy = new Map(this._items);
    const removedHosts = [];

    for (const key of keys) {
      const h = copy.get(key);

      if (!h) {
        continue;
      }

      removedHosts.push(h);
      copy.delete(key);
    }

    // Swap the items first, so listeners observe the map without the removed hosts
    this._items = copy;
    removedHosts.forEach(h => this.emit('remove', h));
  }

  /**
   * Adds a new item to the map or replaces the existing item with the same key.
   * <p>When replacing, a <code>remove</code> event for the previous host is emitted before the <code>add</code>
   * event of the new host.</p>
   * @param {String} key The key of the host
   * @param {Host} value The host to be added
   * @returns {Host} The host that was set.
   * @fires HostMap#remove
   * @fires HostMap#add
   */
  set(key, value) {
    // Clear values cache
    this._values = null;

    const originalValue = this._items.get(key);
    if (originalValue) {
      // The set of keys does not change: replace in place, without copying
      this._items.set(key, value);
      // Emit a remove followed by an add
      this.emit('remove', originalValue);
      this.emit('add', value);
      // Return the value in this branch as well, to be consistent with the "add" path
      return value;
    }

    // Copy the values
    const copy = new Map(this._items);
    copy.set(key, value);
    this._items = copy;
    this.emit('add', value);
    return value;
  }

  /**
   * Returns a shallow copy of a portion of the items into a new array object.
   * When called without a range, it returns the same (frozen) array as values().
   * Backward-compatibility.
   * @param {Number} [begin]
   * @param {Number} [end]
   * @returns {Array}
   * @ignore
   */
  slice(begin, end) {
    // Only a missing end means "up to the last item": an explicit end of 0 must yield an empty array,
    // like Array.prototype.slice() does
    if (!begin && (end === undefined || end === null)) {
      // Avoid making a copy of the copy
      return this.values();
    }

    return this.values().slice(begin || 0, end);
  }

  /**
   * Deprecated: Use set() instead.
   * @ignore
   * @deprecated
   */
  push(k, v) {
    this.set(k, v);
  }

  /**
   * Returns a shallow copy of the values of the map.
   * The returned array is frozen and it's reused across calls until the map changes.
   * @returns {Array.<Host>}
   */
  values() {
    if (!this._values) {
      // Cache the values
      this._values = Object.freeze(Array.from(this._items.values()));
    }

    return this._values;
  }

  /**
   * Removes all items from the map.
   * @returns {Array.<Host>} The previous items
   * @fires HostMap#remove
   */
  clear() {
    const previousItems = this.values();

    // Clear cache
    this._values = null;

    // Clear items
    this._items = new Map();

    // Emit events
    previousItems.forEach(h => this.emit('remove', h));

    return previousItems;
  }

  /**
   * Returns the underlying Map of hosts by key.
   * @returns {Map}
   */
  inspect() {
    return this._items;
  }

  /**
   * Returns a plain object containing the hosts by key.
   * @returns {Object}
   */
  toJSON() {
    // Node.js 10 and below don't support Object.fromEntries()
    if (Object.fromEntries) {
      return Object.fromEntries(this._items);
    }

    const obj = {};
    for (const [ key, value ] of this._items) {
      obj[key] = value;
    }

    return obj;
  }
}
|
654 |
|
// Public surface of this module: the Host and HostMap classes defined above.
module.exports = {
  Host,
  HostMap
};
\ | No newline at end of file |