UNPKG

6.76 kBJavaScriptView Raw
1var Pool = require('./Pool');
2var PoolConfig = require('./PoolConfig');
3var PoolNamespace = require('./PoolNamespace');
4var PoolSelector = require('./PoolSelector');
5var Util = require('util');
6var EventEmitter = require('events').EventEmitter;
7
8module.exports = PoolCluster;
9
/**
 * PoolCluster
 *
 * Event-emitting container that keeps a set of pool nodes keyed by node id.
 * Settings read from `config`:
 *  - `canRetry`: stored as given, `true` when left undefined
 *  - `defaultSelector`: 'RR' when falsy
 *  - `removeNodeErrorCount`: 5 when falsy
 *  - `restoreNodeTimeout`: 0 when falsy
 *
 * @constructor
 * @param {object} [config] The pool cluster configuration
 * @public
 */
function PoolCluster(config) {
  EventEmitter.call(this);

  var options = config || {};

  // Only an undefined `canRetry` selects the default; any other value
  // (including false or null) is stored unchanged.
  if (typeof options.canRetry === 'undefined') {
    this._canRetry = true;
  } else {
    this._canRetry = options.canRetry;
  }

  this._defaultSelector = options.defaultSelector || 'RR';
  this._removeNodeErrorCount = options.removeNodeErrorCount || 5;
  this._restoreNodeTimeout = options.restoreNodeTimeout || 0;

  // Runtime state. The lookup tables are prototype-less objects so that
  // arbitrary node ids and patterns can be used as keys.
  this._closed = false;
  this._findCaches = Object.create(null);
  this._lastId = 0;
  this._namespaces = Object.create(null);
  this._nodes = Object.create(null);
}

Util.inherits(PoolCluster, EventEmitter);
33
/**
 * Registers a new pool node in the cluster.
 *
 * Two call shapes are accepted:
 *  - `add(id, config)`: `id` is converted with `String()` and used as node id
 *  - `add(config)`: when the first argument is an object it is treated as the
 *    pool configuration and the node id is generated as `CLUSTER::<n>`
 *
 * The find caches are cleared afterwards so later lookups see the new node.
 *
 * @param {*} id Node id, or the pool configuration when it is an object
 * @param {object} [config] Pool configuration (used when `id` is not an object)
 * @throws {Error} When the cluster is closed or the node id already exists
 */
PoolCluster.prototype.add = function add(id, config) {
  if (this._closed) {
    throw new Error('PoolCluster is closed.');
  }

  var configOnly = typeof id === 'object';
  var nodeId;

  if (configOnly) {
    // No explicit id supplied: allocate the next sequential one.
    nodeId = 'CLUSTER::' + (++this._lastId);
  } else {
    nodeId = String(id);
  }

  if (this._nodes[nodeId] !== undefined) {
    throw new Error('Node ID "' + nodeId + '" is already defined in PoolCluster.');
  }

  var poolConfig = configOnly
    ? new PoolConfig(id)
    : new PoolConfig(config);

  var node = {
    id            : nodeId,
    errorCount    : 0,
    pool          : new Pool({config: poolConfig}),
    _offlineUntil : 0
  };

  this._nodes[nodeId] = node;
  this._clearFindCaches();
};
60
/**
 * Closes the cluster and asks every node's pool to end.
 *
 * The callback is invoked exactly once: with the first error reported by a
 * pool, or without an error once every pool has reported completion. When the
 * cluster is already closed, or has no nodes, the callback is deferred with
 * `process.nextTick`.
 *
 * @param {function} [callback] Completion callback `(err)`. When omitted, the
 *   module-level default `_cb` is used.
 * @throws {TypeError} When a callback is supplied that is not a function
 */
PoolCluster.prototype.end = function end(callback) {
  var cb = callback !== undefined
    ? callback
    : _cb;

  if (typeof cb !== 'function') {
    throw new TypeError('callback argument must be a function');
  }

  if (this._closed) {
    process.nextTick(cb);
    return;
  }

  this._closed = true;

  var calledBack = false;
  var nodeIds = Object.keys(this._nodes);

  // Count every pool before dispatching any close request. Incrementing the
  // counter inside the loop would let a pool that calls back synchronously
  // bring it to zero and complete the cluster while other pools have not
  // even been asked to close yet.
  var waitingClose = nodeIds.length;

  function onEnd(err) {
    if (!calledBack && (err || --waitingClose <= 0)) {
      calledBack = true;
      cb(err);
    }
  }

  if (nodeIds.length === 0) {
    // Nothing to close: still report completion asynchronously.
    process.nextTick(onEnd);
    return;
  }

  for (var i = 0; i < nodeIds.length; i++) {
    var node = this._nodes[nodeIds[i]];

    node.pool.end(onEnd);
  }
};
100
/**
 * Returns the namespace for a node pattern / selector pair, creating and
 * caching it on first use.
 *
 * The selector is compared case-insensitively against the keys of
 * `PoolSelector`. A missing or unknown selector falls back to the cluster's
 * default selector, which is upper-cased as well so that the fallback produces
 * the same namespace as requesting the default explicitly.
 *
 * @param {string|RegExp} [pattern] Node id pattern; '*' when falsy
 * @param {string} [selector] Selector name; cluster default when falsy
 * @returns {PoolNamespace} The cached namespace for the pair
 */
