UNPKG

9.73 kBJavaScriptView Raw
/**
 * Manager of connections to one or more nodes, capable of ensuring that
 * connections are clear and alive before providing them to the application.
 *
 * @class ConnectionPool
 * @constructor
 * @param {Object} config - The config object passed to the transport.
 */
9
10module.exports = ConnectionPool;
11
12var _ = require('lodash');
13var utils = require('./utils');
14var Log = require('./log');
15
/**
 * Build a connection pool from the transport configuration.
 *
 * Recognised config keys (all optional):
 *  - `log`: logger to use; when absent a new `Log` is created and stored back
 *    on `config` so that connections created later share it
 *  - `selector`: resolved against `ConnectionPool.selectors`
 *  - `connectionClass`: resolved against `ConnectionPool.connectionClasses`
 *  - `deadTimeout`: base delay, in ms, before a dead connection is pinged
 *    again (default 60000)
 *  - `maxDeadTimeout`: upper bound, in ms, used by the "exponential" timeout
 *    calculator (default 18e5)
 *  - `calcDeadTimeout`: resolved against `ConnectionPool.calcDeadTimeoutOptions`
 *    (default "exponential")
 *
 * @param {Object} [config] - The config object passed to the transport.
 */
function ConnectionPool(config) {
  // borrowed from Object.prototype so that config objects created without a
  // prototype (Object.create(null)) do not crash the lookup
  var hasOwn = Object.prototype.hasOwnProperty;

  config = config || {};
  utils.makeBoundMethods(this);

  if (!config.log) {
    this.log = new Log();
    config.log = this.log;
  } else {
    this.log = config.log;
  }

  // we will need this when we create connections down the road
  this._config = config;

  // get the selector config var
  this.selector = utils.funcEnum(
    config,
    'selector',
    ConnectionPool.selectors,
    ConnectionPool.defaultSelector
  );

  // get the connection class
  this.Connection = utils.funcEnum(
    config,
    'connectionClass',
    ConnectionPool.connectionClasses,
    ConnectionPool.defaultConnectionClass
  );

  // time that connections will wait before being revived
  this.deadTimeout = hasOwn.call(config, 'deadTimeout')
    ? config.deadTimeout
    : 60000;
  this.maxDeadTimeout = hasOwn.call(config, 'maxDeadTimeout')
    ? config.maxDeadTimeout
    : 18e5;
  this.calcDeadTimeout = utils.funcEnum(
    config,
    'calcDeadTimeout',
    ConnectionPool.calcDeadTimeoutOptions,
    'exponential'
  );

  // a map of connections to their "id" property, used when sniffing
  this.index = {};

  this._conns = {
    alive: [],
    dead: [],
  };

  // information about timeouts for dead connections
  this._timeouts = [];
}
71
// selection strategies that `config.selector` may name, plus the fallback
ConnectionPool.defaultSelector = 'roundRobin';
ConnectionPool.selectors = require('./selectors');

// connection implementations that `config.connectionClass` may name; the
// module's `_default` entry is captured as the fallback and then removed from
// the map so that it is not offered as a named option
(function(connectors) {
  ConnectionPool.connectionClasses = connectors;
  ConnectionPool.defaultConnectionClass = connectors._default;
  delete connectors._default;
})(require('./connectors'));
81
// strategies for turning (revival attempt count, base timeout) into the delay
// to wait before the next revival attempt
ConnectionPool.calcDeadTimeoutOptions = {
  // always wait the base timeout
  flat: function(attempt, baseTimeout) {
    return baseTimeout;
  },
  // grow the wait with each attempt, capped at `this.maxDeadTimeout`
  // (expects to be invoked with the pool as `this`)
  exponential: function(attempt, baseTimeout) {
    var growth = Math.pow(2, attempt * 0.5 - 1);
    var wait = baseTimeout * 2 * growth;
    return Math.min(wait, this.maxDeadTimeout);
  },
};
94
/**
 * Selects a connection from the pool using `this.selector`.
 *
 * - A selector declaring more than one parameter is treated as async and is
 *   handed the callback directly.
 * - A sync selector has its result delivered to the callback on a later tick.
 * - An error thrown by a sync selector is also delivered on a later tick, so
 *   the callback is never invoked before `select()` returns on this path.
 * - When there are no living connections but dead ones are pending revival,
 *   selection is delegated to `_selectDeadConnection`.
 * - When there is nothing to choose from, the callback is invoked (on a later
 *   tick) with no error and no connection.
 *
 * @param {Function} cb - called as `cb(err, connection)`
 * @return {undefined}
 */
