1 |
|
2 |
|
3 |
|
4 |
|
5 | var _ = require('lodash');
|
6 | var EventEmitter = require('events').EventEmitter;
|
7 | var url = require('url');
|
8 | var dgram = require('dgram');
|
9 | var async = require('async');
|
10 | var semver = require('semver');
|
11 |
|
12 | var properties = require('./properties');
|
13 | var actors = require('./actors');
|
14 | var logger = require('./logger')('hubiquitus:core:discovery');
|
15 | var utils = {
|
16 | aid: require('./utils/aid')
|
17 | };
|
18 |
|
// The module object itself acts as an event emitter: notifySearched() emits
// 'discovery started', 'discovery' and 'discovery stopped' on it.
exports.__proto__ = new EventEmitter();
// 0 disables the EventEmitter listener-count warning for this module.
exports.setMaxListeners(0);

/**
 * True once the socket(s) are bound (set in start's completion callback),
 * false again after stop().
 * @type {boolean}
 */
var started = false;

/**
 * True while start() is waiting for the socket(s) to bind; start() and stop()
 * both refuse to run while it is set.
 * @type {boolean}
 */
var locked = false;

/**
 * UDP socket bound to the address given in start()'s params.addr; stays null
 * when no such address is configured.
 * @type {Socket}
 */
var mcastSock = null;

/**
 * UDP socket bound to localInfos; every search request and search result is
 * sent through it.
 * @type {Socket}
 */
var localSock = null;

/**
 * Bare aid -> timestamp (Date.now()) of the latest notifySearched() call for
 * that aid. An entry exists only while a discovery loop runs for the aid.
 * @type {Object}
 */
var lastResearches = {};

/**
 * Bare aid -> handle of the pending setTimeout that will trigger the next
 * discovery round for that aid.
 * @type {Object}
 */
var researchesLoops = {};

/**
 * {host, port} the local socket is bound to; sent as netInfo in search
 * requests so that other nodes know where to send their results.
 * @type {Object}
 */
var localInfos = {};

/**
 * {host, port} parsed from start()'s params.addr; destination of search
 * requests when mcastSock is set.
 * @type {Object}
 */
var mcastInfos = {};

/**
 * Array of {host, port} targets configured through setDiscoveryAddrs(), or
 * null. Only used by search() when no mcastSock exists.
 * @type {Array}
 */
