UNPKG

17.2 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17'use strict';
18
19const events = require('events');
20
21const utils = require('./utils');
22const types = require('./types');
23const HostConnectionPool = require('./host-connection-pool');
24const PrepareHandler = require('./prepare-handler');
25const promiseUtils = require('./promise-utils');
26
/**
 * Period, in milliseconds, of the timer that snapshots the pool response counter of a host.
 * @type {Number}
 */
const healthResponseCountInterval = 200;
28
/**
 * Represents a Cassandra node.
 * <p>Emits <code>'up'</code>, <code>'down'</code> and <code>'ignore'</code> events as its state changes.</p>
 * @extends EventEmitter
 */
class Host extends events.EventEmitter {

  /**
   * Creates a new Host instance.
   * @param {String} address Ip address and port number of the node, separated by `:`.
   * @param {Number} protocolVersion Protocol version handed over to the connection pool of this host.
   * @param {Object} options Client options. This class reads <code>policies.reconnection</code>,
   * <code>pooling.coreConnectionsPerHost</code>, <code>socketOptions.defunctReadTimeoutThreshold</code> and
   * <code>rePrepareOnUp</code> from it.
   * @param {Object} metadata Instance queried via <code>getAllPrepared()</code> when re-preparing queries.
   */
  constructor(address, protocolVersion, options, metadata) {
    super();

    /**
     * Gets ip address and port number of the node separated by `:`.
     * @type {String}
     */
    this.address = address;

    // Timestamp of the moment the host was marked as DOWN, zero while the host is considered UP
    this.setDownAt = 0;
    this.log = utils.log;

    /**
     * Gets the timestamp of the moment when the Host was marked as UP.
     * @type {Number|null}
     * @ignore
     * @internal
     */
    this.isUpSince = null;

    // Property descriptors default to non-enumerable and non-writable
    Object.defineProperty(this, 'options', { value: options });

    /**
     * The host pool.
     * @internal
     * @ignore
     * @type {HostConnectionPool}
     */
    Object.defineProperty(this, 'pool', { value: new HostConnectionPool(this, protocolVersion) });

    this.pool.on('open', err => promiseUtils.toBackground(this._onNewConnectionOpen(err)));
    this.pool.on('remove', () => this._checkPoolState());

    /**
     * Gets string containing the Cassandra version.
     * @type {String}
     */
    this.cassandraVersion = null;

    /**
     * Gets data center name of the node.
     * @type {String}
     */
    this.datacenter = null;

    /**
     * Gets rack name of the node.
     * @type {String}
     */
    this.rack = null;

    /**
     * Gets the tokens assigned to the node.
     * @type {Array}
     */
    this.tokens = null;

    /**
     * Gets the id of the host.
     * <p>This identifier is used by the server for internal communication / gossip.</p>
     * @type {Uuid}
     */
    this.hostId = null;

    /**
     * Gets string containing the DSE version or null if not set.
     * @type {String}
     */
    this.dseVersion = null;

    /**
     * Gets the DSE Workloads the host is running.
     * <p>
     *   This is based on the "workload" or "workloads" columns in <code>system.local</code> and
     *   <code>system.peers</code>.
     * </p>
     * <p>
     *   Workload labels may vary depending on the DSE version in use; e.g. DSE 5.1 may report two distinct
     *   workloads: <code>Search</code> and <code>Analytics</code>, while DSE 5.0 would report a single
     *   <code>SearchAnalytics</code> workload instead. The driver simply returns the workload labels as reported by
     *   DSE, without any form of pre-processing.
     * </p>
     * <p>When the information is unavailable, this property returns an empty array.</p>
     * @type {Array<string>}
     */
    this.workloads = utils.emptyArray;

    // Distance most recently assigned through setDistance(); hosts start out as ignored
    this._distance = types.distance.ignored;
    this._healthResponseCounter = 0;

    // Keep these private members out of the enumerable properties so they don't show up when inspecting
    Object.defineProperties(this, {
      _metadata: { value: metadata },
      _healthResponseCountTimer: { value: null, writable: true }
    });

    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
    this.reconnectionDelay = 0;
  }

