// Retrieved from UNPKG (15.6 kB, JavaScript, raw view).

/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
'use strict';
// Node.js core modules
const util = require('util');
const events = require('events');

// Project-local modules
const Connection = require('./connection');
const utils = require('./utils');
const promiseUtils = require('./promise-utils');
const errors = require('./errors');
const clientOptions = require('./client-options');
25
// Rotating start index used when picking the connection with the fewest in-flight requests.
// It is module-level state, incremented on every selection that involves more than one connection.
let connectionIndex = 0;
// Once the index reaches this value it wraps back to zero.
const connectionIndexOverflow = Math.pow(2, 15);

// Cache for the lazily-loaded default client options; undefined until first requested.
let defaultOptions;
31
/**
 * Represents the possible states of the pool.
 * Possible state transitions:
 * - From initial to closing: The pool must be closed because the host is ignored.
 * - From initial to shuttingDown: The pool is being shut down as a result of a client shutdown.
 * - From closing to initial state: The pool finished closing connections (is now ignored) and it resets to
 *   initial state in case the host is marked as local/remote in the future.
 * - From closing to shuttingDown (rare): It was marked as ignored, now the client is being shut down.
 * - From shuttingDown to shutDown: Finished shutting down, the pool should not be reused.
 * @private
 */
const state = Object.freeze({
  // Initial state: open / opening / ready to be opened
  initial: 0,
  // When the pool is being closed as part of a distance change
  closing: 1,
  // When the pool is being shut down for good
  shuttingDown: 2,
  // When the pool has been shut down
  shutDown: 4
});
53
/**
 * Represents a pool of connections to a host.
 *
 * The pool emits the following events (all visible in this class):
 * - 'open' (err, connection): an attempt to open a new connection finished.
 * - 'remove': a connection was removed from the pool.
 * - 'close': the pool finished closing its connections.
 * - 'shutdown': the pool reached its final, shut-down state.
 */
class HostConnectionPool extends events.EventEmitter {
  /**
   * Creates a new instance of HostConnectionPool.
   * @param {Host} host
   * @param {Number} protocolVersion Initial protocol version
   * @extends EventEmitter
   */
  constructor(host, protocolVersion) {
    super();
    this._address = host.address;
    // Handle of the pending timer that will attempt a new connection, or null when none is scheduled
    this._newConnectionTimeout = null;
    this._state = state.initial;
    // True while a connection is being opened by _attemptNewConnection()
    this._opening = false;
    this._host = host;
    this.responseCounter = 0;
    this.options = host.options;
    this.protocolVersion = protocolVersion;
    // Target pool size; starts at 1 (presumably adjusted by the owner of the pool - verify against callers)
    this.coreConnectionsLength = 1;
    /**
     * An immutable array of connections: it is always replaced by a modified copy, never mutated in place.
     * @type {Array.<Connection>}
     */
    this.connections = utils.emptyArray;
    // A value of zero disables the EventEmitter listener-count warning
    this.setMaxListeners(0);
    this.log = utils.log;
  }

  /**
   * Gets the total amount of in-flight requests across all the connections of the pool.
   * @returns {Number}
   */
  getInFlight() {
    const length = this.connections.length;
    if (length === 1) {
      // Fast path for the single-connection pool
      return this.connections[0].getInFlight();
    }

    let sum = 0;
    for (let i = 0; i < length; i++) {
      sum += this.connections[i].getInFlight();
    }
    return sum;
  }

  /**
   * Gets the least busy connection from the pool.
   * @param {Connection} [previousConnection] When provided, the pool should attempt to obtain a different connection.
   * @returns {Connection!}
   * @throws {Error} When the pool has no connections.
   * @throws {BusyConnectionError} When the selected connection reached the maximum amount of in-flight requests.
   */
  borrowConnection(previousConnection) {
    if (this.connections.length === 0) {
      throw new Error('No connection available');
    }

    const maxRequests = this.options.pooling.maxRequestsPerConnection;
    const c = HostConnectionPool.minInFlight(this.connections, maxRequests, previousConnection);

    if (c.getInFlight() >= maxRequests) {
      throw new errors.BusyConnectionError(this._address, maxRequests, this.connections.length);
    }

    return c;
  }