var discoveryAddrs = null;
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 | exports.start = function (params, done) {
|
76 | if (locked || started) {
|
77 | var msg = locked ? 'busy' : 'already started';
|
78 | return logger.makeLog('warn', 'hub-38', 'attempt to start discovery while ' + msg + ' !');
|
79 | }
|
80 |
|
81 | locked = true;
|
82 |
|
83 | localInfos.host = properties.netInfo.ip;
|
84 | localInfos.port = params.port || _.random(3000, 60000);
|
85 | if (semver.gt(process.version, '0.12.0'))
|
86 | localSock = dgram.createSocket({type:'udp4', reuseAddr: true});
|
87 | else
|
88 | localSock = dgram.createSocket('udp4');
|
89 | localSock.on('message', onMessage);
|
90 |
|
91 | if (params.addr) {
|
92 | var mcastAddr = url.parse(params.addr);
|
93 | mcastInfos.host = mcastAddr.hostname;
|
94 | mcastInfos.port = mcastAddr.port || 5555;
|
95 | if (semver.gt(process.version, '0.12.0'))
|
96 | mcastSock = dgram.createSocket({type:'udp4', reuseAddr: true});
|
97 | else
|
98 | mcastSock = dgram.createSocket('udp4');
|
99 | mcastSock.on('message', onMessage);
|
100 | }
|
101 |
|
102 | if (!mcastSock) {
|
103 | localSock.bind(localInfos.port, localInfos.host, onStart);
|
104 | } else {
|
105 | async.parallel([
|
106 | function (done) { localSock.bind(localInfos.port, localInfos.host, done); },
|
107 | function (done) { mcastSock.bind(mcastInfos.port, mcastInfos.host, done); }
|
108 | ], onStart);
|
109 | }
|
110 |
|
111 | function onStart() {
|
112 | started = true;
|
113 | locked = false;
|
114 | logger.makeLog('trace', 'hub-22', 'discovery started !');
|
115 | done && done();
|
116 | }
|
117 | };
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 | exports.stop = function (done) {
|
124 | if (locked || !started) {
|
125 | var msg = locked ? 'busy' : 'already stopped';
|
126 | return logger.makeLog('warn', 'hub-39', 'attempt to stop container while ' + msg + ' !');
|
127 | }
|
128 |
|
129 | _.forEach(researchesLoops, function (loop, aid) {
|
130 | logger.makeLog('trace', 'hub-47', 'stop discovery for ' + aid + '; stoping discovery');
|
131 | clearTimeout(loop);
|
132 | });
|
133 |
|
134 | lastResearches = {};
|
135 | researchesLoops = {};
|
136 |
|
137 | if (localSock) {
|
138 | localSock.close();
|
139 | localSock = null;
|
140 | }
|
141 |
|
142 | if (mcastSock) {
|
143 | mcastSock.close();
|
144 | mcastSock = null;
|
145 | }
|
146 |
|
147 | started = false;
|
148 |
|
149 | logger.makeLog('trace', 'hub-54', 'discovery stopped !');
|
150 | done && done();
|
151 | };
|
152 |
|
153 |
|
154 |
|
155 |
|
156 |
|
157 | exports.setDiscoveryAddrs = function (value) {
|
158 | if (_.isArray(value)) {
|
159 | discoveryAddrs = [];
|
160 | _.forEach(value, function (addr) {
|
161 | var addrComp = url.parse(addr);
|
162 | var host = addrComp.hostname;
|
163 | var port = addrComp.port || 5555;
|
164 | var infos = {host: host, port: port};
|
165 | discoveryAddrs.push(infos);
|
166 | });
|
167 | } else {
|
168 | discoveryAddrs = null;
|
169 | }
|
170 | };
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 | exports.notifySearched = function (aid) {
|
177 | aid = utils.aid.bare(aid);
|
178 | if (!lastResearches[aid]) {
|
179 | lastResearches[aid] = Date.now();
|
180 | logger.makeLog('trace', 'hub-11', 'start discovery for ' + aid);
|
181 | exports.emit('discovery started', aid);
|
182 | (function tick(aid) {
|
183 | if (started && Date.now() - lastResearches[aid] < properties.discoveryTimeout) {
|
184 | logger.makeLog('trace', 'hub-45', 'discovery for ' + aid);
|
185 | exports.emit('discovery', aid);
|
186 | search(aid);
|
187 | researchesLoops[aid] = setTimeout(function () {
|
188 | tick(aid);
|
189 | }, _.random(properties.discoveryMinInterval, properties.discoveryMaxInterval));
|
190 | } else {
|
191 | logger.makeLog('trace', 'hub-12', 'stop discovery for ' + aid + '; too much time since last request');
|
192 | delete lastResearches[aid];
|
193 | delete researchesLoops[aid];
|
194 | exports.emit('discovery stopped', aid);
|
195 | }
|
196 | })(aid);
|
197 | } else {
|
198 | logger.makeLog('trace', 'hub-49', 'update last notify date for ' + aid);
|
199 | lastResearches[aid] = Date.now();
|
200 | }
|
201 | };
|
202 |
|
203 |
|
204 |
|
205 |
|
206 |
|
207 | exports.discoveries = function () {
|
208 | return lastResearches;
|
209 | };
|
210 |
|
211 |
|
212 |
|
213 |
|
214 |
|
/**
 * Sends a search request for an actor through the local socket.
 * The request goes to the address configured at start (mcastInfos) when that
 * socket exists, otherwise to every configured discovery address; with
 * neither, nothing is sent.
 * @param {String} aid id of the searched actor
 */
function search(aid) {
  var request = encode({
    type: 'search',
    from: properties.ID,
    vfrom: properties.name,
    aid: aid,
    netInfo: localInfos
  });

  if (mcastSock) {
    localSock.send(request, 0, request.length, mcastInfos.port, mcastInfos.host);
    return;
  }

  if (!discoveryAddrs) return;

  _.forEach(discoveryAddrs, function (target) {
    localSock.send(request, 0, request.length, target.port, target.host);
  });
}
|
226 |
|
227 |
|
228 |
|
229 |
|
230 |
|
/**
 * Handler for datagrams received on either socket.
 * Decodes the message, then dispatches search requests issued by other nodes
 * and search results addressed to this node. Undecodable messages are logged
 * and dropped; anything else is silently ignored.
 * @param {Buffer} buffer raw datagram
 */
