UNPKG

10.6 kBJavaScriptView Raw
1var os = require('os');
2var AutoScout = require('zetta-auto-scout');
3var async = require('async');
4var HttpScout = require('./lib/http_scout');
5var HttpServer = require('./lib/http_server');
6var Logger = require('./lib/logger');
7var PeerClient = require('./lib/peer_client');
8var PeerRegistry = require('./lib/peer_registry');
9var PubSub = require('./lib/pubsub_service');
10var Runtime = require('./lib/runtime');
11var scientist = require('zetta-scientist');
12var Query = require('calypso').Query;
13
14var Zetta = module.exports = function(opts) {
15 if (!(this instanceof Zetta)) {
16 return new Zetta(opts);
17 }
18
19 opts = opts || {};
20
21 this._name = os.hostname(); // optional name, defaults to OS hostname
22 this.id = this._name;
23 this._properties = {}; // custom properties
24
25 this._exposeQuery = '';
26 this._scouts = [];
27 this._apps = [];
28 this._peers = [];
29 this._peerClients = [];
30
31 this.peerRegistry = opts.peerRegistry || new PeerRegistry();
32
33 this.pubsub = opts.pubsub || new PubSub();
34 this.log = opts.log || new Logger({ pubsub: this.pubsub });
35 this.log.init();
36 this._silent = false;
37
38 this.httpServer = new HttpServer(this);
39
40 var runtimeOptions = {
41 pubsub: this.pubsub,
42 log: this.log,
43 httpServer: this.httpServer
44 };
45
46 if (opts && opts.registry) {
47 runtimeOptions.registry = opts.registry;
48 }
49 this.runtime = new Runtime(runtimeOptions);
50
51 var httpScout = scientist.create.apply(null, [HttpScout]);
52 httpScout.server = this.runtime;
53 this.httpScout = httpScout;
54 this._scouts.push(httpScout);
55};
56
57Zetta.prototype.silent = function() {
58 this._silent = true;
59 return this;
60};
61
62// pass in a custom logging
63Zetta.prototype.logger = function(func) {
64 this._silent = true;
65 func(this.log);
66 return this;
67};
68
69Zetta.prototype.name = function(name) {
70 if (name === '*') {
71 throw new Error('Cannot set name to *');
72 }
73
74 this._name = name;
75 this.id = this._name;
76 return this;
77};
78
79Zetta.prototype.properties = function(props) {
80 var self = this;
81 if (typeof props === 'object') {
82 delete props.name; // cannot overide name
83 this._properties = props;
84 }
85 return this;
86};
87
88Zetta.prototype.getProperties = function() {
89 var self = this;
90 var ret = { name: this._name };
91 Object.keys(this._properties).forEach(function(k) {
92 ret[k] = self._properties[k];
93 });
94 return ret;
95};
96
97Zetta.prototype.use = function() {
98 var args = Array.prototype.slice.call(arguments);
99 var constructor = args[0];
100
101 var self = this;
102 function addScout(scout) {
103 scout.server = self.runtime;
104 self._scouts.push(scout);
105 }
106
107 function init() {
108 var machine = Object.create(constructor.prototype);
109 constructor.apply(machine, args.slice(1));
110 machine._pubsub = self.pubsub;
111 machine._log = self.log;
112 machine._registry = self.runtime.registry;
113
114 var config = scientist.config(machine);
115 return { config: config, instance: machine };
116 }
117
118 function walk(proto) {
119 if (!proto || !proto.__proto__) {
120 self.load.apply(self, args);
121 } else if (proto.__proto__.constructor.name === 'HttpDevice') {
122 var config = init().config;
123 self.httpScout.driverFunctions[config._type] = constructor;
124 } else if (proto.__proto__.constructor.name === 'Device') {
125 var build = init();
126 args.unshift(build.config._type);
127 var scout = Object.create(AutoScout.prototype);
128 scout._deviceInstance = build; // pass both machine and config to autoscout need to _generate device
129 AutoScout.apply(scout, args);
130 addScout(scout);
131 } else if (proto.__proto__.constructor.name === 'Scout') {
132 var scout = scientist.create.apply(null, args);
133 addScout(scout);
134 } else {
135 walk(proto.__proto__);
136 }
137 }
138
139 walk(constructor.prototype);
140
141 return this;
142};
143
144Zetta.prototype.expose = function(query) {
145 this._exposeQuery = query;
146 this.runtime.expose(query);
147 return this;
148};
149
150Zetta.prototype.load = function() {
151 var args = Array.prototype.slice.call(arguments);
152 var appArgs = args.slice(1, args.length);
153 var app = {
154 app: args[0],
155 args: appArgs
156 };
157 this._apps.push(app);
158 return this;
159};
160
161Zetta.prototype.link = function(peers) {
162 var self = this;
163 if(!Array.isArray(peers)) {
164 peers = [peers];
165 }
166
167 peers.forEach(function(peer) {
168 //self._peers.push(new PeerClient(peer, self));
169 self._peers.push(peer);
170 });
171
172 return this;
173};
174
175
176Zetta.prototype.listen = function() {
177 var self = this;
178
179 var args = Array.prototype.slice.call(arguments);
180
181 var last = args[args.length - 1];
182
183 var callback;
184 if (typeof last === 'function') {
185 callback = last;
186 }
187
188 this._run(function(err){
189 if(err) {
190 if (callback) {
191 return callback(err);
192 } else {
193 throw err;
194 }
195 }
196
197 var cb = function(err) {
198 if (err) {
199 if (callback) {
200 callback(err);
201 } else {
202 throw err;
203 }
204 }
205
206 var host;
207 if (typeof args[0] === 'string') {
208 host = args[0]; // UNIX socket
209 } else if (typeof args[0] === 'number') {
210 if (args.length > 1 && typeof args[1] === 'string') {
211 host = 'http://' + args[1] + ':' + args[0]; // host + port
212 } else {
213 host = 'http://127.0.0.1:' + args[0]; // just port
214 }
215 } else if (typeof args[0] === 'object' && args[0].fd) {
216 host = 'fd: ' + args[0].fd; // handle
217 } else {
218 host = '<unknown>';
219 }
220
221 self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);
222
223 if (callback) {
224 callback(err);
225 }
226 };
227
228 if (!callback) {
229 args.push(cb);
230 } else {
231 args[args.length - 1] = cb;
232 }
233
234 self.httpServer.listen.apply(self.httpServer, args);
235 });
236
237 return this;
238};
239
240// run scouts/apps init server but do not listening on http port
241Zetta.prototype._run = function(callback) {
242 var self = this;
243
244 if(!callback) {
245 callback = function(){};
246 }
247
248 if (!this._silent) {
249 Logger.ConsoleOutput(this.log);
250 }
251
252 async.series([
253 function(next) {
254 self.runtime.registry._init(next);
255 },
256 function(next) {
257 self.peerRegistry._init(next);
258 },
259 function(next) {
260 self._initScouts(next);
261 },
262 function(next) {
263 self._initApps(next);
264 },
265 function(next) {
266 self._initHttpServer(next);
267 },
268 function(next) {
269 self._cleanupPeers(next);
270 },
271 function(next) {
272 self._initPeers(self._peers, next);
273 self.link = function(peers, cb) {
274 self._initPeers(peers, (cb || function() {}) );
275 };
276 }
277 ], function(err){
278 setImmediate(function() {
279 callback(err);
280 });
281 });
282
283 return this;
284};
285
286Zetta.prototype._initScouts = function(callback) {
287 async.each(this._scouts, function(scout, next) {
288 scout.init(next);
289 }, function(err) {
290 callback(err);
291 });
292
293 return this;
294};
295
296Zetta.prototype._initApps = function(callback) {
297 var self = this;
298 this._apps.forEach(function(app) {
299 var args = app.args;
300 args.unshift(self.runtime);
301 app.app.apply(null, args);
302 });
303 callback();
304
305 return this;
306};
307
308Zetta.prototype._initHttpServer = function(callback) {
309 this.httpServer.init();
310 callback();
311
312 return this;
313};
314
315
316// set all peers to disconnected
317Zetta.prototype._cleanupPeers = function(callback) {
318 var self = this;
319 this.peerRegistry.find(Query.of('peers'), function(err, results) {
320 if(err) {
321 callback(err);
322 return;
323 }
324
325 async.forEach(results, function(peer, next) {
326 peer.status = 'disconnected';
327 self.peerRegistry.save(peer, next);
328 }, callback);
329 });
330};
331
332Zetta.prototype._initPeers = function(peers, callback) {
333 var self = this;
334 var existingUrls = [];
335 var allPeers = [];
336
337 if (!Array.isArray(peers)) {
338 peers = [peers];
339 }
340
341 this.peerRegistry.find(Query.of('peers'), function(err, results) {
342 if(err) {
343 callback(err);
344 return;
345 }
346
347 results.forEach(function(peer) {
348 peer.status = 'disconnected';
349 if (peer.direction === 'initiator' && peer.url) {
350 allPeers.push(peer);
351 existingUrls.push(peer.url);
352 return;
353 }
354 });
355
356 // peers added through js api to registry peers if they don't already exist
357 allPeers = allPeers.concat(peers.filter(function(peer) {
358 return existingUrls.indexOf(peer) === -1;
359 }));
360
361 allPeers.forEach(function(obj) {
362 var existing = (typeof obj === 'object');
363 if (existing) {
364 if(!obj.fromLink || peers.indexOf(obj.url) > -1) {
365 self.peerRegistry.save(obj, function() {
366 self._runPeer(obj);
367 });
368 } else {
369 //Delete
370 self.peerRegistry.remove(obj, function(err){
371 if(err) {
372 console.error(err);
373 }
374 });
375 }
376 } else {
377 var peerData = {
378 url: obj,
379 direction: 'initiator',
380 fromLink:true
381 };
382 self.peerRegistry.add(peerData, function(err, newPeer) {
383 self._runPeer(newPeer);
384 });
385 }
386
387
388 });
389
390 // end after db read
391 callback();
392 });
393
394 return this;
395};
396
397Zetta.prototype._extendPeerRequest = function(client) {
398 this.runtime.modifyPeerRequest(client.ws);
399};
400
401Zetta.prototype._extendPeerResponse = function(client) {
402 this.runtime.modifyPeerResponse(client.ws);
403};
404
405Zetta.prototype._runPeer = function(peer) {
406 var self = this;
407 var peerClient = new PeerClient(peer.url, self);
408 this._extendPeerRequest(peerClient);
409 this._extendPeerResponse(peerClient);
410
411 self._peerClients.push(peerClient);
412
413 // when websocket is established
414 peerClient.on('connecting', function() {
415 peer.status = 'connecting';
416 self.peerRegistry.save(peer);
417 });
418
419 // when peer handshake is made
420 peerClient.on('connected', function() {
421 peer.status = 'connected';
422 peer.connectionId = peerClient.connectionId;
423 self.peerRegistry.save(peer);
424
425 // peer-event
426 self.pubsub.publish('_peer/connect', { peer: peerClient});
427 });
428
429 peerClient.on('error', function(error) {
430 self.peerRegistry.get(peer.id, function(err, result) {
431 result.status = 'failed';
432 result.error = error;
433 self.peerRegistry.save(result);
434
435 // peer-event
436 self.pubsub.publish('_peer/disconnect', { peer: peerClient });
437 });
438 });
439
440 peerClient.on('closed', function() {
441 self.peerRegistry.get(peer.id, function(err, result) {
442 result.status = 'disconnected';
443
444 // peer-event
445 self.pubsub.publish('_peer/disconnect', { peer: peerClient });
446 self.peerRegistry.save(result, function() { });
447 });
448 });
449
450 peerClient.start();
451}