  /**
   * Flags the host as unavailable for query coordination and emits <code>'down'</code>.
   * It's a no-op when the host is already DOWN or when its pool is closing.
   * @internal
   * @ignore
   */
  setDown() {
    // Several failure signals can arrive for the same host: only the first one takes effect.
    // A pool that is closing / shutting down is not a reason to mark the host as DOWN either.
    if (this.setDownAt !== 0 || this.pool.isClosing()) {
      return;
    }

    this.setDownAt = Date.now();

    const expectsConnections = this.pool.coreConnectionsLength > 0;
    if (expectsConnections) {
      // According to the distance, there should be connections open to it => issue a warning
      this.log('warning', `Host ${this.address} considered as DOWN. Reconnection delay ${this.reconnectionDelay}ms.`);
    }
    else {
      this.log('info', `Host ${this.address} considered as DOWN.`);
    }

    this.emit('down');
    this._checkPoolState();
  }

  /**
   * Flags the host as available for querying and emits <code>'up'</code>. It's a no-op when the host is already UP.
   * @param {Boolean} [clearReconnection] When true, the pending connection attempt of the pool is cleared.
   * @internal
   * @ignore
   */
  setUp(clearReconnection) {
    if (!this.setDownAt) {
      // Already marked as UP
      return;
    }

    this.log('info', `Setting host ${this.address} as UP`);
    this.setDownAt = 0;
    this.isUpSince = Date.now();

    // The host is healthy again: start over with a fresh reconnection schedule
    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();

    if (clearReconnection) {
      this.pool.clearNewConnectionAttempt();
    }

    this.emit('up');
  }

  /**
   * When the host is DOWN, resets the reconnection schedule and delay and asks the pool to attempt a new
   * connection immediately. Does nothing when the host is UP.
   * @internal
   * @ignore
   */
  checkIsUp() {
    if (this.isUp()) {
      return;
    }

    this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
    this.reconnectionDelay = 0;
    this.pool.attemptNewConnectionImmediate();
  }

  /**
   * Stops the health counter timer and shuts down the pool.
   * @param {Boolean} [waitForPending] When true, it waits for in-flight operations to finish before closing the
   * connections.
   * @returns {Promise<void>}
   * @internal
   * @ignore
   */
  shutdown(waitForPending) {
    if (this._healthResponseCountTimer) {
      clearInterval(this._healthResponseCountTimer);
    }

    if (!waitForPending) {
      return this.pool.shutdown();
    }

    // The graceful drain and shutdown of the pool carries on in the background, it's not awaited here
    this.pool.drainAndShutdown();
    return Promise.resolve();
  }

  /**
   * Determines if the node is UP now (seen as UP by the driver).
   * @returns {boolean}
   */
  isUp() {
    return !this.setDownAt;
  }

  /**
   * Determines if the host can be considered as UP: either it's UP or the reconnection delay has already elapsed
   * since it was marked as DOWN.
   * Deprecated: Use {@link Host#isUp()} instead.
   * @returns {boolean}
   */
  canBeConsideredAsUp() {
    if (!this.setDownAt) {
      return true;
    }

    const elapsedSinceDown = Date.now() - this.setDownAt;
    return elapsedSinceDown >= this.reconnectionDelay;
  }

  /**
   * Sets the distance of the host relative to the client using the load balancing policy.
   * <p>
   *   The core size of the pool is updated on every call. When the distance changes, the health timer is
   *   restarted (or the pool drained, when the host becomes ignored).
   * </p>
   * @param {Number} distance A falsy value is treated as <code>types.distance.local</code>.
   * @returns {Number} The distance that was effectively set.
   * @internal
   * @ignore
   */
  setDistance(distance) {
    const previousDistance = this._distance;
    this._distance = distance || types.distance.local;

    const corePerDistance = this.options.pooling.coreConnectionsPerHost;
    this.pool.coreConnectionsLength = corePerDistance ? (corePerDistance[this._distance] || 0) : 1;

    if (this._distance === previousDistance) {
      return this._distance;
    }

    if (this._healthResponseCountTimer) {
      clearInterval(this._healthResponseCountTimer);
    }

    if (this._distance === types.distance.ignored) {
      // The host was local/remote and from now on it must be ignored
      this.emit('ignore');
      this.pool.drainAndShutdown();
      return this._distance;
    }

    if (!this.isUp()) {
      this.checkIsUp();
    }

    // Restart the timer that periodically snapshots the response counter of the pool
    this._healthResponseCountTimer = setInterval(() => {
      this._healthResponseCounter = this.pool.getAndResetResponseCounter();
    }, healthResponseCountInterval);

    return this._distance;
  }