ConnectionPool.prototype.select = function(cb) {
  var alive = this._conns.alive;

  if (!alive.length) {
    if (this._timeouts.length) {
      this._selectDeadConnection(cb);
    } else {
      utils.nextTick(cb, void 0);
    }
    return;
  }

  // arity > 1 means the selector accepts a callback of its own
  if (this.selector.length > 1) {
    this.selector(alive, cb);
    return;
  }

  var selection;
  try {
    selection = this.selector(alive);
  } catch (e) {
    // defer the failure as well, keeping the callback consistently async
    utils.nextTick(cb, e);
    return;
  }

  utils.nextTick(cb, void 0, selection);
};
123
/**
 * Listener for the "status set" event emitted by the connections. Moves the
 * connection out of the list for its old status and into the list for its new
 * status (statuses without a list, such as "closed", are simply not stored),
 * then runs the died/revived bookkeeping.
 *
 * @param {String} status - the connection's new status
 * @param {String} oldStatus - the connection's old status
 * @param {ConnectionAbstract} connection - the connection object itself
 */
ConnectionPool.prototype.onStatusSet = utils.handler(function(
  status,
  oldStatus,
  connection
) {
  var isDead = status === 'dead';
  var wasDead = oldStatus === 'dead';

  // an unchanged status only matters when it is a repeated death, because the
  // revival timeout then has to be rescheduled
  if (status === oldStatus && !isDead) {
    return true;
  }

  var source = this._conns[oldStatus];
  var target = this._conns[status];

  if (source !== target) {
    if (Array.isArray(source)) {
      var position = source.indexOf(connection);
      if (position !== -1) {
        source.splice(position, 1);
      }
    }

    if (Array.isArray(target) && target.indexOf(connection) === -1) {
      target.push(connection);
    }
  }

  if (isDead) {
    this._onConnectionDied(connection, wasDead);
  }

  if (!isDead && wasDead) {
    this._onConnectionRevived(connection);
  }
});
174
/**
 * Drops the revival bookkeeping created when a connection died: the first
 * timeout entry belonging to the connection has its timer cancelled (if one
 * is set) and is removed from the list.
 *
 * @param {ConnectionAbstract} connection
 */
ConnectionPool.prototype._onConnectionRevived = function(connection) {
  var pending = this._timeouts;
  var position = -1;

  // locate the first entry that belongs to this connection
  for (var n = 0; n < pending.length && position === -1; n++) {
    if (pending[n].conn === connection) {
      position = n;
    }
  }

  if (position === -1) {
    return;
  }

  var entry = pending[position];
  if (entry.id) {
    clearTimeout(entry.id);
  }
  pending.splice(position, 1);
};
192
/**
 * Creates or updates the revival timeout for a connection which has died.
 *
 * When the connection was already dead its existing timeout entry is reused,
 * so the attempt counter keeps growing. If no entry can be found for it (or
 * the connection has just died), a fresh entry is created rather than failing
 * on a missing record. Any timer already attached to the entry is cancelled
 * and a new one is scheduled using `this.calcDeadTimeout`.
 *
 * @param {ConnectionAbstract} connection
 * @param {Boolean} alreadyWasDead - must be true if the connection was
 *   previously dead
 */
ConnectionPool.prototype._onConnectionDied = function(
  connection,
  alreadyWasDead
) {
  var timeout;

  if (alreadyWasDead) {
    for (var i = 0; i < this._timeouts.length; i++) {
      if (this._timeouts[i].conn === connection) {
        timeout = this._timeouts[i];
        break;
      }
    }
  }

  // newly dead, or reported as already dead without a recorded entry
  if (!timeout) {
    timeout = {
      conn: connection,
      attempt: 0,
      // ping the connection and update its status based on the outcome
      revive: function(cb) {
        timeout.attempt++;
        connection.ping(function(err) {
          connection.setStatus(err ? 'dead' : 'alive');
          if (cb && typeof cb === 'function') {
            cb(err);
          }
        });
      },
    };
    this._timeouts.push(timeout);
  }

  if (timeout.id) {
    clearTimeout(timeout.id);
  }

  var ms = this.calcDeadTimeout(timeout.attempt, this.deadTimeout);
  timeout.id = setTimeout(timeout.revive, ms);
  // remembered so that dead connections can be ordered by next revival time
  timeout.runAt = utils.now() + ms;
};
235
/**
 * Attempts to hand back a usable connection when none are alive. Timeout
 * entries are tried in ascending `runAt` order: an entry whose connection is
 * no longer "dead" is returned as is, a dead one is revived first, and a
 * failed revival moves on to the next entry. When the entries run out the
 * callback is invoked without a connection.
 *
 * @param {Function} cb - called as `cb(err, connection)`
 */
