1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 | var _ = require('lodash');
|
7 | var timers = require('timers');
|
8 | var EventEmitter = require('events').EventEmitter;
|
9 | var tv4 = require('tv4');
|
10 |
|
11 | var properties = require('./properties');
|
12 | var actors = require('./actors');
|
13 | var discovery = require('./discovery');
|
14 | var logger = require('./logger')('hubiquitus:core:container');
|
15 | var schemas = require('./schemas');
|
16 | var utils = {
|
17 | aid: require('./utils/aid'),
|
18 | ip: require('./utils/ip'),
|
19 | uuid: require('./utils/uuid')
|
20 | };
|
21 |
|
22 | exports.__proto__ = new EventEmitter();
|
23 | exports.setMaxListeners(0);
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | var started = false;
|
29 |
|
30 |
|
31 |
|
32 |
|
33 | var locked = false;
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 | var startingQueue = [];
|
40 |
|
41 |
|
42 |
|
43 |
|
44 | var middlewares = [];
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 | var events = new EventEmitter();
|
51 | events.setMaxListeners(0);
|
52 |
|
53 |
|
54 |
|
55 |
|
56 | var adapters = {
|
57 | inproc: require('./adapters/inproc'),
|
58 | remote: require('./adapters/remote')
|
59 | };
|
60 |
|
61 |
|
62 |
|
63 |
|
64 | const msgType = {
|
65 | REQ_OUT: 'req_out',
|
66 | REQ_IN: 'req_in',
|
67 | RES_OUT: 'res_out',
|
68 | RES_IN: 'res_in'
|
69 | };
|
70 |
|
71 |
|
72 |
|
73 |
|
74 | actors.on('actor added', function (aid, scope) {
|
75 | events.emit(aid + '!found', aid);
|
76 | events.emit(utils.aid.bare(aid) + '!found', aid);
|
77 | exports.emit('actor added', aid, scope);
|
78 | });
|
79 |
|
80 | actors.on('actor removed', function (aid, scope) {
|
81 | exports.emit('actor removed', aid, scope);
|
82 | });
|
83 |
|
84 | adapters.inproc.on('req', onReq);
|
85 |
|
86 | adapters.remote.on('req', onReq);
|
87 |
|
88 | adapters.inproc.on('res', function (res) {
|
89 | events.emit('res|' + res.id, res);
|
90 | });
|
91 |
|
92 | adapters.remote.on('res', function (res) {
|
93 | events.emit('res|' + res.id, res);
|
94 | });
|
95 |
|
96 | adapters.remote.on('drop', function (req) {
|
97 | events.emit('drop|' + req.id, req);
|
98 | });
|
99 |
|
100 |
|
101 |
|
102 |
|
103 |
|
104 |
|
105 |
|
106 |
|
107 | exports.start = function (params, cb) {
|
108 | if (locked || started) {
|
109 | var msg = locked ? 'busy' : 'already started';
|
110 | logger.makeLog('warn', 'hub-17', 'attempt to start container while ' + msg + ' !');
|
111 | return this;
|
112 | }
|
113 |
|
114 | if (_.isFunction(params)) {
|
115 | cb = params;
|
116 | params = null;
|
117 | }
|
118 |
|
119 | locked = true;
|
120 | logger.makeLog('trace', 'hub-16', 'starting container...');
|
121 |
|
122 | params = params || {};
|
123 | if (!tv4.validate(params, schemas.startParams)) {
|
124 | var err = logger.makeLog('warn', 'hub-43', 'attempt to start container using invalid params', null, tv4.error);
|
125 | cb && cb({code: 'TECHERR', cause: err});
|
126 | return this;
|
127 | }
|
128 |
|
129 | if (params.ip) properties.netInfo.ip = params.ip;
|
130 |
|
131 | adapters.remote.start(function () {
|
132 | var discoveryParams = {addr: params.discoveryAddr, port: params.discoveryPort};
|
133 | discovery.start(discoveryParams, function () {
|
134 | started = true;
|
135 | locked = false;
|
136 | logger.makeLog('info', 'hub-18', 'container started !');
|
137 | _.isFunction(cb) && setImmediate(cb);
|
138 | processStartingQueue();
|
139 | });
|
140 | });
|
141 |
|
142 | return this;
|
143 | };
|
144 |
|
145 |
|
146 |
|
147 |
|
148 |
|
149 |
|
150 | exports.stop = function (cb) {
|
151 | if (locked || !started) {
|
152 | var msg = locked ? 'busy' : 'already stopped';
|
153 | logger.makeLog('warn', 'hub-37', 'attempt to stop container while ' + msg + ' !');
|
154 | return this;
|
155 | }
|
156 |
|
157 | locked = true;
|
158 | logger.makeLog('trace', 'hub-52', 'stopping container...');
|
159 | discovery.stop();
|
160 | adapters.remote.stop(function () {
|
161 | logger.makeLog('info', 'hub-36', 'container stopped !');
|
162 | started = false;
|
163 | locked = false;
|
164 | _.isFunction(cb) && setImmediate(cb);
|
165 | });
|
166 |
|
167 | return this;
|
168 | };
|
169 |
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 |
|
177 |
|
178 |
|
179 |
|
180 | exports.send = function (from, to, content, timeout, cb, headers) {
|
181 | if (_.isFunction(timeout)) {
|
182 | headers = cb;
|
183 | cb = timeout;
|
184 | timeout = properties.sendDefaultTimeout;
|
185 | } else if (_.isObject(timeout)) {
|
186 | headers = timeout;
|
187 | timeout = null;
|
188 | } else if (!_.isFunction(cb)) {
|
189 | headers = cb;
|
190 | cb = null;
|
191 | }
|
192 |
|
193 | if (started) {
|
194 | var req = {from: from, to: to, content: content, id: utils.uuid(), date: Date.now(), headers: headers || {}};
|
195 | req.timeout = timeout || properties.sendMaxTimeout;
|
196 | if (cb) req.cb = true;
|
197 | if (!tv4.validate(req, schemas.message) || (cb && !_.isFunction(cb))) {
|
198 | var err = logger.makeLog('warn', 'hub-29', 'attempt to send an invalid request', null, {req: req, cause: tv4.error});
|
199 | cb && cb({code: 'TECHERR', cause: err});
|
200 | return this;
|
201 | }
|
202 |
|
203 | processMiddlewares(msgType.REQ_OUT, req, cb, function () {
|
204 | if (cb) {
|
205 | events.once('res|' + req.id, function (res) {
|
206 | onRes(res, cb);
|
207 | });
|
208 | events.on('drop|' + req.id, function () {
|
209 | onDrop(req);
|
210 | });
|
211 | setTimeout(function () {
|
212 | events.emit('res|' + req.id, {err: {code: 'TIMEOUT'}, id: req.id});
|
213 | events.removeAllListeners('drop|' + req.id);
|
214 | }, req.timeout);
|
215 | }
|
216 | internalSend(req);
|
217 | });
|
218 | } else {
|
219 | logger.makeLog('trace', 'hub-46', 'container not started : queueing request');
|
220 | startingQueue.push({from: from, to: to, content: content, timeout: timeout, cb: cb, headers: headers});
|
221 | }
|
222 | return this;
|
223 | };
|
224 |
|
225 |
|
226 |
|
227 |
|
228 |
|
229 | function internalSend(req) {
|
230 | searchActor(req.to, req.timeout, function (aid) {
|
231 | if (Date.now() < (req.date + req.timeout)) {
|
232 | var actor = actors.get(aid);
|
233 | if (!actor) return onDrop(req);
|
234 | req.to = aid;
|
235 | if (actor.scope === actors.scope.PROCESS) {
|
236 | logger.makeLog('trace', 'hub-2', 'sending request inproc...', {req: req});
|
237 | adapters.inproc.send(req);
|
238 | } else if (actor.scope === actors.scope.LOCAL) {
|
239 | logger.makeLog('trace', 'hub-15', 'sending request to another container ipc...', {req: req});
|
240 | adapters.remote.send(actor.container, req);
|
241 | } else if (actor.scope === actors.scope.REMOTE) {
|
242 | logger.makeLog('trace', 'hub-48', 'sending request to another container...', {req: req});
|
243 | adapters.remote.send(actor.container, req);
|
244 | }
|
245 | }
|
246 | });
|
247 | }
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
253 |
|
254 | function onReq(req, reply) {
|
255 | logger.makeLog('trace', 'hub-3', 'processing request', {req: req});
|
256 | var actor = actors.get(req.to, actors.scope.PROCESS);
|
257 |
|
258 | var inMiddleware;
|
259 | var replyWrapper = function (err, content) {
|
260 | var res = {from: actor.id, to: req.from, err: err, content: content, date: req.date, id: req.id, headers: req.headers};
|
261 | logger.makeLog('trace', 'hub-53', 'sending response...', {res: res});
|
262 | if (inMiddleware) {
|
263 | reply(res);
|
264 | } else {
|
265 | processMiddlewares(msgType.RES_OUT, res, null, function () {
|
266 | reply(res);
|
267 | });
|
268 | }
|
269 | };
|
270 |
|
271 | inMiddleware = true;
|
272 | processMiddlewares(msgType.REQ_IN, req, replyWrapper, function () {
|
273 | inMiddleware = false;
|
274 | setImmediate(function () {
|
275 | try {
|
276 | req.reply = replyWrapper;
|
277 | actor.onMessage(req);
|
278 | } catch (err) {
|
279 | logger.makeLog('warn', 'hub-30', 'request processing error', {req: req, err: err});
|
280 | }
|
281 | });
|
282 | });
|
283 | }
|
284 |
|
285 |
|
286 |
|
287 |
|
288 |
|
289 |
|
290 | function onRes(res, cb) {
|
291 | logger.makeLog('trace', 'hub-25', 'processing response', {res: res});
|
292 | processMiddlewares(msgType.RES_IN, res, null, function () {
|
293 | setImmediate(function () {
|
294 | try {
|
295 | cb && cb(res.err, res);
|
296 | } catch (err) {
|
297 | logger.makeLog('warn', 'hub-31', 'response processing error', {res: res, err: err});
|
298 | }
|
299 | });
|
300 | });
|
301 | }
|
302 |
|
303 |
|
304 |
|
305 |
|
306 |
|
307 | function onDrop(req) {
|
308 | logger.makeLog('trace', 'hub-26', 'request ' + req.id + ' dropped', {req: req});
|
309 | if (Date.now() < (req.date + req.timeout)) {
|
310 | setTimeout(function () {
|
311 | logger.makeLog('trace', 'hub-27', 'resending request ' + req.id, {req: req});
|
312 | internalSend(req);
|
313 | }, properties.sendRetryDelay);
|
314 | } else {
|
315 | logger.makeLog('trace', 'hub-28', 'timeout reached, ' + req.id + ' definitely dropped', {req: req});
|
316 | }
|
317 | }
|
318 |
|
319 |
|
320 |
|
321 |
|
322 | function processStartingQueue() {
|
323 | logger.makeLog('trace', 'hub-19', 'processing starting queue (' + startingQueue.length + ' items)');
|
324 | _.forEach(startingQueue, function (req) {
|
325 | setImmediate(function () {
|
326 | exports.send(req.from, req.to, req.content, req.timeout, req.cb, req.headers);
|
327 | });
|
328 | });
|
329 | startingQueue = [];
|
330 | }
|
331 |
|
332 |
|
333 |
|
334 |
|
335 |
|
336 | exports.use = function (fn) {
|
337 | _.isFunction(fn) && middlewares.push(fn);
|
338 | return this;
|
339 | };
|
340 |
|
341 |
|
342 |
|
343 |
|
344 |
|
345 |
|
346 |
|
347 |
|
348 | function processMiddlewares(type, msg, reply, cb) {
|
349 | var index = 0;
|
350 | var count = middlewares.length;
|
351 | msg.reply = reply;
|
352 | (function next() {
|
353 | if (index < count) {
|
354 | setImmediate(function () {
|
355 | try {
|
356 | middlewares[index++](type, msg, next);
|
357 | } catch (err) {
|
358 | logger.makeLog('warn', 'hub-10', 'middleware processing error', {err: err});
|
359 | }
|
360 | });
|
361 | } else {
|
362 | delete msg.reply;
|
363 | cb && cb();
|
364 | }
|
365 | })();
|
366 | }
|
367 |
|
368 |
|
369 |
|
370 |
|
371 |
|
372 |
|
373 |
|
374 |
|
375 | exports.addActor = function (aid, onMessage, scope) {
|
376 | if (!utils.aid.isValid(aid)) {
|
377 | logger.makeLog('warn', 'hub-1', 'attempt to add an actor using an invalid id !', aid);
|
378 | return this;
|
379 | }
|
380 |
|
381 | var actor = scope || {};
|
382 | actor.id = aid;
|
383 | actor.container = {id: properties.ID, name: properties.name, netInfo: properties.netInfo};
|
384 | actor.onMessage = onMessage.bind(actor);
|
385 | actor.send = function (to, content, timeout, cb) {
|
386 | exports.send(aid, to, content, timeout, cb);
|
387 | };
|
388 | actors.add(actor, actors.scope.PROCESS);
|
389 | return this;
|
390 | };
|
391 |
|
392 |
|
393 |
|
394 |
|
395 |
|
396 |
|
397 | exports.removeActor = function (aid) {
|
398 | if (!utils.aid.isValid(aid)) {
|
399 | logger.makeLog('warn', 'hub-4', 'attempt to remove an actor using an invalid aid !', aid);
|
400 | return this;
|
401 | }
|
402 |
|
403 | actors.remove(aid, actors.scope.PROCESS);
|
404 | return this;
|
405 | };
|
406 |
|
407 |
|
408 |
|
409 |
|
410 |
|
411 |
|
412 |
|
413 | function searchActor(aid, timeout, cb) {
|
414 | logger.makeLog('trace', 'hub-20', 'searching actor ' + aid + '...');
|
415 |
|
416 |
|
417 | var remindDiscovery;
|
418 |
|
419 |
|
420 | var searchTimeout;
|
421 |
|
422 |
|
423 | function onFound(resAid) {
|
424 | logger.makeLog('trace', 'hub-42', 'actor ' + resAid + ' found !');
|
425 | remindDiscovery && clearInterval(remindDiscovery);
|
426 | searchTimeout && clearTimeout(searchTimeout);
|
427 | cb && cb(resAid);
|
428 | }
|
429 |
|
430 | events.once(aid + '!found', onFound);
|
431 |
|
432 | searchTimeout = setTimeout(function () {
|
433 | logger.makeLog('trace', 'hub-50', 'req needing ' + aid + ' timeout !');
|
434 | remindDiscovery && clearInterval(remindDiscovery);
|
435 | events.removeListener(aid + '!found', onFound);
|
436 | }, timeout);
|
437 |
|
438 |
|
439 | discovery.notifySearched(aid);
|
440 |
|
441 |
|
442 | var cacheAid = actors.pick(aid);
|
443 | if (cacheAid) {
|
444 | events.emit(aid + '!found', cacheAid);
|
445 | } else {
|
446 | remindDiscovery = setInterval(function () {
|
447 | discovery.notifySearched(aid);
|
448 | }, properties.discoveryRemind);
|
449 | }
|
450 | }
|
451 |
|
452 |
|
453 |
|
454 |
|
455 |
|
456 |
|
457 |
|
458 | exports.set = function (key, value) {
|
459 | if (!_.isString(key)) {
|
460 | return this;
|
461 | }
|
462 |
|
463 | switch (key) {
|
464 | case 'discoveryAddrs':
|
465 | discovery.setDiscoveryAddrs(value);
|
466 | break;
|
467 | default:
|
468 | properties[key] = value;
|
469 | }
|
470 |
|
471 | return this;
|
472 | };
|