  /**
   * Changes the protocol version of a given host
   * @param {Number} value
   * @internal
   * @ignore
   */
  setProtocolVersion(value) {
    this.pool.protocolVersion = value;
  }

  /**
   * Gets the least busy connection from the pool.
   * @param {Connection} [previousConnection] When provided, the pool should attempt to obtain a different connection.
   * @returns {Connection!}
   * @throws {Error}
   * @throws {BusyConnectionError}
   * @internal
   * @ignore
   */
  borrowConnection(previousConnection) {
    return this.pool.borrowConnection(previousConnection);
  }

  /**
   * Creates all the connections in the pool by delegating to the pool warmup.
   * @param {string} keyspace
   * @returns {*} Whatever the pool warmup returns.
   * @internal
   * @ignore
   */
  warmupPool(keyspace) {
    return this.pool.warmup(keyspace);
  }

  /**
   * Starts creating the pool in the background.
   * @internal
   * @ignore
   */
  initializePool() {
    this.pool.increaseSize();
  }

  /**
   * Gets any connection that is already opened or null if not found.
   * A host that is DOWN never yields a connection.
   * @returns {Connection}
   * @internal
   * @ignore
   */
  getActiveConnection() {
    if (this.isUp() && this.pool.connections.length) {
      return this.pool.connections[0];
    }

    return null;
  }

  /**
   * Internal method to get the amount of responses dequeued in the last interval (between 200ms and 400ms) on all
   * connections to the host.
   * @returns {Number}
   * @internal
   * @ignore
   */
  getResponseCount() {
    // Snapshot taken on the last timer tick plus what has been counted since then
    return this._healthResponseCounter + this.pool.responseCounter;
  }

  /**
   * Checks the health of a connection in the pool: the connection is removed from the pool unless its number of
   * timed out operations is within <code>socketOptions.defunctReadTimeoutThreshold</code>.
   * @param {Connection} connection
   * @internal
   * @ignore
   */
  checkHealth(connection) {
    if (connection.timedOutOperations <= this.options.socketOptions.defunctReadTimeoutThreshold) {
      // Still within the threshold
      return;
    }

    this.removeFromPool(connection);
  }

  /**
   * Removes the connection from the pool and re-validates the pool state.
   * @param {Connection} connection
   * @internal
   * @ignore
   */
  removeFromPool(connection) {
    this.pool.remove(connection);
    this._checkPoolState();
  }

  /**
   * Internal method that gets the amount of in-flight requests on all connections to the host.
   * @internal
   * @ignore
   */
  getInFlight() {
    return this.pool.getInFlight();
  }

  /**
   * Validates the internal state of the connection pool.
   * If the pool size is smaller than expected, schedule a new connection attempt.
   * If the amount of connections is 0 for not ignored hosts, the host must be down.
   * @private
   */
  _checkPoolState() {
    const pool = this.pool;

    if (pool.isClosing()) {
      return;
    }

    // The pool needs to grow / reconnect: schedule an attempt unless there is one pending already
    if (pool.connections.length < pool.coreConnectionsLength && !pool.hasScheduledNewConnection()) {
      this.reconnectionDelay = this.reconnectionSchedule.next().value;
      pool.scheduleNewConnectionAttempt(this.reconnectionDelay);
    }

    const shouldHaveConnections = this._distance !== types.distance.ignored && pool.coreConnectionsLength > 0;

    if (shouldHaveConnections && pool.connections.length === 0) {
      // Mark as DOWN, if it's UP
      this.setDown();
    }
  }

