1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 | 'use strict';
|
17 | const util = require('util');
|
18 | const events = require('events');
|
19 |
|
20 | const Connection = require('./connection');
|
21 | const utils = require('./utils');
|
22 | const promiseUtils = require('./promise-utils');
|
23 | const errors = require('./errors');
|
24 | const clientOptions = require('./client-options');
|
25 |
|
26 |
|
27 | let connectionIndex = 0;
|
28 | const connectionIndexOverflow = Math.pow(2, 15);
|
29 |
|
30 | let defaultOptions;
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 | const state = {
|
44 |
|
45 | initial: 0,
|
46 |
|
47 | closing: 1,
|
48 |
|
49 | shuttingDown: 2,
|
50 |
|
51 | shutDown: 4
|
52 | };
|
53 |
|
54 |
|
55 |
|
56 |
|
57 | class HostConnectionPool extends events.EventEmitter {
|
58 | |
59 |
|
60 |
|
61 |
|
62 |
|
63 |
|
64 | constructor(host, protocolVersion) {
|
65 | super();
|
66 | this._address = host.address;
|
67 | this._newConnectionTimeout = null;
|
68 | this._state = state.initial;
|
69 | this._opening = false;
|
70 | this._host = host;
|
71 | this.responseCounter = 0;
|
72 | this.options = host.options;
|
73 | this.protocolVersion = protocolVersion;
|
74 | this.coreConnectionsLength = 1;
|
75 | |
76 |
|
77 |
|
78 |
|
79 | this.connections = utils.emptyArray;
|
80 | this.setMaxListeners(0);
|
81 | this.log = utils.log;
|
82 | }
|
83 |
|
84 | getInFlight() {
|
85 | const length = this.connections.length;
|
86 | if (length === 1) {
|
87 | return this.connections[0].getInFlight();
|
88 | }
|
89 |
|
90 | let sum = 0;
|
91 | for (let i = 0; i < length; i++) {
|
92 | sum += this.connections[i].getInFlight();
|
93 | }
|
94 | return sum;
|
95 | }
|
96 |
|
97 | |
98 |
|
99 |
|
100 |
|
101 |
|
102 |
|
103 |
|
104 | borrowConnection(previousConnection) {
|
105 | if (this.connections.length === 0) {
|
106 | throw new Error('No connection available');
|
107 | }
|
108 |
|
109 | const maxRequests = this.options.pooling.maxRequestsPerConnection;
|
110 | const c = HostConnectionPool.minInFlight(this.connections, maxRequests, previousConnection);
|
111 |
|
112 | if (c.getInFlight() >= maxRequests) {
|
113 | throw new errors.BusyConnectionError(this._address, maxRequests, this.connections.length);
|
114 | }
|
115 |
|
116 | return c;
|
117 | }
|
118 |
|
119 | |
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 |
|
128 | static minInFlight(connections, maxRequests, previousConnection) {
|
129 | const length = connections.length;
|
130 | if (length === 1) {
|
131 | return connections[0];
|
132 | }
|
133 |
|
134 |
|
135 | connectionIndex++;
|
136 | if (connectionIndex >= connectionIndexOverflow) {
|
137 | connectionIndex = 0;
|
138 | }
|
139 |
|
140 | let current;
|
141 | for (let index = connectionIndex; index < connectionIndex + length; index++) {
|
142 | current = connections[index % length];
|
143 | if (current === previousConnection) {
|
144 |
|
145 | current = connections[(++index) % length];
|
146 | }
|
147 |
|
148 | let next = connections[(index + 1) % length];
|
149 | if (next === previousConnection) {
|
150 |
|
151 | next = connections[(index + 2) % length];
|
152 | }
|
153 |
|
154 | if (next.getInFlight() < current.getInFlight()) {
|
155 | current = next;
|
156 | }
|
157 |
|
158 | if (current.getInFlight() < maxRequests) {
|
159 |
|
160 |
|
161 | break;
|
162 | }
|
163 | }
|
164 | return current;
|
165 | }
|
166 |
|
167 | |
168 |
|
169 |
|
170 |
|
171 | async warmup(keyspace) {
|
172 | if (this.connections.length < this.coreConnectionsLength) {
|
173 | while (this.connections.length < this.coreConnectionsLength) {
|
174 | await this._attemptNewConnection();
|
175 | }
|
176 |
|
177 | this.log('info',
|
178 | `Connection pool to host ${this._address} created with ${this.connections.length} connection(s)`);
|
179 | } else {
|
180 | this.log('info', `Connection pool to host ${this._address} contains ${this.connections.length} connection(s)`);
|
181 | }
|
182 |
|
183 | if (keyspace) {
|
184 | try {
|
185 | for (const connection of this.connections) {
|
186 | await connection.changeKeyspace(keyspace);
|
187 | }
|
188 | } catch (err) {
|
189 |
|
190 | this.log('warning', `Connection(s) to host ${this._address} could not be switched to keyspace ${keyspace}`);
|
191 | }
|
192 | }
|
193 | }
|
194 |
|
195 |
|
196 | _createConnection() {
|
197 | const endpointOrServerName = !this.options.sni
|
198 | ? this._address : this._host.hostId.toString();
|
199 |
|
200 | const c = new Connection(endpointOrServerName, this.protocolVersion, this.options);
|
201 | this._addListeners(c);
|
202 | return c;
|
203 | }
|
204 |
|
205 |
|
206 | _addListeners(c) {
|
207 | c.on('responseDequeued', () => this.responseCounter++);
|
208 |
|
209 | const self = this;
|
210 | function connectionErrorCallback() {
|
211 |
|
212 | self.remove(c);
|
213 | }
|
214 | c.on('idleRequestError', connectionErrorCallback);
|
215 | c.on('socketClose', connectionErrorCallback);
|
216 | }
|
217 |
|
218 | addExistingConnection(c) {
|
219 | this._addListeners(c);
|
220 |
|
221 | this.connections = this.connections.slice(0);
|
222 | this.connections.push(c);
|
223 | }
|
224 |
|
225 | |
226 |
|
227 |
|
228 | clearNewConnectionAttempt() {
|
229 | if (!this._newConnectionTimeout) {
|
230 | return;
|
231 | }
|
232 | clearTimeout(this._newConnectionTimeout);
|
233 | this._newConnectionTimeout = null;
|
234 | }
|
235 |
|
236 | |
237 |
|
238 |
|
239 |
|
240 |
|
241 | async _attemptNewConnection() {
|
242 | if (this._opening) {
|
243 |
|
244 | return await promiseUtils.fromEvent(this, 'open');
|
245 | }
|
246 |
|
247 | this._opening = true;
|
248 |
|
249 | const c = this._createConnection();
|
250 | let err;
|
251 |
|
252 | try {
|
253 | await c.openAsync();
|
254 | } catch (e) {
|
255 | err = e;
|
256 | this.log('warning', `Connection to ${this._address} could not be created: ${err}`, err);
|
257 | }
|
258 |
|
259 | if (this.isClosing()) {
|
260 | this.log('info', `Connection to ${this._address} opened successfully but pool was being closed`);
|
261 | err = new Error('Connection closed');
|
262 | }
|
263 |
|
264 | if (!err) {
|
265 |
|
266 |
|
267 | const newConnections = this.connections.slice(0);
|
268 | newConnections.push(c);
|
269 | this.connections = newConnections;
|
270 | this.log('info', `Connection to ${this._address} opened successfully`);
|
271 | } else {
|
272 | promiseUtils.toBackground(c.closeAsync());
|
273 | }
|
274 |
|
275 |
|
276 | this._opening = false;
|
277 | this.emit('open', err, c);
|
278 |
|
279 | if (err) {
|
280 |
|
281 | throw err;
|
282 | }
|
283 | }
|
284 |
|
285 | attemptNewConnectionImmediate() {
|
286 | const self = this;
|
287 | function openConnection() {
|
288 | self.clearNewConnectionAttempt();
|
289 | self.scheduleNewConnectionAttempt(0);
|
290 | }
|
291 |
|
292 | if (this._state === state.initial) {
|
293 | return openConnection();
|
294 | }
|
295 |
|
296 | if (this._state === state.closing) {
|
297 | return this.once('close', openConnection);
|
298 | }
|
299 |
|
300 |
|
301 | }
|
302 |
|
303 | |
304 |
|
305 |
|
306 |
|
307 | remove(connection) {
|
308 |
|
309 | const index = this.connections.indexOf(connection);
|
310 | if (index < 0) {
|
311 |
|
312 | return;
|
313 | }
|
314 |
|
315 | const newConnections = this.connections.slice(0);
|
316 | newConnections.splice(index, 1);
|
317 | this.connections = newConnections;
|
318 |
|
319 | setImmediate(function removeClose() {
|
320 | connection.close();
|
321 | });
|
322 | this.emit('remove');
|
323 | }
|
324 |
|
325 | |
326 |
|
327 |
|
328 | scheduleNewConnectionAttempt(delay) {
|
329 | if (this.isClosing()) {
|
330 | return;
|
331 | }
|
332 |
|
333 | const self = this;
|
334 |
|
335 | this._newConnectionTimeout = setTimeout(function newConnectionTimeoutExpired() {
|
336 | self._newConnectionTimeout = null;
|
337 | if (self.connections.length >= self.coreConnectionsLength) {
|
338 |
|
339 |
|
340 | return;
|
341 | }
|
342 |
|
343 | if (delay > 0 && self.options.sni) {
|
344 |
|
345 |
|
346 | promiseUtils.toBackground(self.options.sni.addressResolver.refresh().then(() => self._attemptNewConnection()));
|
347 | return;
|
348 | }
|
349 |
|
350 | promiseUtils.toBackground(self._attemptNewConnection());
|
351 | }, delay);
|
352 | }
|
353 |
|
354 | hasScheduledNewConnection() {
|
355 | return !!this._newConnectionTimeout || this._opening;
|
356 | }
|
357 |
|
358 | |
359 |
|
360 |
|
361 | increaseSize() {
|
362 | if (this.connections.length < this.coreConnectionsLength && !this.hasScheduledNewConnection()) {
|
363 |
|
364 | this.scheduleNewConnectionAttempt(0);
|
365 | }
|
366 | }
|
367 |
|
368 | |
369 |
|
370 |
|
371 |
|
372 | getAndResetResponseCounter() {
|
373 | const temp = this.responseCounter;
|
374 | this.responseCounter = 0;
|
375 | return temp;
|
376 | }
|
377 |
|
378 | |
379 |
|
380 |
|
381 | isClosing() {
|
382 | return this._state !== state.initial;
|
383 | }
|
384 |
|
385 | |
386 |
|
387 |
|
388 | drainAndShutdown() {
|
389 | if (this.isClosing()) {
|
390 |
|
391 | return;
|
392 | }
|
393 |
|
394 | this._state = state.closing;
|
395 | this.clearNewConnectionAttempt();
|
396 |
|
397 | if (this.connections.length === 0) {
|
398 | return this._afterClosing();
|
399 | }
|
400 |
|
401 | const self = this;
|
402 | const connections = this.connections;
|
403 | this.connections = utils.emptyArray;
|
404 | let closedConnections = 0;
|
405 | this.log('info', util.format('Draining and closing %d connections to %s', connections.length, this._address));
|
406 | let wasClosed = false;
|
407 |
|
408 | let checkShutdownTimeout;
|
409 |
|
410 | for (let i = 0; i < connections.length; i++) {
|
411 | const c = connections[i];
|
412 | if (c.getInFlight() === 0) {
|
413 | getDelayedClose(c)();
|
414 | continue;
|
415 | }
|
416 | c.emitDrain = true;
|
417 | c.once('drain', getDelayedClose(c));
|
418 | }
|
419 |
|
420 | function getDelayedClose(connection) {
|
421 | return (function delayedClose() {
|
422 | connection.close();
|
423 | if (++closedConnections < connections.length) {
|
424 | return;
|
425 | }
|
426 | if (wasClosed) {
|
427 | return;
|
428 | }
|
429 | wasClosed = true;
|
430 | if (checkShutdownTimeout) {
|
431 | clearTimeout(checkShutdownTimeout);
|
432 | }
|
433 | self._afterClosing();
|
434 | });
|
435 | }
|
436 |
|
437 |
|
438 | const delay = (this.options.socketOptions.readTimeout || getDefaultOptions().socketOptions.readTimeout) + 100;
|
439 | checkShutdownTimeout = setTimeout(function checkShutdown() {
|
440 | wasClosed = true;
|
441 | connections.forEach(function connectionEach(c) {
|
442 | c.close();
|
443 | });
|
444 | self._afterClosing();
|
445 | }, delay);
|
446 | }
|
447 |
|
448 | _afterClosing() {
|
449 | const self = this;
|
450 |
|
451 | function resetState() {
|
452 | if (self._state === state.shuttingDown) {
|
453 | self._state = state.shutDown;
|
454 | } else {
|
455 | self._state = state.initial;
|
456 | }
|
457 |
|
458 | self.emit('close');
|
459 |
|
460 | if (self._state === state.shutDown) {
|
461 | self.emit('shutdown');
|
462 | }
|
463 | }
|
464 |
|
465 | if (this._opening) {
|
466 |
|
467 | return this.once('open', resetState);
|
468 | }
|
469 |
|
470 | resetState();
|
471 | }
|
472 |
|
473 | |
474 |
|
475 |
|
476 | async shutdown() {
|
477 | this.clearNewConnectionAttempt();
|
478 |
|
479 | if (!this.connections.length) {
|
480 | this._state = state.shutDown;
|
481 | return;
|
482 | }
|
483 |
|
484 | const previousState = this._state;
|
485 | this._state = state.shuttingDown;
|
486 |
|
487 | if (previousState === state.closing || previousState === state.shuttingDown) {
|
488 |
|
489 |
|
490 |
|
491 | return promiseUtils.fromEvent(this, 'shutdown');
|
492 | }
|
493 |
|
494 | await this._closeAllConnections();
|
495 |
|
496 | this._state = state.shutDown;
|
497 | this.emit('shutdown');
|
498 | }
|
499 |
|
500 | async _closeAllConnections() {
|
501 | const connections = this.connections;
|
502 |
|
503 | this.connections = utils.emptyArray;
|
504 | if (connections.length === 0) {
|
505 | return;
|
506 | }
|
507 |
|
508 | this.log('info', util.format('Closing %d connections to %s', connections.length, this._address));
|
509 |
|
510 | await Promise.all(connections.map(c => c.closeAsync()));
|
511 | }
|
512 | }
|
513 |
|
514 |
|
515 | function getDefaultOptions() {
|
516 | if (defaultOptions === undefined) {
|
517 | defaultOptions = clientOptions.defaultOptions();
|
518 | }
|
519 | return defaultOptions;
|
520 | }
|
521 |
|
522 | module.exports = HostConnectionPool; |
\ | No newline at end of file |