UNPKG

12.7 kBJavaScriptView Raw
/**
 * @module hubiquitus
 * Actors container
 */
5
6var _ = require('lodash');
7var timers = require('timers');
8var EventEmitter = require('events').EventEmitter;
9var tv4 = require('tv4');
10
11var properties = require('./properties');
12var actors = require('./actors');
13var discovery = require('./discovery');
14var logger = require('./logger')('hubiquitus:core:container');
15var schemas = require('./schemas');
16var utils = {
17 aid: require('./utils/aid'),
18 ip: require('./utils/ip'),
19 uuid: require('./utils/uuid')
20};
21
// The module object itself is an event emitter: 'actor added' / 'actor removed'
// are re-emitted on it. Object.setPrototypeOf replaces the deprecated
// `__proto__` accessor assignment; 0 lifts the max-listeners limit.
Object.setPrototypeOf(exports, new EventEmitter());
exports.setMaxListeners(0);

/**
 * @type {boolean}
 * Set to true once start() has completed, back to false once stop() has completed.
 */
var started = false;

/**
 * @type {boolean}
 * True while a start() or a stop() is in progress.
 */
var locked = false;

/**
 * @type {Array}
 * Starting queue holds requests sent before the container was started.
 */
var startingQueue = [];

/**
 * @type {Array}
 * Middleware functions registered through use(), in registration order.
 */
var middlewares = [];

/**
 * @type {EventEmitter}
 * Internal event emitter.
 * Carries '<aid>!found', 'res|<request id>' and 'drop|<request id>' events.
 */
var events = new EventEmitter();
events.setMaxListeners(0);

/**
 * @type {object}
 * Transport adapters : in-process delivery and delivery to other containers.
 */
var adapters = {
  inproc: require('./adapters/inproc'),
  remote: require('./adapters/remote')
};

/**
 * @enum {string}
 * Message kinds handed to the middlewares as their first argument.
 */
const msgType = {
  REQ_OUT: 'req_out',
  REQ_IN: 'req_in',
  RES_OUT: 'res_out',
  RES_IN: 'res_in'
};
70
/**
 * Listeners setup
 */

// A newly registered actor may be awaited under its full aid or under its bare aid :
// wake up both kinds of pending searches, then re-emit the event on the module.
actors.on('actor added', function (aid, scope) {
  var foundSuffix = '!found';
  events.emit(aid + foundSuffix, aid);
  events.emit(utils.aid.bare(aid) + foundSuffix, aid);
  exports.emit('actor added', aid, scope);
});

actors.on('actor removed', function (aid, scope) {
  exports.emit('actor removed', aid, scope);
});

// Both adapters feed incoming requests to onReq and route incoming responses
// to the internal 'res|<id>' event awaited by the original sender.
_.forEach([adapters.inproc, adapters.remote], function (adapter) {
  adapter.on('req', onReq);
  adapter.on('res', function (res) {
    events.emit('res|' + res.id, res);
  });
});