PoolCluster.prototype.of = function(pattern, selector) {
  var nodePattern = pattern || '*';

  // Normalise the default once. Previously an unknown selector fell back to
  // the raw configured value, so a default such as 'rr' leaked through in
  // lower case and never matched a PoolSelector key.
  var fallback = String(this._defaultSelector).toUpperCase();

  // String() keeps a non-string selector from crashing on toUpperCase();
  // such a value simply fails the lookup below and uses the fallback.
  var chosen = selector
    ? String(selector).toUpperCase()
    : fallback;

  if (typeof PoolSelector[chosen] === 'undefined') {
    chosen = fallback;
  }

  var key = nodePattern + chosen;

  if (typeof this._namespaces[key] === 'undefined') {
    this._namespaces[key] = new PoolNamespace(this, nodePattern, chosen);
  }

  return this._namespaces[key];
};
118
/**
 * Removes every node whose id matches the pattern, including nodes that are
 * currently marked offline.
 *
 * @param {string|RegExp} pattern Node id pattern
 */
PoolCluster.prototype.remove = function remove(pattern) {
  var cluster = this;
  var matchedIds = this._findNodeIds(pattern, true);

  matchedIds.forEach(function (nodeId) {
    var node = cluster._getNode(nodeId);

    // An id without a node is skipped.
    if (!node) {
      return;
    }

    cluster._removeNode(node);
  });
};
130
/**
 * Requests a connection from the namespace selected by pattern and selector.
 *
 * Accepted call shapes:
 *  - `getConnection(cb)`: default namespace from `of()`
 *  - `getConnection(pattern, cb)`: cluster default selector
 *  - `getConnection(pattern, selector, cb)`
 *
 * @param {string|RegExp|function} pattern Node id pattern, or the callback
 * @param {string|function} [selector] Selector name, or the callback
 * @param {function} [cb] Callback handed to the namespace's `getConnection`
 */
PoolCluster.prototype.getConnection = function(pattern, selector, cb) {
  if (typeof pattern === 'function') {
    // Only a callback was given.
    this.of().getConnection(pattern);
    return;
  }

  var callback = cb;
  var selectorName = selector;

  if (typeof selector === 'function') {
    // Selector omitted: the second argument is the callback.
    callback = selector;
    selectorName = this._defaultSelector;
  }

  this.of(pattern, selectorName).getConnection(callback);
};
147
/**
 * Discards all cached pattern lookups by installing a fresh, prototype-less
 * cache object.
 */
PoolCluster.prototype._clearFindCaches = function _clearFindCaches() {
  var emptyCache = Object.create(null);

  this._findCaches = emptyCache;
};
151
/**
 * Lowers a node's error count by one.
 *
 * The count is first clamped to at most `_removeNodeErrorCount` and at least
 * 1, so the stored result never drops below 0. A node that was marked offline
 * is brought back and an 'online' event is emitted with its id.
 *
 * @param {object} node Cluster node record
 */
PoolCluster.prototype._decreaseErrorCount = function _decreaseErrorCount(node) {
  var ceiling = this._removeNodeErrorCount;
  var clamped = node.errorCount > ceiling
    ? ceiling
    : node.errorCount;

  node.errorCount = (clamped < 1 ? 1 : clamped) - 1;

  if (!node._offlineUntil) {
    return;
  }

  node._offlineUntil = 0;
  this.emit('online', node.id);
};
170
/**
 * Resolves a pattern to the list of matching node ids.
 *
 * The full match list is cached per pattern in `_findCaches`. With
 * `includeOffline` the cached list itself is returned. Otherwise a new array
 * is built that leaves out nodes whose `_offlineUntil` lies in the future.
 *
 * @param {string|RegExp} pattern Node id pattern
 * @param {boolean} [includeOffline] Return offline nodes as well
 * @returns {string[]} Matching node ids
 */
PoolCluster.prototype._findNodeIds = function _findNodeIds(pattern, includeOffline) {
  var matchedIds = this._findCaches[pattern];

  if (matchedIds === undefined) {
    var matcher = patternRegExp(pattern);
    var knownIds = Object.keys(this._nodes);

    matchedIds = [];

    for (var i = 0; i < knownIds.length; i++) {
      if (knownIds[i].match(matcher)) {
        matchedIds.push(knownIds[i]);
      }
    }

    this._findCaches[pattern] = matchedIds;
  }

  if (includeOffline) {
    return matchedIds;
  }

  // The clock is read lazily, at most once, and only when an offline
  // marker actually has to be compared.
  var now = 0;
  var availableIds = [];

  for (var j = 0; j < matchedIds.length; j++) {
    var node = this._getNode(matchedIds[j]);

    if (node._offlineUntil) {
      if (!now) {
        now = getMonotonicMilliseconds();
      }

      if (!(node._offlineUntil <= now)) {
        continue;
      }
    }

    availableIds.push(matchedIds[j]);
  }

  return availableIds;
};
204
/**
 * Looks up a node record by id.
 *
 * @param {string} id Node id
 * @returns {object|null} The stored node, or null when the entry is missing or falsy
 */