  /**
   * Executed after a scheduled new connection attempt finished.
   * <p>
   *   On failure, the pool state is re-validated. On success, the host is set as UP (re-preparing all queries
   *   first when the host was DOWN and <code>rePrepareOnUp</code> is set) and the pool is asked to grow.
   * </p>
   * @param {Error} [err] The error of the connection attempt, if any.
   * @returns {Promise<void>}
   * @private
   */
  async _onNewConnectionOpen(err) {
    if (err) {
      this._checkPoolState();
      return;
    }

    if (!this.isUp() && this.options.rePrepareOnUp) {
      this.log('info', `Re-preparing all queries on host ${this.address} before setting it as UP`);
      const allPrepared = this._metadata.getAllPrepared();

      try {
        await PrepareHandler.prepareAllQueries(this, allPrepared);
      }
      catch (prepareErr) {
        // A failure to re-prepare is logged and does not prevent the host from being set as UP
        this.log('warning', `Failed re-preparing on host ${this.address}: ${prepareErr}`, prepareErr);
      }
    }

    this.setUp();
    this.pool.increaseSize();
  }

  /**
   * Returns an array containing the Cassandra Version as an Array of Numbers having the major version in the first
   * position. Anything after the first <code>-</code> in the version string is discarded.
   * @returns {Array.<Number>} An empty array when the version is not set.
   */
  getCassandraVersion() {
    if (!this.cassandraVersion) {
      return utils.emptyArray;
    }

    const [ numericPart ] = this.cassandraVersion.split('-');
    return numericPart.split('.').map(part => parseInt(part, 10));
  }

  /**
   * Gets the DSE version of the host as an Array, containing the major version in the first position.
   * In case the cluster is not a DSE cluster, it returns an empty Array.
   * @returns {Array}
   */
  getDseVersion() {
    if (!this.dseVersion) {
      return utils.emptyArray;
    }

    const [ numericPart ] = this.dseVersion.split('-');
    return numericPart.split('.').map(part => parseInt(part, 10));
  }
}
444
/**
 * Represents an associative-array of {@link Host hosts} that can be iterated.
 * It creates an internal copy when adding or removing, making it safe to iterate using the values()
 * method within async operations.
 * @extends events.EventEmitter
 */
class HostMap extends events.EventEmitter {
  /**
   * Creates a new, empty map of hosts.
   */
  constructor() {
    super();

    // Current items, keyed by host address. It's replaced by a copy when keys are added or removed
    this._items = new Map();

    // Frozen array of the current values, built lazily by values() and dropped on every change
    this._values = null;

    // Number of hosts in the map, read straight from the map without materializing the values array
    Object.defineProperty(this, 'length', { get: () => this._items.size, enumerable: true });

    /**
     * Emitted when a host is added to the map
     * @event HostMap#add
     */
    /**
     * Emitted when a host is removed from the map
     * @event HostMap#remove
     */
  }

  /**
   * Executes a provided function once per map element.
   * @param {Function} callback Invoked with <code>(host, key)</code> for each item.
   */
  forEach(callback) {
    // Iterate over the map referenced at call time: adding / removing keys swaps this._items for a copy
    const items = this._items;
    for (const [ key, value ] of items) {
      callback(value, key);
    }
  }

  /**
   * Gets a {@link Host host} by key or undefined if not found.
   * @param {String} key
   * @returns {Host}
   */
  get(key) {
    return this._items.get(key);
  }

  /**
   * Returns an array of host addresses.
   * @returns {Array.<String>}
   */
  keys() {
    return Array.from(this._items.keys());
  }

  /**
   * Removes an item from the map. It's a no-op when the key is not found.
   * @param {String} key The key of the host
   * @fires HostMap#remove
   */
  remove(key) {
    const value = this._items.get(key);
    if (value === undefined) {
      return;
    }

    // Clear cache
    this._values = null;

    // Copy the values
    const copy = new Map(this._items);
    copy.delete(key);

    this._items = copy;
    this.emit('remove', value);
  }