function onMessage(buffer) {
  var msg;
  try {
    msg = decode(buffer);
  } catch (err) {
    return logger.makeLog('warn', 'hub-21', 'error parsing incomming discovery message...', err);
  }

  if (!msg) return;

  switch (msg.type) {
    case 'search':
      // our own search requests are not answered
      if (msg.from !== properties.ID) onSearch(msg);
      break;
    case 'result':
      // results meant for another node are not ours to process
      if (msg.to === properties.ID) onResult(msg);
      break;
  }
}
|
244 |
|
245 |
|
246 |
|
247 |
|
248 |
|
/**
 * Handles a search request from another node: for every matching actor found
 * in the process scope, a result message is sent back to the address carried
 * by the request.
 * @param {Object} msg decoded search request
 */
function onSearch(msg) {
  var origin = 'node ' + msg.vfrom + ' (ID ' + msg.from + ')';
  logger.makeLog('trace', 'hub-7', 'search request for actor ' + msg.aid + ' received from ' + origin, {netInfo: msg.netInfo});

  var found = actors.getAll(msg.aid, actors.scope.PROCESS);
  if (_.isEmpty(found)) return;

  logger.makeLog('trace', 'hub-8', 'actor ' + msg.aid + ' for ' + origin + ' found !', {netInfo: msg.netInfo});
  _.forEach(found, function (aid) {
    var reply = encode({
      type: 'result',
      from: properties.ID,
      vfrom: properties.name,
      to: msg.from,
      aid: aid,
      netInfo: properties.netInfo
    });
    localSock.send(reply, 0, reply.length, msg.netInfo.port, msg.netInfo.host);
  });
}
|
260 |
|
261 |
|
262 |
|
263 |
|
264 |
|
/**
 * Handles a search result: registers the found actor together with the
 * container (id, name, netInfo) that answered.
 * @param {Object} msg decoded search result
 */
function onResult(msg) {
  var description = 'actor ' + msg.aid + ' found on node ' + msg.vfrom + ' (ID ' + msg.from + ')';
  logger.makeLog('trace', 'hub-9', description, {msg: msg});

  var container = {id: msg.from, name: msg.vfrom, netInfo: msg.netInfo};
  actors.add({id: msg.aid, container: container});
}
|
269 |
|
270 |
|
271 |
|
272 |
|
273 |
|
274 |
|
/**
 * Serializes a discovery message.
 * Layout (each field length-prefixed by concatBuffers):
 *   search: [0], from, vfrom, aid, netInfo.host, netInfo.port (uint16 BE)
 *   result: [1], from, vfrom, to, aid, netInfo.ip, netInfo.pid (uint32 BE), netInfo.port (uint16 BE)
 * @param {Object} msg message to serialize
 * @returns {Buffer|null} serialized message, or null for an unknown message type
 */
function encode(msg) {
  // utf8 bytes of a string field
  function text(value) {
    return new Buffer(value, 'utf8');
  }

  // 2-byte big-endian unsigned integer
  function uint16(value) {
    var out = new Buffer(2);
    out.writeUInt16BE(value, 0);
    return out;
  }

  // 4-byte big-endian unsigned integer
  function uint32(value) {
    var out = new Buffer(4);
    out.writeUInt32BE(value, 0);
    return out;
  }

  var fields;
  switch (msg.type) {
    case 'search':
      fields = [
        new Buffer([0]),
        text(msg.from),
        text(msg.vfrom),
        text(msg.aid),
        text(msg.netInfo.host),
        uint16(msg.netInfo.port)
      ];
      break;
    case 'result':
      fields = [
        new Buffer([1]),
        text(msg.from),
        text(msg.vfrom),
        text(msg.to),
        text(msg.aid),
        text(msg.netInfo.ip),
        uint32(msg.netInfo.pid),
        uint16(msg.netInfo.port)
      ];
      break;
    default:
      return null;
  }

  return concatBuffers(fields);
}
|
304 |
|
305 |
|
306 |
|
307 |
|
308 |
|
309 |
|
/**
 * Deserializes a discovery message.
 * A message made of 6 components whose first byte is 0 is a search request;
 * one made of 8 components whose first byte is 1 is a search result.
 * @param {Buffer} buffer raw message
 * @returns {Object|null} decoded message ({type, from, vfrom, [to], aid, netInfo}), or null for an empty buffer
 * @throws {Error} 'malformat discovery message' when the buffer is not a valid search or result message
 */