ConnectionPool.prototype._selectDeadConnection = function(cb) {
  var queue = _.sortBy(this._timeouts, 'runAt');
  var log = this.log;

  function attemptNext() {
    var candidate = queue.shift();

    // ran out of entries, report that nothing could be selected
    if (!candidate) {
      cb(void 0);
      return;
    }

    if (!candidate.conn) {
      attemptNext();
      return;
    }

    if (candidate.conn.status !== 'dead') {
      cb(void 0, candidate.conn);
      return;
    }

    candidate.revive(function(err) {
      if (!err) {
        cb(void 0, candidate.conn);
        return;
      }

      log.warning('Unable to revive connection: ' + candidate.conn.id);
      process.nextTick(attemptNext);
    });
  }

  process.nextTick(attemptNext);
};
266
/**
 * Returns connections from the pool.
 *
 * Without a `status` the living connections are used, falling back to the
 * dead ones when nothing is alive. Without a `limit` a copy of the list is
 * returned in its stored order; with a `limit` the list is shuffled and at
 * most `limit` connections are returned. A status the pool keeps no list for
 * yields an empty array in both cases.
 *
 * This is used for testing (when we just want the one existing node)
 * and sniffing, where using the selector to get all of the living connections
 * is not reasonable.
 *
 * @param {string} [status] - optional status of the connections to fetch
 * @param {Number} [limit] - optional limit on the number of connections to return
 * @return {ConnectionAbstract[]} a new array; the pool's own lists are never returned
 */
ConnectionPool.prototype.getConnections = function(status, limit) {
  var wanted = status || (this._conns.alive.length ? 'alive' : 'dead');
  var found = this._conns[wanted];
  // unknown statuses (and inherited, non-list properties) map to "nothing"
  var list = Array.isArray(found) ? found : [];

  if (limit == null) {
    return list.slice(0);
  }

  return _.shuffle(list).slice(0, limit);
};
293
/**
 * Registers a single connection with the pool and sets its status to "alive".
 * A connection without an id gets one derived from its host. Connections
 * whose id is already indexed are ignored.
 * The connection should inherit from ConnectionAbstract.
 *
 * @param {ConnectionAbstract} connection - The connection to add
 */
ConnectionPool.prototype.addConnection = function(connection) {
  if (!connection.id) {
    connection.id = connection.host.toString();
  }

  var id = connection.id;
  if (this.index[id]) {
    // already part of the pool
    return;
  }

  this.log.info('Adding connection to', id);
  this.index[id] = connection;
  // subscribe before changing the status so the pool sees the "alive" event
  connection.on('status set', this.bound.onStatusSet);
  connection.setStatus('alive');
};
312
/**
 * Unregisters a connection from the pool and sets its status to "closed".
 * A connection without an id gets one derived from its host. Connections
 * whose id is not indexed are ignored.
 *
 * @param {ConnectionAbstract} connection - The connection to remove/close
 */
ConnectionPool.prototype.removeConnection = function(connection) {
  if (!connection.id) {
    connection.id = connection.host.toString();
  }

  var id = connection.id;
  if (!this.index[id]) {
    // not part of the pool
    return;
  }

  delete this.index[id];
  // change the status while still subscribed so the pool sees the "closed" event
  connection.setStatus('closed');
  connection.removeListener('status set', this.bound.onStatusSet);
};
329
/**
 * Replaces the pool's node list. Hosts that are not yet indexed get a new
 * connection; indexed connections whose host is missing from the new list are
 * removed (and thereby closed). Repeated hosts have no additional effect.
 *
 * @param {Host[]} hosts - An array of Host instances.
 */
ConnectionPool.prototype.setHosts = function(hosts) {
  // start by assuming every known connection is stale, then spare the ones
  // whose host shows up in the new list
  var stale = _.clone(this.index);

  for (var h = 0; h < hosts.length; h++) {
    var host = hosts[h];
    var id = host.toString();

    if (this.index[id]) {
      delete stale[id];
      continue;
    }

    var fresh = new this.Connection(host, this._config);
    fresh.id = id;
    this.addConnection(fresh);
  }

  _.keys(stale).forEach(function(staleId) {
    this.removeConnection(this.index[staleId]);
  }, this);
};
360
/**
 * Lists the `host` of every connection currently indexed by the pool.
 *
 * @return {Array} one entry per indexed connection
 */
ConnectionPool.prototype.getAllHosts = function() {
  var index = this.index;

  return Object.keys(index).map(function(id) {
    return index[id].host;
  });
};
366
/**
 * Close the connection pool, as well as all of its connections, by replacing
 * the node list with an empty one. Also exposed as `empty`.
 */
ConnectionPool.prototype.empty = ConnectionPool.prototype.close = function() {
  var noHosts = [];
  this.setHosts(noHosts);
};