  /**
   * Gets the connection with the minimum number of in-flight requests.
   * Only checks for 2 connections (round-robin) and gets the one with minimum in-flight requests, as long as
   * the amount of in-flight requests is lower than maxRequests.
   * @param {Array.<Connection>} connections Expected to be non-empty; with an empty array the result is undefined.
   * @param {Number} maxRequests
   * @param {Connection} previousConnection When provided, it will attempt to obtain a different connection.
   * @returns {Connection!}
   */
  static minInFlight(connections, maxRequests, previousConnection) {
    const length = connections.length;
    if (length === 1) {
      return connections[0];
    }

    // Use a single index for all hosts as a simplified way to balance the load between connections
    connectionIndex++;
    if (connectionIndex >= connectionIndexOverflow) {
      connectionIndex = 0;
    }

    let current;
    for (let index = connectionIndex; index < connectionIndex + length; index++) {
      current = connections[index % length];
      if (current === previousConnection) {
        // Increment the index and skip
        current = connections[(++index) % length];
      }

      let next = connections[(index + 1) % length];
      if (next === previousConnection) {
        // Skip
        next = connections[(index + 2) % length];
      }

      if (next.getInFlight() < current.getInFlight()) {
        current = next;
      }

      if (current.getInFlight() < maxRequests) {
        // Check as few connections as possible, as long as the amount of in-flight
        // requests is lower than maxRequests
        break;
      }
    }
    return current;
  }

  /**
   * Creates all the connections in the pool and switches the keyspace of each connection if needed.
   * Failures to open a connection are propagated; failures to switch the keyspace are only logged.
   * @param {string} keyspace
   * @returns {Promise<void>}
   */
  async warmup(keyspace) {
    if (this.connections.length < this.coreConnectionsLength) {
      while (this.connections.length < this.coreConnectionsLength) {
        await this._attemptNewConnection();
      }

      this.log('info',
        `Connection pool to host ${this._address} created with ${this.connections.length} connection(s)`);
    } else {
      this.log('info', `Connection pool to host ${this._address} contains ${this.connections.length} connection(s)`);
    }

    if (keyspace) {
      try {
        for (const connection of this.connections) {
          await connection.changeKeyspace(keyspace);
        }
      } catch (err) {
        // Log it and move on, it could be a momentary schema mismatch failure.
        // The error is handed to the logger so the cause is not lost.
        this.log(
          'warning',
          `Connection(s) to host ${this._address} could not be switched to keyspace ${keyspace}`,
          err);
      }
    }
  }

  /**
   * Creates a connection instance (not yet opened) with the pool listeners attached.
   * When the sni option is set, the host id is used instead of the address to create the connection.
   * @returns {Connection}
   */
  _createConnection() {
    const endpointOrServerName = !this.options.sni
      ? this._address : this._host.hostId.toString();

    const c = new Connection(endpointOrServerName, this.protocolVersion, this.options);
    this._addListeners(c);
    return c;
  }

  /**
   * Subscribes to the connection events the pool reacts to.
   * @param {Connection} c
   */
  _addListeners(c) {
    c.on('responseDequeued', () => this.responseCounter++);

    const connectionErrorCallback = () => {
      // The socket is not fully open / can not send heartbeat
      this.remove(c);
    };
    c.on('idleRequestError', connectionErrorCallback);
    c.on('socketClose', connectionErrorCallback);
  }

  /**
   * Adds a connection that was created outside of the pool.
   * @param {Connection} c
   */
  addExistingConnection(c) {
    this._addListeners(c);
    // Use a copy of the connections array
    this.connections = this.connections.slice(0);
    this.connections.push(c);
  }

  /**
   * Prevents reconnection timeout from triggering
   */
  clearNewConnectionAttempt() {
    if (!this._newConnectionTimeout) {
      return;
    }
    clearTimeout(this._newConnectionTimeout);
    this._newConnectionTimeout = null;
  }