function decode(buffer) {
  if (!buffer || !buffer.length) return null;

  var parts = splitBuffers(buffer);
  // splitBuffers reports invalid framing with null; treat it like any other
  // malformed message instead of failing on `null.length`.
  if (!parts) throw malformed();

  if (parts.length === 6 && parts[0][0] === 0) {
    // the port component must hold a full uint16
    if (parts[5].length < 2) throw malformed();
    return {
      type: 'search',
      from: text(1),
      vfrom: text(2),
      aid: text(3),
      netInfo: {host: text(4), port: parts[5].readUInt16BE(0)}
    };
  }

  if (parts.length === 8 && parts[0][0] === 1) {
    // pid is a uint32, port a uint16
    if (parts[6].length < 4 || parts[7].length < 2) throw malformed();
    return {
      type: 'result',
      from: text(1),
      vfrom: text(2),
      to: text(3),
      aid: text(4),
      netInfo: {ip: text(5), pid: parts[6].readUInt32BE(0), port: parts[7].readUInt16BE(0)}
    };
  }

  throw malformed();

  // utf8 content of the component at the given position
  function text(position) {
    return parts[position].toString('utf8');
  }

  // single place building the error reported for every invalid message
  function malformed() {
    return new Error('malformat discovery message');
  }
}
|
345 |
|
346 |
|
347 |
|
348 |
|
349 |
|
350 |
|
/**
 * Concatenates buffers into a single one, each preceded by its byte length
 * written as an unsigned 16-bit big-endian integer.
 * Note: zero-length items are written as-is, but splitBuffers treats a zero
 * length prefix as invalid.
 * @param {Array.<Buffer>} items buffers to concatenate (a null/undefined list yields an empty buffer)
 * @returns {Buffer} the framed concatenation
 * @throws {RangeError} when an item is longer than 65535 bytes and thus cannot be described by the prefix
 */
function concatBuffers(items) {
  var count = items ? items.length : 0;
  var total = 0;
  var i;

  for (i = 0; i < count; i++) {
    // Fail early, with an explicit message, rather than halfway through the
    // writes below.
    if (items[i].length > 0xffff) {
      throw new RangeError('buffer component too long for 16-bit length prefix: ' + items[i].length + ' bytes');
    }
    total += 2 + items[i].length;
  }

  // Every byte of the allocation is overwritten by the loop below.
  var result = new Buffer(total);

  var offset = 0;
  for (i = 0; i < count; i++) {
    var item = items[i];
    result.writeUInt16BE(item.length, offset);
    offset += 2;
    item.copy(result, offset);
    offset += item.length;
  }

  return result;
}
|
372 |
|
373 |
|
374 |
|
375 |
|
376 |
|
377 |
|
/**
 * Splits a buffer made of length-prefixed components (the framing produced by
 * concatBuffers: an unsigned 16-bit big-endian length, then that many bytes).
 * @param {Buffer} buffer framed buffer
 * @returns {Array.<Buffer>|null} the components in order, or null when the
 *   framing is invalid (zero-length component, truncated length prefix, or a
 *   declared length running past the end of the buffer)
 */
function splitBuffers(buffer) {
  var result = [];
  var lastOffset = 0;
  var bufferLength = buffer.length;
  while (lastOffset < bufferLength) {
    // A dangling single byte cannot hold a length prefix.
    if (lastOffset + 2 > bufferLength) return null;
    var length = buffer.readUInt16BE(lastOffset);
    lastOffset += 2;
    if (length <= 0) return null;
    // slice() would silently hand back a shorter component than announced.
    if (lastOffset + length > bufferLength) return null;
    result.push(buffer.slice(lastOffset, lastOffset + length));
    lastOffset += length;
  }
  return result;
}
|