// Only the remote adapter reports dropped requests.
adapters.remote.on('drop', function (req) {
  events.emit('drop|' + req.id, req);
});
99
100/**
101 * Starts container
102 * Can be called at any time : requests are queued until the container starts
103 * @param params {object|function} parameters - callback if function
104 * @param cb {function} callback
105 * @returns {object} module reference
106 */
107exports.start = function (params, cb) {
108 if (locked || started) {
109 var msg = locked ? 'busy' : 'already started';
110 logger.makeLog('warn', 'hub-17', 'attempt to start container while ' + msg + ' !');
111 return this;
112 }
113
114 if (_.isFunction(params)) {
115 cb = params;
116 params = null;
117 }
118
119 locked = true;
120 logger.makeLog('trace', 'hub-16', 'starting container...');
121
122 params = params || {};
123 if (!tv4.validate(params, schemas.startParams)) {
124 var err = logger.makeLog('warn', 'hub-43', 'attempt to start container using invalid params', null, tv4.error);
125 cb && cb({code: 'TECHERR', cause: err});
126 return this;
127 }
128
129 if (params.ip) properties.netInfo.ip = params.ip;
130
131 adapters.remote.start(function () {
132 var discoveryParams = {addr: params.discoveryAddr, port: params.discoveryPort};
133 discovery.start(discoveryParams, function () {
134 started = true;
135 locked = false;
136 logger.makeLog('info', 'hub-18', 'container started !');
137 _.isFunction(cb) && setImmediate(cb);
138 processStartingQueue();
139 });
140 });
141
142 return this;
143};
144
145/**
146 * Stops container
147 * @param cb {function} callback
148 * @returns {object} module reference
149 */
150exports.stop = function (cb) {
151 if (locked || !started) {
152 var msg = locked ? 'busy' : 'already stopped';
153 logger.makeLog('warn', 'hub-37', 'attempt to stop container while ' + msg + ' !');
154 return this;
155 }
156
157 locked = true;
158 logger.makeLog('trace', 'hub-52', 'stopping container...');
159 discovery.stop();
160 adapters.remote.stop(function () {
161 logger.makeLog('info', 'hub-36', 'container stopped !');
162 started = false;
163 locked = false;
164 _.isFunction(cb) && setImmediate(cb);
165 });
166
167 return this;
168};
169
170/**
171 * Sends a request
172 * @param from {string} sender aid
173 * @param to {string} receiver aid
174 * @param [content] {object} request
175 * @param [timeout] {number|function|object} timeout - callback if function
176 * @param [cb] {function|object} callback
177 * @param [headers] {object} headers
178 * @returns {object} module reference
179 */
180exports.send = function (from, to, content, timeout, cb, headers) {
181 if (_.isFunction(timeout)) {
182 headers = cb;
183 cb = timeout;
184 timeout = properties.sendDefaultTimeout;
185 } else if (_.isObject(timeout)) {
186 headers = timeout;
187 timeout = null;
188 } else if (!_.isFunction(cb)) {
189 headers = cb;
190 cb = null;
191 }
192
193 if (started) {
194 var req = {from: from, to: to, content: content, id: utils.uuid(), date: Date.now(), headers: headers || {}};
195 req.timeout = timeout || properties.sendMaxTimeout;
196 if (cb) req.cb = true;
197 if (!tv4.validate(req, schemas.message) || (cb && !_.isFunction(cb))) {
198 var err = logger.makeLog('warn', 'hub-29', 'attempt to send an invalid request', null, {req: req, cause: tv4.error});
199 cb && cb({code: 'TECHERR', cause: err});
200 return this;
201 }
202
203 processMiddlewares(msgType.REQ_OUT, req, cb, function () {
204 if (cb) {
205 events.once('res|' + req.id, function (res) {
206 onRes(res, cb);
207 });
208 events.on('drop|' + req.id, function () {
209 onDrop(req); // do not use the 'req' from the arguments to keep the original 'to'
210 });
211 setTimeout(function () {
212 events.emit('res|' + req.id, {err: {code: 'TIMEOUT'}, id: req.id});
213 events.removeAllListeners('drop|' + req.id);
214 }, req.timeout);
215 }
216 internalSend(req);
217 });
218 } else {
219 logger.makeLog('trace', 'hub-46', 'container not started : queueing request');
220 startingQueue.push({from: from, to: to, content: content, timeout: timeout, cb: cb, headers: headers});
221 }
222 return this;
223};
224
/**
 * Internal send : looks the target actor up, then hands the request to the
 * adapter matching the actor scope.
 * Nothing is sent if the request timeout has elapsed by the time the actor is found.
 * If the actor cannot be fetched anymore, the request goes through the drop handler.
 * @param req {object} formated request to be sent
 */
function internalSend(req) {
  searchActor(req.to, req.timeout, function (aid) {
    var stillInTime = Date.now() < (req.date + req.timeout);
    if (!stillInTime) return;

    var actor = actors.get(aid);
    if (!actor) return onDrop(req);

    // from now on the request targets the aid that was actually resolved
    req.to = aid;

    switch (actor.scope) {
      case actors.scope.PROCESS:
        logger.makeLog('trace', 'hub-2', 'sending request inproc...', {req: req});
        adapters.inproc.send(req);
        break;
      case actors.scope.LOCAL:
        logger.makeLog('trace', 'hub-15', 'sending request to another container ipc...', {req: req});
        adapters.remote.send(actor.container, req);
        break;
      case actors.scope.REMOTE:
        logger.makeLog('trace', 'hub-48', 'sending request to another container...', {req: req});
        adapters.remote.send(actor.container, req);
        break;
    }
  });
}
248
/**
 * Incoming request processing
 * Runs the request through the REQ_IN middlewares, then passes it to the
 * handler of the targeted in-process actor with a `reply` function attached.
 * A reply issued by a middleware is sent back as is ; a reply issued by the
 * actor first goes through the RES_OUT middlewares.
 * @param req {object} request (hMessage)
 * @param reply {function} adapter function used to send the response back
 */