  /**
   * Removes multiple hosts from the map. Keys that are not found are skipped.
   * @param {Array.<String>} keys
   * @fires HostMap#remove
   */
  removeMultiple(keys) {
    // Copy the values
    const copy = new Map(this._items);
    const removedHosts = [];

    for (const key of keys) {
      const h = copy.get(key);

      if (!h) {
        continue;
      }

      removedHosts.push(h);
      copy.delete(key);
    }

    if (removedHosts.length === 0) {
      // Nothing matched: keep the current map and the cached values as they are
      return;
    }

    // Clear value cache
    this._values = null;

    this._items = copy;
    removedHosts.forEach(h => this.emit('remove', h));
  }

  /**
   * Adds a new item to the map.
   * When there is a host under the same key already, it is replaced and a <code>remove</code> event for the
   * previous host is emitted before the <code>add</code> event.
   * @param {String} key The key of the host
   * @param {Host} value The host to be added
   * @returns {Host} The host that was set.
   * @fires HostMap#remove
   * @fires HostMap#add
   */
  set(key, value) {
    // Clear values cache
    this._values = null;

    const originalValue = this._items.get(key);
    if (originalValue) {
      // The set of keys does not change, so the value is replaced in place
      this._items.set(key, value);
      // Emit a remove followed by an add
      this.emit('remove', originalValue);
      this.emit('add', value);
      return value;
    }

    // Copy the values
    const copy = new Map(this._items);
    copy.set(key, value);
    this._items = copy;
    this.emit('add', value);
    return value;
  }

  /**
   * Returns a shallow copy of a portion of the items into a new array object.
   * Backward-compatibility.
   * @param {Number} [begin]
   * @param {Number} [end]
   * @returns {Array}
   * @ignore
   */
  slice(begin, end) {
    // Only skip slicing when no end was provided: an explicit end of zero must yield an empty array
    if (!begin && (end === undefined || end === null)) {
      // Avoid making a copy of the copy
      return this.values();
    }

    return this.values().slice(begin || 0, end);
  }

  /**
   * Deprecated: Use set() instead.
   * @ignore
   * @deprecated
   */
  push(k, v) {
    this.set(k, v);
  }

  /**
   * Returns a shallow copy of the values of the map.
   * The returned array is frozen and reused across calls until the map changes.
   * @returns {Array.<Host>}
   */
  values() {
    if (!this._values) {
      // Cache the values
      this._values = Object.freeze(Array.from(this._items.values()));
    }

    return this._values;
  }

  /**
   * Removes all items from the map.
   * @returns {Array.<Host>} The previous items
   * @fires HostMap#remove
   */
  clear() {
    const previousItems = this.values();

    // Clear cache
    this._values = null;

    // Clear items
    this._items = new Map();

    // Emit events
    previousItems.forEach(h => this.emit('remove', h));

    return previousItems;
  }

  /**
   * Gets the representation used when inspecting the instance: the underlying Map.
   * @returns {Map}
   */
  inspect() {
    return this._items;
  }

  /**
   * Custom inspection hook looked up by <code>util.inspect()</code>.
   * Recent Node.js versions no longer call a method named <code>inspect()</code>; the registered symbol is used to
   * avoid requiring the <code>util</code> module.
   * @returns {Map}
   * @ignore
   */
  [Symbol.for('nodejs.util.inspect.custom')]() {
    return this.inspect();
  }

  /**
   * Gets a plain object containing the hosts by key, used by <code>JSON.stringify()</code>.
   * @returns {Object}
   */
  toJSON() {
    // Node.js 10 and below don't support Object.fromEntries()
    if (Object.fromEntries) {
      return Object.fromEntries(this._items);
    }

    const obj = {};
    for (const [ key, value ] of this._items) {
      obj[key] = value;
    }

    return obj;
  }
}
654
// Public surface of the module: the node representation and the iterable map of nodes
module.exports = { Host, HostMap };
\No newline at end of file