1 | var os = require('os');
|
2 | var AutoScout = require('zetta-auto-scout');
|
3 | var async = require('async');
|
4 | var HttpScout = require('./lib/http_scout');
|
5 | var HttpServer = require('./lib/http_server');
|
6 | var Logger = require('./lib/logger');
|
7 | var PeerClient = require('./lib/peer_client');
|
8 | var PeerRegistry = require('./lib/peer_registry');
|
9 | var PubSub = require('./lib/pubsub_service');
|
10 | var Runtime = require('./lib/runtime');
|
11 | var scientist = require('zetta-scientist');
|
12 | var Query = require('calypso').Query;
|
13 |
|
14 | var Zetta = module.exports = function(opts) {
|
15 | if (!(this instanceof Zetta)) {
|
16 | return new Zetta(opts);
|
17 | }
|
18 |
|
19 | opts = opts || {};
|
20 |
|
21 | this._name = os.hostname();
|
22 | this.id = this._name;
|
23 | this._properties = {};
|
24 |
|
25 | this._exposeQuery = '';
|
26 | this._scouts = [];
|
27 | this._apps = [];
|
28 | this._peers = [];
|
29 | this._peerClients = [];
|
30 |
|
31 | this.peerRegistry = opts.peerRegistry || new PeerRegistry();
|
32 |
|
33 | this.pubsub = opts.pubsub || new PubSub();
|
34 | this.log = opts.log || new Logger({ pubsub: this.pubsub });
|
35 | this.log.init();
|
36 | this._silent = false;
|
37 |
|
38 | this.httpServer = new HttpServer(this);
|
39 |
|
40 | var runtimeOptions = {
|
41 | pubsub: this.pubsub,
|
42 | log: this.log,
|
43 | httpServer: this.httpServer
|
44 | };
|
45 |
|
46 | if (opts && opts.registry) {
|
47 | runtimeOptions.registry = opts.registry;
|
48 | }
|
49 | this.runtime = new Runtime(runtimeOptions);
|
50 |
|
51 | var httpScout = scientist.create.apply(null, [HttpScout]);
|
52 | httpScout.server = this.runtime;
|
53 | this.httpScout = httpScout;
|
54 | this._scouts.push(httpScout);
|
55 | };
|
56 |
|
57 | Zetta.prototype.silent = function() {
|
58 | this._silent = true;
|
59 | return this;
|
60 | };
|
61 |
|
62 |
|
63 | Zetta.prototype.logger = function(func) {
|
64 | this._silent = true;
|
65 | func(this.log);
|
66 | return this;
|
67 | };
|
68 |
|
69 | Zetta.prototype.name = function(name) {
|
70 | if (name === '*') {
|
71 | throw new Error('Cannot set name to *');
|
72 | }
|
73 |
|
74 | this._name = name;
|
75 | this.id = this._name;
|
76 | return this;
|
77 | };
|
78 |
|
79 | Zetta.prototype.properties = function(props) {
|
80 | var self = this;
|
81 | if (typeof props === 'object') {
|
82 | delete props.name;
|
83 | this._properties = props;
|
84 | }
|
85 | return this;
|
86 | };
|
87 |
|
88 | Zetta.prototype.getProperties = function() {
|
89 | var self = this;
|
90 | var ret = { name: this._name };
|
91 | Object.keys(this._properties).forEach(function(k) {
|
92 | ret[k] = self._properties[k];
|
93 | });
|
94 | return ret;
|
95 | };
|
96 |
|
97 | Zetta.prototype.use = function() {
|
98 | var args = Array.prototype.slice.call(arguments);
|
99 | var constructor = args[0];
|
100 |
|
101 | var self = this;
|
102 | function addScout(scout) {
|
103 | scout.server = self.runtime;
|
104 | self._scouts.push(scout);
|
105 | }
|
106 |
|
107 | function init() {
|
108 | var machine = Object.create(constructor.prototype);
|
109 | constructor.apply(machine, args.slice(1));
|
110 | machine._pubsub = self.pubsub;
|
111 | machine._log = self.log;
|
112 | machine._registry = self.runtime.registry;
|
113 |
|
114 | var config = scientist.config(machine);
|
115 | return { config: config, instance: machine };
|
116 | }
|
117 |
|
118 | function walk(proto) {
|
119 | if (!proto || !proto.__proto__) {
|
120 | self.load.apply(self, args);
|
121 | } else if (proto.__proto__.constructor.name === 'HttpDevice') {
|
122 | var config = init().config;
|
123 | self.httpScout.driverFunctions[config._type] = constructor;
|
124 | } else if (proto.__proto__.constructor.name === 'Device') {
|
125 | var build = init();
|
126 | args.unshift(build.config._type);
|
127 | var scout = Object.create(AutoScout.prototype);
|
128 | scout._deviceInstance = build;
|
129 | AutoScout.apply(scout, args);
|
130 | addScout(scout);
|
131 | } else if (proto.__proto__.constructor.name === 'Scout') {
|
132 | var scout = scientist.create.apply(null, args);
|
133 | addScout(scout);
|
134 | } else {
|
135 | walk(proto.__proto__);
|
136 | }
|
137 | }
|
138 |
|
139 | walk(constructor.prototype);
|
140 |
|
141 | return this;
|
142 | };
|
143 |
|
144 | Zetta.prototype.expose = function(query) {
|
145 | this._exposeQuery = query;
|
146 | this.runtime.expose(query);
|
147 | return this;
|
148 | };
|
149 |
|
150 | Zetta.prototype.load = function() {
|
151 | var args = Array.prototype.slice.call(arguments);
|
152 | var appArgs = args.slice(1, args.length);
|
153 | var app = {
|
154 | app: args[0],
|
155 | args: appArgs
|
156 | };
|
157 | this._apps.push(app);
|
158 | return this;
|
159 | };
|
160 |
|
161 | Zetta.prototype.link = function(peers) {
|
162 | var self = this;
|
163 | if(!Array.isArray(peers)) {
|
164 | peers = [peers];
|
165 | }
|
166 |
|
167 | peers.forEach(function(peer) {
|
168 |
|
169 | self._peers.push(peer);
|
170 | });
|
171 |
|
172 | return this;
|
173 | };
|
174 |
|
175 |
|
176 | Zetta.prototype.listen = function() {
|
177 | var self = this;
|
178 |
|
179 | var args = Array.prototype.slice.call(arguments);
|
180 |
|
181 | var last = args[args.length - 1];
|
182 |
|
183 | var callback;
|
184 | if (typeof last === 'function') {
|
185 | callback = last;
|
186 | }
|
187 |
|
188 | this._run(function(err){
|
189 | if(err) {
|
190 | if (callback) {
|
191 | return callback(err);
|
192 | } else {
|
193 | throw err;
|
194 | }
|
195 | }
|
196 |
|
197 | var cb = function(err) {
|
198 | if (err) {
|
199 | if (callback) {
|
200 | callback(err);
|
201 | } else {
|
202 | throw err;
|
203 | }
|
204 | }
|
205 |
|
206 | var host;
|
207 | if (typeof args[0] === 'string') {
|
208 | host = args[0];
|
209 | } else if (typeof args[0] === 'number') {
|
210 | if (args.length > 1 && typeof args[1] === 'string') {
|
211 | host = 'http://' + args[1] + ':' + args[0];
|
212 | } else {
|
213 | host = 'http://127.0.0.1:' + args[0];
|
214 | }
|
215 | } else if (typeof args[0] === 'object' && args[0].fd) {
|
216 | host = 'fd: ' + args[0].fd;
|
217 | } else {
|
218 | host = '<unknown>';
|
219 | }
|
220 |
|
221 | self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);
|
222 |
|
223 | if (callback) {
|
224 | callback(err);
|
225 | }
|
226 | };
|
227 |
|
228 | if (!callback) {
|
229 | args.push(cb);
|
230 | } else {
|
231 | args[args.length - 1] = cb;
|
232 | }
|
233 |
|
234 | self.httpServer.listen.apply(self.httpServer, args);
|
235 | });
|
236 |
|
237 | return this;
|
238 | };
|
239 |
|
240 |
|
241 | Zetta.prototype._run = function(callback) {
|
242 | var self = this;
|
243 |
|
244 | if(!callback) {
|
245 | callback = function(){};
|
246 | }
|
247 |
|
248 | if (!this._silent) {
|
249 | Logger.ConsoleOutput(this.log);
|
250 | }
|
251 |
|
252 | async.series([
|
253 | function(next) {
|
254 | self.runtime.registry._init(next);
|
255 | },
|
256 | function(next) {
|
257 | self.peerRegistry._init(next);
|
258 | },
|
259 | function(next) {
|
260 | self._initScouts(next);
|
261 | },
|
262 | function(next) {
|
263 | self._initApps(next);
|
264 | },
|
265 | function(next) {
|
266 | self._initHttpServer(next);
|
267 | },
|
268 | function(next) {
|
269 | self._cleanupPeers(next);
|
270 | },
|
271 | function(next) {
|
272 | self._initPeers(self._peers, next);
|
273 | self.link = function(peers, cb) {
|
274 | self._initPeers(peers, (cb || function() {}) );
|
275 | };
|
276 | }
|
277 | ], function(err){
|
278 | setImmediate(function() {
|
279 | callback(err);
|
280 | });
|
281 | });
|
282 |
|
283 | return this;
|
284 | };
|
285 |
|
286 | Zetta.prototype._initScouts = function(callback) {
|
287 | async.each(this._scouts, function(scout, next) {
|
288 | scout.init(next);
|
289 | }, function(err) {
|
290 | callback(err);
|
291 | });
|
292 |
|
293 | return this;
|
294 | };
|
295 |
|
296 | Zetta.prototype._initApps = function(callback) {
|
297 | var self = this;
|
298 | this._apps.forEach(function(app) {
|
299 | var args = app.args;
|
300 | args.unshift(self.runtime);
|
301 | app.app.apply(null, args);
|
302 | });
|
303 | callback();
|
304 |
|
305 | return this;
|
306 | };
|
307 |
|
308 | Zetta.prototype._initHttpServer = function(callback) {
|
309 | this.httpServer.init();
|
310 | callback();
|
311 |
|
312 | return this;
|
313 | };
|
314 |
|
315 |
|
316 |
|
317 | Zetta.prototype._cleanupPeers = function(callback) {
|
318 | var self = this;
|
319 | this.peerRegistry.find(Query.of('peers'), function(err, results) {
|
320 | if(err) {
|
321 | callback(err);
|
322 | return;
|
323 | }
|
324 |
|
325 | async.forEach(results, function(peer, next) {
|
326 | peer.status = 'disconnected';
|
327 | self.peerRegistry.save(peer, next);
|
328 | }, callback);
|
329 | });
|
330 | };
|
331 |
|
332 | Zetta.prototype._initPeers = function(peers, callback) {
|
333 | var self = this;
|
334 | var existingUrls = [];
|
335 | var allPeers = [];
|
336 |
|
337 | if (!Array.isArray(peers)) {
|
338 | peers = [peers];
|
339 | }
|
340 |
|
341 | this.peerRegistry.find(Query.of('peers'), function(err, results) {
|
342 | if(err) {
|
343 | callback(err);
|
344 | return;
|
345 | }
|
346 |
|
347 | results.forEach(function(peer) {
|
348 | peer.status = 'disconnected';
|
349 | if (peer.direction === 'initiator' && peer.url) {
|
350 | allPeers.push(peer);
|
351 | existingUrls.push(peer.url);
|
352 | return;
|
353 | }
|
354 | });
|
355 |
|
356 |
|
357 | allPeers = allPeers.concat(peers.filter(function(peer) {
|
358 | return existingUrls.indexOf(peer) === -1;
|
359 | }));
|
360 |
|
361 | allPeers.forEach(function(obj) {
|
362 | var existing = (typeof obj === 'object');
|
363 | if (existing) {
|
364 | if(!obj.fromLink || peers.indexOf(obj.url) > -1) {
|
365 | self.peerRegistry.save(obj, function() {
|
366 | self._runPeer(obj);
|
367 | });
|
368 | } else {
|
369 |
|
370 | self.peerRegistry.remove(obj, function(err){
|
371 | if(err) {
|
372 | console.error(err);
|
373 | }
|
374 | });
|
375 | }
|
376 | } else {
|
377 | var peerData = {
|
378 | url: obj,
|
379 | direction: 'initiator',
|
380 | fromLink:true
|
381 | };
|
382 | self.peerRegistry.add(peerData, function(err, newPeer) {
|
383 | self._runPeer(newPeer);
|
384 | });
|
385 | }
|
386 |
|
387 |
|
388 | });
|
389 |
|
390 |
|
391 | callback();
|
392 | });
|
393 |
|
394 | return this;
|
395 | };
|
396 |
|
397 | Zetta.prototype._extendPeerRequest = function(client) {
|
398 | this.runtime.modifyPeerRequest(client.ws);
|
399 | };
|
400 |
|
401 | Zetta.prototype._extendPeerResponse = function(client) {
|
402 | this.runtime.modifyPeerResponse(client.ws);
|
403 | };
|
404 |
|
405 | Zetta.prototype._runPeer = function(peer) {
|
406 | var self = this;
|
407 | var peerClient = new PeerClient(peer.url, self);
|
408 | this._extendPeerRequest(peerClient);
|
409 | this._extendPeerResponse(peerClient);
|
410 |
|
411 | self._peerClients.push(peerClient);
|
412 |
|
413 |
|
414 | peerClient.on('connecting', function() {
|
415 | peer.status = 'connecting';
|
416 | self.peerRegistry.save(peer);
|
417 | });
|
418 |
|
419 |
|
420 | peerClient.on('connected', function() {
|
421 | peer.status = 'connected';
|
422 | peer.connectionId = peerClient.connectionId;
|
423 | self.peerRegistry.save(peer);
|
424 |
|
425 |
|
426 | self.pubsub.publish('_peer/connect', { peer: peerClient});
|
427 | });
|
428 |
|
429 | peerClient.on('error', function(error) {
|
430 | self.peerRegistry.get(peer.id, function(err, result) {
|
431 | result.status = 'failed';
|
432 | result.error = error;
|
433 | self.peerRegistry.save(result);
|
434 |
|
435 |
|
436 | self.pubsub.publish('_peer/disconnect', { peer: peerClient });
|
437 | });
|
438 | });
|
439 |
|
440 | peerClient.on('closed', function() {
|
441 | self.peerRegistry.get(peer.id, function(err, result) {
|
442 | result.status = 'disconnected';
|
443 |
|
444 |
|
445 | self.pubsub.publish('_peer/disconnect', { peer: peerClient });
|
446 | self.peerRegistry.save(result, function() { });
|
447 | });
|
448 | });
|
449 |
|
450 | peerClient.start();
|
451 | }
|