function onReq(req, reply) {
  logger.makeLog('trace', 'hub-3', 'processing request', {req: req});
  var target = actors.get(req.to, actors.scope.PROCESS);

  // true as long as the REQ_IN middlewares have not all been run
  var middlewaresRunning = true;

  var sendReply = function (err, content) {
    var res = {from: target.id, to: req.from, err: err, content: content, date: req.date, id: req.id, headers: req.headers};
    logger.makeLog('trace', 'hub-53', 'sending response...', {res: res});
    if (!middlewaresRunning) {
      processMiddlewares(msgType.RES_OUT, res, null, function () {
        reply(res);
      });
    } else {
      reply(res);
    }
  };

  processMiddlewares(msgType.REQ_IN, req, sendReply, function () {
    middlewaresRunning = false;
    setImmediate(function () {
      try {
        req.reply = sendReply;
        target.onMessage(req);
      } catch (err) {
        logger.makeLog('warn', 'hub-30', 'request processing error', {req: req, err: err});
      }
    });
  });
}
284
/**
 * Incoming response processing
 * The response goes through the RES_IN middlewares, then the sender callback
 * is invoked asynchronously with (res.err, res). An exception thrown by the
 * callback is logged, not propagated.
 * @param res {object} formated response
 * @param cb {function} original send callback
 */
function onRes(res, cb) {
  logger.makeLog('trace', 'hub-25', 'processing response', {res: res});

  var deliver = function () {
    try {
      if (cb) {
        cb(res.err, res);
      }
    } catch (err) {
      logger.makeLog('warn', 'hub-31', 'response processing error', {res: res, err: err});
    }
  };

  processMiddlewares(msgType.RES_IN, res, null, function () {
    setImmediate(deliver);
  });
}
302
/**
 * Message drop handler
 * While the request timeout has not elapsed, a new send attempt is scheduled
 * after properties.sendRetryDelay ; afterwards the drop is only logged.
 * @param req {object} request to be processed
 */
function onDrop(req) {
  logger.makeLog('trace', 'hub-26', 'request ' + req.id + ' dropped', {req: req});

  var canRetry = Date.now() < (req.date + req.timeout);
  if (!canRetry) {
    logger.makeLog('trace', 'hub-28', 'timeout reached, ' + req.id + ' definitely dropped', {req: req});
    return;
  }

  setTimeout(function retry() {
    logger.makeLog('trace', 'hub-27', 'resending request ' + req.id, {req: req});
    internalSend(req);
  }, properties.sendRetryDelay);
}
318
/**
 * Sends starting queue requests
 * Each queued request is replayed through send() on its own setImmediate
 * tick ; the queue is emptied right away.
 */
function processStartingQueue() {
  logger.makeLog('trace', 'hub-19', 'processing starting queue (' + startingQueue.length + ' items)');

  var pending = startingQueue;
  startingQueue = [];

  pending.forEach(function (queued) {
    setImmediate(function () {
      exports.send(queued.from, queued.to, queued.content, queued.timeout, queued.cb, queued.headers);
    });
  });
}
331
332/**
333 * Declare a middleware
334 * @param fn {function} middleware
335 */
336exports.use = function (fn) {
337 _.isFunction(fn) && middlewares.push(fn);
338 return this;
339};
340
/**
 * Process middlewares
 * Middlewares are run one after another, each on its own setImmediate tick ;
 * a middleware hands over to the following one by calling `next`.
 * `msg.reply` is set for the duration of the chain and removed before `cb` is called.
 * If a middleware throws, the error is logged and the chain stops there
 * (`cb` is not called).
 * @param type {string} message type (one of msgType)
 * @param msg {object} message to pass through the middlewares
 * @param reply {function} reply function exposed to the middlewares as msg.reply
 * @param cb {function} callback invoked once every middleware has called next
 */