  /**
   * Tries to open a new connection.
   * If a connection is being opened, it will resolve when the existing open task completes.
   * Emits 'open' with (err, connection) once the attempt finished, regardless of the outcome.
   * @returns {Promise<void>}
   */
  async _attemptNewConnection() {
    if (this._opening) {
      // Wait for the event to fire
      return await promiseUtils.fromEvent(this, 'open');
    }

    this._opening = true;

    const c = this._createConnection();
    let err;

    try {
      await c.openAsync();
    } catch (e) {
      err = e;
      this.log('warning', `Connection to ${this._address} could not be created: ${err}`, err);
    }

    if (this.isClosing()) {
      if (!err) {
        // Only claim success when the open actually succeeded
        this.log('info', `Connection to ${this._address} opened successfully but pool was being closed`);
      }
      err = new Error('Connection closed');
    }

    if (!err) {
      // Append the connection to the pool.
      // Use a copy of the connections array.
      const newConnections = this.connections.slice(0);
      newConnections.push(c);
      this.connections = newConnections;
      this.log('info', `Connection to ${this._address} opened successfully`);
    } else {
      // The connection will not be part of the pool: close it without waiting for it
      promiseUtils.toBackground(c.closeAsync());
    }

    // Notify that creation finished by setting the flag and emitting the event
    this._opening = false;
    this.emit('open', err, c);

    if (err) {
      // Opening failed
      throw err;
    }
  }

  /**
   * Schedules a new connection attempt without delay.
   * When the pool is closing, the attempt is deferred until the 'close' event is emitted.
   * When the pool is being / has been shut down, it does nothing.
   */
  attemptNewConnectionImmediate() {
    const openConnection = () => {
      this.clearNewConnectionAttempt();
      this.scheduleNewConnectionAttempt(0);
    };

    if (this._state === state.initial) {
      return openConnection();
    }

    if (this._state === state.closing) {
      return this.once('close', openConnection);
    }
    // In the case the pool its being / has been shutdown for good
    // Do not attempt to create a new connection.
  }

  /**
   * Closes the connection and removes a connection from the pool.
   * Emits 'remove' when the connection was part of the pool.
   * @param {Connection} connection
   */
  remove(connection) {
    // locating an object by position in the array is O(n), but normally there should be between 1 to 8 connections.
    const index = this.connections.indexOf(connection);
    if (index < 0) {
      // it was already removed from the connections and it's closing
      return;
    }
    // remove the connection from the pool, using a pool copy
    const newConnections = this.connections.slice(0);
    newConnections.splice(index, 1);
    this.connections = newConnections;
    // close the connection on a later tick, after the pool was updated and 'remove' was emitted
    setImmediate(function removeClose() {
      connection.close();
    });
    this.emit('remove');
  }

  /**
   * Schedules an attempt to open a new connection after the provided delay.
   * It does nothing when the pool is closing / shutting down / shut down.
   * @param {Number} delay Delay in milliseconds.
   */
  scheduleNewConnectionAttempt(delay) {
    if (this.isClosing()) {
      return;
    }

    this._newConnectionTimeout = setTimeout(() => {
      this._newConnectionTimeout = null;
      if (this.connections.length >= this.coreConnectionsLength) {
        // new connection can be scheduled while a new connection is being opened
        // the pool has the appropriate size
        return;
      }

      if (delay > 0 && this.options.sni) {
        // We use delay > 0 as an indication that it's a reconnection.
        // A reconnection schedule can use delay = 0 as well, but it's a good enough signal.
        promiseUtils.toBackground(this.options.sni.addressResolver.refresh().then(() => this._attemptNewConnection()));
        return;
      }

      promiseUtils.toBackground(this._attemptNewConnection());
    }, delay);
  }

  /**
   * Determines whether a connection attempt is scheduled or a connection is currently being opened.
   * @returns {Boolean}
   */
  hasScheduledNewConnection() {
    return !!this._newConnectionTimeout || this._opening;
  }

  /**
   * Increases the size of the connection pool in the background, if needed.
   */
  increaseSize() {
    if (this.connections.length < this.coreConnectionsLength && !this.hasScheduledNewConnection()) {
      // schedule the next connection in the background
      this.scheduleNewConnectionAttempt(0);
    }
  }

  /**
   * Gets the amount of responses and resets the internal counter.
   * @returns {number}
   */
  getAndResetResponseCounter() {
    const temp = this.responseCounter;
    this.responseCounter = 0;
    return temp;
  }

  /**
   * Gets a boolean indicating if the pool is being closed / shutting down or has been shutdown.
   * @returns {Boolean}
   */
  isClosing() {
    return this._state !== state.initial;
  }

