UNPKG

10.8 kBJavaScriptView Raw
1/**
2 * @module discovery
3 */
4
5var _ = require('lodash');
6var EventEmitter = require('events').EventEmitter;
7var url = require('url');
8var dgram = require('dgram');
9var async = require('async');
10var semver = require('semver');
11
12var properties = require('./properties');
13var actors = require('./actors');
14var logger = require('./logger')('hubiquitus:core:discovery');
15var utils = {
16 aid: require('./utils/aid')
17};
18
19exports.__proto__ = new EventEmitter();
20exports.setMaxListeners(0);
21
22/**
23 * @type {boolean}
24 */
25var started = false;
26
27/**
28 * @type {boolean}
29 */
30var locked = false;
31
32/**
33 * @type {Socket}
34 */
35var mcastSock = null;
36
37/**
38 * @type {Socket}
39 */
40var localSock = null;
41
42/**
43 * @type {object}
44 */
45var lastResearches = {};
46
47/**
48 * @type {object}
49 */
50var researchesLoops = {};
51
52/**
53 * Local socket infos
54 * @type {object}
55 */
56var localInfos = {};
57
58/**
59 * Multicast/Broadcast discovery socket infos
60 * @type {object}
61 */
62var mcastInfos = {};
63
64/**
65 * Remote containers discovery addrs
66 * @type {Array} Array of string with format udp://192.168.0.1:5555
67 */
68var discoveryAddrs = null;
69
70/**
71 * Starts containers discovery
72 * @param params {object} discovery parameters
73 * @param [done] {function} done callback
74 */
75exports.start = function (params, done) {
76 if (locked || started) {
77 var msg = locked ? 'busy' : 'already started';
78 return logger.makeLog('warn', 'hub-38', 'attempt to start discovery while ' + msg + ' !');
79 }
80
81 locked = true;
82
83 localInfos.host = properties.netInfo.ip;
84 localInfos.port = params.port || _.random(3000, 60000);
85 if (semver.gt(process.version, '0.12.0'))
86 localSock = dgram.createSocket({type:'udp4', reuseAddr: true});
87 else
88 localSock = dgram.createSocket('udp4');
89 localSock.on('message', onMessage);
90
91 if (params.addr) {
92 var mcastAddr = url.parse(params.addr);
93 mcastInfos.host = mcastAddr.hostname;
94 mcastInfos.port = mcastAddr.port || 5555;
95 if (semver.gt(process.version, '0.12.0'))
96 mcastSock = dgram.createSocket({type:'udp4', reuseAddr: true});
97 else
98 mcastSock = dgram.createSocket('udp4');
99 mcastSock.on('message', onMessage);
100 }
101
102 if (!mcastSock) {
103 localSock.bind(localInfos.port, localInfos.host, onStart);
104 } else {
105 async.parallel([
106 function (done) { localSock.bind(localInfos.port, localInfos.host, done); },
107 function (done) { mcastSock.bind(mcastInfos.port, mcastInfos.host, done); }
108 ], onStart);
109 }
110
111 function onStart() {
112 started = true;
113 locked = false;
114 logger.makeLog('trace', 'hub-22', 'discovery started !');
115 done && done();
116 }
117};
118
119/**
120 * Stops containers discovery
121 * @param [done] {function} done callback
122 */
123exports.stop = function (done) {
124 if (locked || !started) {
125 var msg = locked ? 'busy' : 'already stopped';
126 return logger.makeLog('warn', 'hub-39', 'attempt to stop container while ' + msg + ' !');
127 }
128
129 _.forEach(researchesLoops, function (loop, aid) {
130 logger.makeLog('trace', 'hub-47', 'stop discovery for ' + aid + '; stoping discovery');
131 clearTimeout(loop);
132 });
133
134 lastResearches = {};
135 researchesLoops = {};
136
137 if (localSock) {
138 localSock.close();
139 localSock = null;
140 }
141
142 if (mcastSock) {
143 mcastSock.close();
144 mcastSock = null;
145 }
146
147 started = false;
148
149 logger.makeLog('trace', 'hub-54', 'discovery stopped !');
150 done && done();
151};
152
153/**
154 * Set discovery addresses
155 * @param value
156 */
157exports.setDiscoveryAddrs = function (value) {
158 if (_.isArray(value)) {
159 discoveryAddrs = [];
160 _.forEach(value, function (addr) {
161 var addrComp = url.parse(addr);
162 var host = addrComp.hostname;
163 var port = addrComp.port || 5555;
164 var infos = {host: host, port: port};
165 discoveryAddrs.push(infos);
166 });
167 } else {
168 discoveryAddrs = null;
169 }
170};
171
172/**
173 * Handle research notification
174 * @param aid {string}
175 */
176exports.notifySearched = function (aid) {
177 aid = utils.aid.bare(aid);
178 if (!lastResearches[aid]) {
179 lastResearches[aid] = Date.now();
180 logger.makeLog('trace', 'hub-11', 'start discovery for ' + aid);
181 exports.emit('discovery started', aid);
182 (function tick(aid) {
183 if (started && Date.now() - lastResearches[aid] < properties.discoveryTimeout) {
184 logger.makeLog('trace', 'hub-45', 'discovery for ' + aid);
185 exports.emit('discovery', aid);
186 search(aid);
187 researchesLoops[aid] = setTimeout(function () {
188 tick(aid);
189 }, _.random(properties.discoveryMinInterval, properties.discoveryMaxInterval));
190 } else {
191 logger.makeLog('trace', 'hub-12', 'stop discovery for ' + aid + '; too much time since last request');
192 delete lastResearches[aid];
193 delete researchesLoops[aid];
194 exports.emit('discovery stopped', aid);
195 }
196 })(aid);
197 } else {
198 logger.makeLog('trace', 'hub-49', 'update last notify date for ' + aid);
199 lastResearches[aid] = Date.now();
200 }
201};
202
203/**
204 * Returns current discoveries with the last research date for each of them
205 * @returns {Object} discoveries
206 */
207exports.discoveries = function () {
208 return lastResearches;
209};
210
211/**
212 * Search for aid
213 * @params {string} aid
214 */
215function search(aid) {
216 var msg = encode({type: 'search', from: properties.ID, vfrom: properties.name, aid: aid, netInfo: localInfos});
217
218 if (mcastSock) {
219 localSock.send(msg, 0, msg.length, mcastInfos.port, mcastInfos.host);
220 } else if (discoveryAddrs) {
221 _.forEach(discoveryAddrs, function (container) {
222 localSock.send(msg, 0, msg.length, container.port, container.host);
223 });
224 }
225}
226
227/**
228 * Discovery message handler
229 * @param {Buffer} buffer
230 */
231function onMessage(buffer) {
232 var msg;
233 try {
234 msg = decode(buffer);
235 } catch (err) {
236 return logger.makeLog('warn', 'hub-21', 'error parsing incomming discovery message...', err);
237 }
238 if (msg && msg.type === 'search' && msg.from !== properties.ID) {
239 onSearch(msg);
240 } else if (msg && msg.type === 'result' && msg.to === properties.ID) {
241 onResult(msg);
242 }
243}
244
245/**
246 * Discovery search handler
247 * @param msg {object} incomming search request
248 */
249function onSearch(msg) {
250 logger.makeLog('trace', 'hub-7', 'search request for actor ' + msg.aid + ' received from node ' + msg.vfrom + ' (ID ' + msg.from + ')', {netInfo: msg.netInfo});
251 var aids = actors.getAll(msg.aid, actors.scope.PROCESS);
252 if (!_.isEmpty(aids)) {
253 logger.makeLog('trace', 'hub-8', 'actor ' + msg.aid + ' for node ' + msg.vfrom + ' (ID ' + msg.from + ') found !', {netInfo: msg.netInfo});
254 _.forEach(aids, function (aid) {
255 var res = encode({type: 'result', from: properties.ID, vfrom: properties.name, to: msg.from, aid: aid, netInfo: properties.netInfo});
256 localSock.send(res, 0, res.length, msg.netInfo.port, msg.netInfo.host);
257 });
258 }
259}
260
261/**
262 * Discovery result handler
263 * @param msg {object} incomming result
264 */
265function onResult(msg) {
266 logger.makeLog('trace', 'hub-9', 'actor ' + msg.aid + ' found on node ' + msg.vfrom + ' (ID ' + msg.from + ')', {msg: msg});
267 actors.add({id: msg.aid, container: {id: msg.from, name: msg.vfrom, netInfo: msg.netInfo}});
268}
269
270/**
271 * Encode a discovery message into a buffer following the discovery protocol
272 * @param msg {object} Discovery message
273 * @returns {Buffer}
274 */
275function encode(msg) {
276 var result = null;
277 var type, from, vfrom, aid;
278
279 if (msg.type === 'search') {
280 type = new Buffer([0]);
281 from = new Buffer(msg.from, 'utf8');
282 vfrom = new Buffer(msg.vfrom, 'utf8');
283 aid = new Buffer(msg.aid, 'utf8');
284 var senderHost = new Buffer(msg.netInfo.host, 'utf8');
285 var senderPort = new Buffer(2);
286 senderPort.writeUInt16BE(msg.netInfo.port, 0);
287 result = concatBuffers([type, from, vfrom, aid, senderHost, senderPort]);
288 } else if (msg.type === 'result') {
289 type = new Buffer([1]);
290 from = new Buffer(msg.from, 'utf8');
291 vfrom = new Buffer(msg.vfrom, 'utf8');
292 var to = new Buffer(msg.to, 'utf8');
293 aid = new Buffer(msg.aid, 'utf8');
294 var ip = new Buffer(msg.netInfo.ip, 'utf8');
295 var pid = new Buffer(4);
296 pid.writeUInt32BE(msg.netInfo.pid, 0);
297 var port = new Buffer(2);
298 port.writeUInt16BE(msg.netInfo.port, 0);
299 result = concatBuffers([type, from, vfrom, to, aid, ip, pid, port]);
300 }
301
302 return result;
303}
304
305/**
306 * Decode a discovery buffer into a discovery object
307 * @param buffer {Buffer}
308 * @returns {object} decoded discovery buffer
309 */
310function decode(buffer) {
311 if (!buffer || !buffer.length) return null;
312 var result = null;
313 var type, from, vfrom, aid;
314 var bufferComponents = splitBuffers(buffer);
315 var idx = 0;
316
317 if (bufferComponents.length === 6 && nextBuffer()[0] === 0) {
318 type = 'search';
319 from = nextBuffer().toString('utf8');
320 vfrom = nextBuffer().toString('utf8');
321 aid = nextBuffer().toString('utf8');
322 var senderHost = nextBuffer().toString('utf8');
323 var senderPort = nextBuffer().readUInt16BE(0);
324 result = {type: type, from: from, vfrom: vfrom, aid: aid, netInfo: {host: senderHost, port: senderPort}};
325 } else if (bufferComponents.length === 8 && nextBuffer()[0] === 1) {
326 type = 'result';
327 from = nextBuffer().toString('utf8');
328 vfrom = nextBuffer().toString('utf8');
329 var to = nextBuffer().toString('utf8');
330 aid = nextBuffer().toString('utf8');
331 var ip = nextBuffer().toString('utf8');
332 var pid = nextBuffer().readUInt32BE(0);
333 var port = nextBuffer().readUInt16BE(0);
334 result = {type: type, from: from, vfrom: vfrom, to: to, aid: aid, netInfo: {ip: ip, pid: pid, port: port}};
335 } else {
336 throw new Error('malformat discovery message');
337 }
338
339 function nextBuffer() {
340 return bufferComponents[idx++];
341 }
342
343 return result;
344}
345
346/**
347 * Concat a discovery buffer adding fields buffer length split them
348 * @param items {Array} array of buffers
349 * @returns {Buffer} concatened buffer with fields length
350 */
351function concatBuffers(items) {
352 var result = null;
353
354 var length = 0;
355 _.forEach(items, function (item) {
356 length += 2 + item.length;
357 });
358
359 result = new Buffer(length);
360
361 var offset = 0;
362 _.forEach(items, function (item) {
363 var itemLength = item.length;
364 result.writeUInt16BE(itemLength, offset);
365 offset += 2;
366 item.copy(result, offset);
367 offset += itemLength;
368 });
369
370 return result;
371}
372
373/**
374 * Split a discovery buffer on each fields length
375 * @param buffer {Buffer} incoming discovery buffer
376 * @returns {Array} array of buffer splitted based on separator value
377 */
378function splitBuffers(buffer) {
379 var result = [];
380 var lastOffset = 0;
381 var bufferLength = buffer.length;
382 while (lastOffset < bufferLength) {
383 var length = buffer.readUInt16BE(lastOffset);
384 lastOffset += 2;
385 if (length <= 0) return null;
386 result.push(buffer.slice(lastOffset, lastOffset + length));
387 lastOffset += length;
388 }
389 return result;
390}