function processMiddlewares(type, msg, reply, cb) {
  var position = 0;
  // number of middlewares is frozen when the chain starts
  var total = middlewares.length;
  msg.reply = reply;

  function advance() {
    if (position >= total) {
      delete msg.reply;
      if (cb) {
        cb();
      }
      return;
    }

    setImmediate(function runCurrent() {
      try {
        middlewares[position++](type, msg, advance);
      } catch (err) {
        logger.makeLog('warn', 'hub-10', 'middleware processing error', {err: err});
      }
    });
  }

  advance();
}
367
368/**
369 * Adds an actor to the container
370 * @param {string} aid
371 * @param onMessage {function} actor handler
372 * @param [scope] {object} scope
373 * @returns {object} module reference
374 */
375exports.addActor = function (aid, onMessage, scope) {
376 if (!utils.aid.isValid(aid)) {
377 logger.makeLog('warn', 'hub-1', 'attempt to add an actor using an invalid id !', aid);
378 return this;
379 }
380
381 var actor = scope || {};
382 actor.id = aid;
383 actor.container = {id: properties.ID, name: properties.name, netInfo: properties.netInfo};
384 actor.onMessage = onMessage.bind(actor);
385 actor.send = function (to, content, timeout, cb) {
386 exports.send(aid, to, content, timeout, cb);
387 };
388 actors.add(actor, actors.scope.PROCESS);
389 return this;
390};
391
392/**
393 * Removes an actor
394 * @param {string} aid
395 * @returns {object} module reference
396 */
397exports.removeActor = function (aid) {
398 if (!utils.aid.isValid(aid)) {
399 logger.makeLog('warn', 'hub-4', 'attempt to remove an actor using an invalid aid !', aid);
400 return this;
401 }
402
403 actors.remove(aid, actors.scope.PROCESS);
404 return this;
405};
406
/**
 * Search for an actor
 * Waits for the '<aid>!found' internal event. The actors cache is checked
 * right away ; if it has no match, discovery is reminded of the search every
 * properties.discoveryRemind until the actor is found or `timeout` elapses.
 * On timeout the search is abandoned and `cb` is never called.
 * @param aid {string} searched aid
 * @param {number} timeout maximum search duration
 * @param cb {function} called with the aid that was found
 */
function searchActor(aid, timeout, cb) {
  logger.makeLog('trace', 'hub-20', 'searching actor ' + aid + '...');

  var foundEvent = aid + '!found';

  // periodic reminder sent to discovery while the actor is not found
  var reminder;

  // task giving the search up when the request timeout is reached
  var giveUp;

  function stopReminder() {
    if (reminder) {
      clearInterval(reminder);
    }
  }

  // actor found : reminder and give-up task are not needed anymore
  function onFound(resAid) {
    logger.makeLog('trace', 'hub-42', 'actor ' + resAid + ' found !');
    stopReminder();
    if (giveUp) {
      clearTimeout(giveUp);
    }
    if (cb) {
      cb(resAid);
    }
  }

  events.once(foundEvent, onFound);

  giveUp = setTimeout(function () {
    logger.makeLog('trace', 'hub-50', 'req needing ' + aid + ' timeout !');
    stopReminder();
    events.removeListener(foundEvent, onFound);
  }, timeout);

  // notify discovery
  discovery.notifySearched(aid);

  // search in cache
  var cached = actors.pick(aid);
  if (cached) {
    events.emit(foundEvent, cached);
    return;
  }

  reminder = setInterval(function () {
    discovery.notifySearched(aid);
  }, properties.discoveryRemind);
}
451
452/**
453 * Set hubiquitus properties
454 * @param key
455 * @param value
456 * @returns {object} module reference
457 */
458exports.set = function (key, value) {
459 if (!_.isString(key)) {
460 return this;
461 }
462
463 switch (key) {
464 case 'discoveryAddrs':
465 discovery.setDiscoveryAddrs(value);
466 break;
467 default:
468 properties[key] = value;
469 }
470
471 return this;
472};