PoolCluster.prototype._getNode = function _getNode(id) {
  var node = this._nodes[id];

  return node ? node : null;
};
208
/**
 * Records one more error for a node and reacts once the count reaches
 * `_removeNodeErrorCount`:
 *  - with a positive `_restoreNodeTimeout` the node is marked offline until
 *    that many milliseconds from now and 'offline' is emitted
 *  - otherwise the node is removed from the cluster and 'remove' is emitted
 *
 * @param {object} node Cluster node record
 */
PoolCluster.prototype._increaseErrorCount = function _increaseErrorCount(node) {
  ++node.errorCount;

  var belowThreshold = this._removeNodeErrorCount > node.errorCount;

  if (belowThreshold) {
    return;
  }

  if (!(this._restoreNodeTimeout > 0)) {
    // No restore window configured: drop the node for good.
    this._removeNode(node);
    this.emit('remove', node.id);
    return;
  }

  node._offlineUntil = getMonotonicMilliseconds() + this._restoreNodeTimeout;
  this.emit('offline', node.id);
};
225
/**
 * Obtains a connection from a node's pool and updates the node's error
 * bookkeeping with the outcome.
 *
 * On failure the error count is increased and the error is passed to `cb`.
 * On success the error count is decreased, the connection is tagged with the
 * node id as `_clusterId`, and `cb(null, connection)` is called.
 *
 * @param {object} node Cluster node record
 * @param {function} cb Callback `(err, connection)`
 */
PoolCluster.prototype._getConnection = function(node, cb) {
  var cluster = this;

  function onConnection(err, connection) {
    if (err) {
      cluster._increaseErrorCount(node);
      cb(err);
      return;
    }

    cluster._decreaseErrorCount(node);
    connection._clusterId = node.id;
    cb(null, connection);
  }

  node.pool.getConnection(onConnection);
};
243
/**
 * Drops a node from the cluster: deletes its entry, clears the find caches
 * and ends its pool, ignoring the pool's completion callback.
 *
 * @param {object} node Cluster node record
 */
PoolCluster.prototype._removeNode = function _removeNode(node) {
  var nodeId = node.id;

  delete this._nodes[nodeId];
  this._clearFindCaches();

  // Fire-and-forget: the outcome of closing the pool is not reported.
  node.pool.end(_noop);
};
251
/**
 * Returns a whole-millisecond timestamp for measuring elapsed time.
 *
 * Uses `process.hrtime()` when it is available and falls back to
 * `process.uptime()` otherwise.
 *
 * @returns {number} Milliseconds, rounded down
 */
function getMonotonicMilliseconds() {
  if (typeof process.hrtime !== 'function') {
    return Math.floor(process.uptime() * 1000);
  }

  // hrtime() yields [seconds, nanoseconds].
  var parts = process.hrtime();

  return Math.floor(parts[0] * 1e3 + parts[1] * 1e-6);
}
264
/**
 * Tells whether a value is a RegExp object, judged by its
 * `Object.prototype.toString` tag.
 *
 * @param {*} val Value to inspect
 * @returns {boolean}
 */
function isRegExp(val) {
  if (typeof val !== 'object') {
    return false;
  }

  return Object.prototype.toString.call(val) === '[object RegExp]';
}
269
/**
 * Converts a node id pattern into an anchored regular expression.
 *
 * A RegExp is returned unchanged. Any other value is converted with
 * `String()`, so a numeric id that `add()` registered through `String(id)`
 * can be looked up with the same value. In the resulting text every
 * regular-expression metacharacter is escaped except `*`, which becomes `.*`.
 *
 * @param {string|RegExp|*} pattern Node id pattern
 * @returns {RegExp} Expression matching whole node ids
 * @throws {TypeError} When the pattern is null or undefined
 */
function patternRegExp(pattern) {
  if (isRegExp(pattern)) {
    return pattern;
  }

  // A missing pattern used to fail with an incidental TypeError from
  // calling `.replace` on it; keep the error type but make it explicit.
  if (pattern === null || pattern === undefined) {
    throw new TypeError('pattern must be a string or RegExp');
  }

  var source = String(pattern)
    .replace(/([.+?^=!:${}()|\[\]\/\\])/g, '\\$1')
    .replace(/\*/g, '.*');

  return new RegExp('^' + source + '$');
}
281
/**
 * Default completion callback: rethrows a reported error and otherwise does
 * nothing.
 *
 * @param {*} [err] Error reported by the operation
 * @throws {*} The given error when it is truthy
 */
function _cb(err) {
  if (!err) {
    return;
  }

  throw err;
}
287
/**
 * Callback placeholder that deliberately does nothing.
 */
function _noop() {
  return undefined;
}