1 | var Pool = require('./Pool');
|
2 | var PoolConfig = require('./PoolConfig');
|
3 | var PoolNamespace = require('./PoolNamespace');
|
4 | var PoolSelector = require('./PoolSelector');
|
5 | var Util = require('util');
|
6 | var EventEmitter = require('events').EventEmitter;
|
7 |
|
8 | module.exports = PoolCluster;
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 | function PoolCluster(config) {
|
17 | EventEmitter.call(this);
|
18 |
|
19 | config = config || {};
|
20 | this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
|
21 | this._defaultSelector = config.defaultSelector || 'RR';
|
22 | this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
|
23 | this._restoreNodeTimeout = config.restoreNodeTimeout || 0;
|
24 |
|
25 | this._closed = false;
|
26 | this._findCaches = Object.create(null);
|
27 | this._lastId = 0;
|
28 | this._namespaces = Object.create(null);
|
29 | this._nodes = Object.create(null);
|
30 | }
|
31 |
|
32 | Util.inherits(PoolCluster, EventEmitter);
|
33 |
|
34 | PoolCluster.prototype.add = function add(id, config) {
|
35 | if (this._closed) {
|
36 | throw new Error('PoolCluster is closed.');
|
37 | }
|
38 |
|
39 | var nodeId = typeof id === 'object'
|
40 | ? 'CLUSTER::' + (++this._lastId)
|
41 | : String(id);
|
42 |
|
43 | if (this._nodes[nodeId] !== undefined) {
|
44 | throw new Error('Node ID "' + nodeId + '" is already defined in PoolCluster.');
|
45 | }
|
46 |
|
47 | var poolConfig = typeof id !== 'object'
|
48 | ? new PoolConfig(config)
|
49 | : new PoolConfig(id);
|
50 |
|
51 | this._nodes[nodeId] = {
|
52 | id : nodeId,
|
53 | errorCount : 0,
|
54 | pool : new Pool({config: poolConfig}),
|
55 | _offlineUntil : 0
|
56 | };
|
57 |
|
58 | this._clearFindCaches();
|
59 | };
|
60 |
|
61 | PoolCluster.prototype.end = function end(callback) {
|
62 | var cb = callback !== undefined
|
63 | ? callback
|
64 | : _cb;
|
65 |
|
66 | if (typeof cb !== 'function') {
|
67 | throw TypeError('callback argument must be a function');
|
68 | }
|
69 |
|
70 | if (this._closed) {
|
71 | process.nextTick(cb);
|
72 | return;
|
73 | }
|
74 |
|
75 | this._closed = true;
|
76 |
|
77 | var calledBack = false;
|
78 | var nodeIds = Object.keys(this._nodes);
|
79 | var waitingClose = 0;
|
80 |
|
81 | function onEnd(err) {
|
82 | if (!calledBack && (err || --waitingClose <= 0)) {
|
83 | calledBack = true;
|
84 | cb(err);
|
85 | }
|
86 | }
|
87 |
|
88 | for (var i = 0; i < nodeIds.length; i++) {
|
89 | var nodeId = nodeIds[i];
|
90 | var node = this._nodes[nodeId];
|
91 |
|
92 | waitingClose++;
|
93 | node.pool.end(onEnd);
|
94 | }
|
95 |
|
96 | if (waitingClose === 0) {
|
97 | process.nextTick(onEnd);
|
98 | }
|
99 | };
|
100 |
|
101 | PoolCluster.prototype.of = function(pattern, selector) {
|
102 | pattern = pattern || '*';
|
103 |
|
104 | selector = selector || this._defaultSelector;
|
105 | selector = selector.toUpperCase();
|
106 | if (typeof PoolSelector[selector] === 'undefined') {
|
107 | selector = this._defaultSelector;
|
108 | }
|
109 |
|
110 | var key = pattern + selector;
|
111 |
|
112 | if (typeof this._namespaces[key] === 'undefined') {
|
113 | this._namespaces[key] = new PoolNamespace(this, pattern, selector);
|
114 | }
|
115 |
|
116 | return this._namespaces[key];
|
117 | };
|
118 |
|
119 | PoolCluster.prototype.remove = function remove(pattern) {
|
120 | var foundNodeIds = this._findNodeIds(pattern, true);
|
121 |
|
122 | for (var i = 0; i < foundNodeIds.length; i++) {
|
123 | var node = this._getNode(foundNodeIds[i]);
|
124 |
|
125 | if (node) {
|
126 | this._removeNode(node);
|
127 | }
|
128 | }
|
129 | };
|
130 |
|
131 | PoolCluster.prototype.getConnection = function(pattern, selector, cb) {
|
132 | var namespace;
|
133 | if (typeof pattern === 'function') {
|
134 | cb = pattern;
|
135 | namespace = this.of();
|
136 | } else {
|
137 | if (typeof selector === 'function') {
|
138 | cb = selector;
|
139 | selector = this._defaultSelector;
|
140 | }
|
141 |
|
142 | namespace = this.of(pattern, selector);
|
143 | }
|
144 |
|
145 | namespace.getConnection(cb);
|
146 | };
|
147 |
|
148 | PoolCluster.prototype._clearFindCaches = function _clearFindCaches() {
|
149 | this._findCaches = Object.create(null);
|
150 | };
|
151 |
|
152 | PoolCluster.prototype._decreaseErrorCount = function _decreaseErrorCount(node) {
|
153 | var errorCount = node.errorCount;
|
154 |
|
155 | if (errorCount > this._removeNodeErrorCount) {
|
156 | errorCount = this._removeNodeErrorCount;
|
157 | }
|
158 |
|
159 | if (errorCount < 1) {
|
160 | errorCount = 1;
|
161 | }
|
162 |
|
163 | node.errorCount = errorCount - 1;
|
164 |
|
165 | if (node._offlineUntil) {
|
166 | node._offlineUntil = 0;
|
167 | this.emit('online', node.id);
|
168 | }
|
169 | };
|
170 |
|
171 | PoolCluster.prototype._findNodeIds = function _findNodeIds(pattern, includeOffline) {
|
172 | var currentTime = 0;
|
173 | var foundNodeIds = this._findCaches[pattern];
|
174 |
|
175 | if (foundNodeIds === undefined) {
|
176 | var expression = patternRegExp(pattern);
|
177 | var nodeIds = Object.keys(this._nodes);
|
178 |
|
179 | foundNodeIds = nodeIds.filter(function (id) {
|
180 | return id.match(expression);
|
181 | });
|
182 |
|
183 | this._findCaches[pattern] = foundNodeIds;
|
184 | }
|
185 |
|
186 | if (includeOffline) {
|
187 | return foundNodeIds;
|
188 | }
|
189 |
|
190 | return foundNodeIds.filter(function (nodeId) {
|
191 | var node = this._getNode(nodeId);
|
192 |
|
193 | if (!node._offlineUntil) {
|
194 | return true;
|
195 | }
|
196 |
|
197 | if (!currentTime) {
|
198 | currentTime = getMonotonicMilliseconds();
|
199 | }
|
200 |
|
201 | return node._offlineUntil <= currentTime;
|
202 | }, this);
|
203 | };
|
204 |
|
205 | PoolCluster.prototype._getNode = function _getNode(id) {
|
206 | return this._nodes[id] || null;
|
207 | };
|
208 |
|
209 | PoolCluster.prototype._increaseErrorCount = function _increaseErrorCount(node) {
|
210 | var errorCount = ++node.errorCount;
|
211 |
|
212 | if (this._removeNodeErrorCount > errorCount) {
|
213 | return;
|
214 | }
|
215 |
|
216 | if (this._restoreNodeTimeout > 0) {
|
217 | node._offlineUntil = getMonotonicMilliseconds() + this._restoreNodeTimeout;
|
218 | this.emit('offline', node.id);
|
219 | return;
|
220 | }
|
221 |
|
222 | this._removeNode(node);
|
223 | this.emit('remove', node.id);
|
224 | };
|
225 |
|
226 | PoolCluster.prototype._getConnection = function(node, cb) {
|
227 | var self = this;
|
228 |
|
229 | node.pool.getConnection(function (err, connection) {
|
230 | if (err) {
|
231 | self._increaseErrorCount(node);
|
232 | cb(err);
|
233 | return;
|
234 | } else {
|
235 | self._decreaseErrorCount(node);
|
236 | }
|
237 |
|
238 | connection._clusterId = node.id;
|
239 |
|
240 | cb(null, connection);
|
241 | });
|
242 | };
|
243 |
|
244 | PoolCluster.prototype._removeNode = function _removeNode(node) {
|
245 | delete this._nodes[node.id];
|
246 |
|
247 | this._clearFindCaches();
|
248 |
|
249 | node.pool.end(_noop);
|
250 | };
|
251 |
|
252 | function getMonotonicMilliseconds() {
|
253 | var ms;
|
254 |
|
255 | if (typeof process.hrtime === 'function') {
|
256 | ms = process.hrtime();
|
257 | ms = ms[0] * 1e3 + ms[1] * 1e-6;
|
258 | } else {
|
259 | ms = process.uptime() * 1000;
|
260 | }
|
261 |
|
262 | return Math.floor(ms);
|
263 | }
|
264 |
|
265 | function isRegExp(val) {
|
266 | return typeof val === 'object'
|
267 | && Object.prototype.toString.call(val) === '[object RegExp]';
|
268 | }
|
269 |
|
270 | function patternRegExp(pattern) {
|
271 | if (isRegExp(pattern)) {
|
272 | return pattern;
|
273 | }
|
274 |
|
275 | var source = pattern
|
276 | .replace(/([.+?^=!:${}()|\[\]\/\\])/g, '\\$1')
|
277 | .replace(/\*/g, '.*');
|
278 |
|
279 | return new RegExp('^' + source + '$');
|
280 | }
|
281 |
|
282 | function _cb(err) {
|
283 | if (err) {
|
284 | throw err;
|
285 | }
|
286 | }
|
287 |
|
288 | function _noop() {}
|