  /**
   * Gracefully waits for all in-flight requests to finish and closes the pool.
   * Idle connections are closed right away; busy connections are closed when they emit 'drain' or,
   * at the latest, when the safety timer (readTimeout + 100ms) expires.
   */
  drainAndShutdown() {
    if (this.isClosing()) {
      // Its already closing / shutting down
      return;
    }

    this._state = state.closing;
    this.clearNewConnectionAttempt();

    if (this.connections.length === 0) {
      return this._afterClosing();
    }

    const self = this;
    const connections = this.connections;
    this.connections = utils.emptyArray;
    let closedConnections = 0;
    this.log('info', util.format('Draining and closing %d connections to %s', connections.length, this._address));
    // Set once the closing of the pool was completed, either by the last drained connection or by the timer
    let wasClosed = false;
    let checkShutdownTimeout = null;

    function getDelayedClose(connection) {
      return function delayedClose() {
        connection.close();
        if (++closedConnections < connections.length) {
          return;
        }
        if (wasClosed) {
          return;
        }
        wasClosed = true;
        if (checkShutdownTimeout) {
          clearTimeout(checkShutdownTimeout);
        }
        self._afterClosing();
      };
    }

    for (const c of connections) {
      if (c.getInFlight() === 0) {
        getDelayedClose(c)();
        continue;
      }
      c.emitDrain = true;
      c.once('drain', getDelayedClose(c));
    }

    if (wasClosed) {
      // Every connection was idle and the pool already finished closing synchronously.
      // Arming the safety timer now would close the connections and run _afterClosing() a second time.
      return;
    }

    // Check that after sometime (readTimeout + 100ms) the connections have been drained
    const delay = (this.options.socketOptions.readTimeout || getDefaultOptions().socketOptions.readTimeout) + 100;
    checkShutdownTimeout = setTimeout(function checkShutdown() {
      if (wasClosed) {
        return;
      }
      wasClosed = true;
      connections.forEach(function connectionEach(c) {
        c.close();
      });
      self._afterClosing();
    }, delay);
  }

  /**
   * Moves the pool to its next state once the connections were closed and emits 'close'
   * (followed by 'shutdown' when the pool ends up shut down).
   * When a connection is still being opened, the transition is deferred until 'open' is emitted.
   */
  _afterClosing() {
    const resetState = () => {
      // A pool that is shutting down, or that was already marked as shut down while its connections
      // were still draining, must end up shut down: only a plain close goes back to the initial state.
      const isShutdown = this._state === state.shuttingDown || this._state === state.shutDown;
      this._state = isShutdown ? state.shutDown : state.initial;

      this.emit('close');

      if (this._state === state.shutDown) {
        this.emit('shutdown');
      }
    };

    if (this._opening) {
      // The pool is growing, reset the state back to init once the open finished (without any new connection)
      return this.once('open', resetState);
    }

    resetState();
  }

  /**
   * Closes all the connections and moves the pool to its final state.
   * @returns {Promise<void>}
   */
  async shutdown() {
    this.clearNewConnectionAttempt();

    if (!this.connections.length) {
      this._state = state.shutDown;
      return;
    }

    const previousState = this._state;
    this._state = state.shuttingDown;

    if (previousState === state.closing || previousState === state.shuttingDown) {
      // When previous state was closing, it will drain all connections and close them
      // When previous state was "shuttingDown", it will close all the connections
      // Once it's completed, shutdown event will be emitted
      return promiseUtils.fromEvent(this, 'shutdown');
    }

    await this._closeAllConnections();

    this._state = state.shutDown;
    this.emit('shutdown');
  }

  /**
   * Detaches the connections from the pool and waits for all of them to be closed.
   * @returns {Promise<void>}
   */
  async _closeAllConnections() {
    const connections = this.connections;
    // point to an empty array
    this.connections = utils.emptyArray;
    if (connections.length === 0) {
      return;
    }

    this.log('info', util.format('Closing %d connections to %s', connections.length, this._address));

    await Promise.all(connections.map(c => c.closeAsync()));
  }
}
513
/**
 * Lazily loads the default client options.
 * The options are obtained from the client-options module on the first call and cached in the
 * module-level variable, so later calls return the same object.
 * @returns {Object}
 */
function getDefaultOptions() {
  if (defaultOptions === undefined) {
    defaultOptions = clientOptions.defaultOptions();
  }
  return defaultOptions;
}
521
// The class is the only export of this module.
module.exports = HostConnectionPool;