UNPKG

10.1 kBJavaScriptView Raw
1var os = require('os');
2var AutoScout = require('zetta-auto-scout');
3var async = require('async');
4var HttpScout = require('./lib/http_scout');
5var HttpServer = require('./lib/http_server');
6var Logger = require('./lib/logger');
7var PeerClient = require('./lib/peer_client');
8var PeerRegistry = require('./lib/peer_registry');
9var PubSub = require('./lib/pubsub_service');
10var Runtime = require('./lib/runtime');
11var scientist = require('zetta-scientist');
12var Query = require('calypso').Query;
13
14var Zetta = module.exports = function(opts) {
15 if (!(this instanceof Zetta)) {
16 return new Zetta(opts);
17 }
18
19 opts = opts || {};
20
21 this._name = os.hostname(); // optional name, defaults to OS hostname
22 this.id = this._name;
23
24 this._exposeQuery = '';
25 this._scouts = [];
26 this._apps = [];
27 this._peers = [];
28 this._peerClients = [];
29
30 this.peerRegistry = opts.peerRegistry || new PeerRegistry();
31
32 this.pubsub = opts.pubsub || new PubSub();
33 this.log = opts.log || new Logger({ pubsub: this.pubsub });
34 this.log.init();
35 this._silent = false;
36
37 this.httpServer = new HttpServer(this);
38
39 var runtimeOptions = {
40 pubsub: this.pubsub,
41 log: this.log,
42 httpServer: this.httpServer
43 };
44
45 if (opts && opts.registry) {
46 runtimeOptions.registry = opts.registry;
47 }
48 this.runtime = new Runtime(runtimeOptions);
49
50 var httpScout = scientist.create.apply(null, [HttpScout]);
51 httpScout.server = this.runtime;
52 this.httpScout = httpScout;
53 this._scouts.push(httpScout);
54};
55
56Zetta.prototype.silent = function() {
57 this._silent = true;
58 return this;
59};
60
61// pass in a custom logging
62Zetta.prototype.logger = function(func) {
63 this._silent = true;
64 func(this.log);
65 return this;
66};
67
68Zetta.prototype.name = function(name) {
69 this._name = name;
70 this.id = this._name;
71 return this;
72};
73
74Zetta.prototype.use = function() {
75 var args = Array.prototype.slice.call(arguments);
76 var constructor = args[0];
77
78 var self = this;
79 function addScout(scout) {
80 scout.server = self.runtime;
81 self._scouts.push(scout);
82 }
83
84 function init() {
85 var machine = Object.create(constructor.prototype);
86 constructor.apply(machine, args.slice(1));
87 machine._pubsub = self.pubsub;
88 machine._log = self.log;
89 machine._registry = self.runtime.registry;
90
91 var config = scientist.config(machine);
92 return { config: config, instance: machine };
93 }
94
95 function walk(proto) {
96 if (!proto || !proto.__proto__) {
97 self.load.apply(self, args);
98 } else if (proto.__proto__.constructor.name === 'HttpDevice') {
99 var config = init().config;
100 self.httpScout.driverFunctions[config._type] = constructor;
101 } else if (proto.__proto__.constructor.name === 'Device') {
102 var build = init();
103 args.unshift(build.config._type);
104 var scout = Object.create(AutoScout.prototype);
105 scout._deviceInstance = build; // pass both machine and config to autoscout need to _generate device
106 AutoScout.apply(scout, args);
107 addScout(scout);
108 } else if (proto.__proto__.constructor.name === 'Scout') {
109 var scout = scientist.create.apply(null, args);
110 addScout(scout);
111 } else {
112 walk(proto.__proto__);
113 }
114 }
115
116 walk(constructor.prototype);
117
118 return this;
119};
120
121Zetta.prototype.expose = function(query) {
122 this._exposeQuery = query;
123 this.runtime.expose(query);
124 return this;
125};
126
127Zetta.prototype.load = function() {
128 var args = Array.prototype.slice.call(arguments);
129 var appArgs = args.slice(1, args.length);
130 var app = {
131 app: args[0],
132 args: appArgs
133 };
134 this._apps.push(app);
135 return this;
136};
137
138Zetta.prototype.link = function(peers) {
139 var self = this;
140 if(!Array.isArray(peers)) {
141 peers = [peers];
142 }
143
144 peers.forEach(function(peer) {
145 //self._peers.push(new PeerClient(peer, self));
146 self._peers.push(peer);
147 });
148
149 return this;
150};
151
152
153Zetta.prototype.listen = function() {
154 var self = this;
155
156 var args = Array.prototype.slice.call(arguments);
157
158 var last = args[args.length - 1];
159
160 var callback;
161 if (typeof last === 'function') {
162 callback = last;
163 }
164
165 this._run(function(err){
166 if(err) {
167 if (callback) {
168 return callback(err);
169 } else {
170 throw err;
171 }
172 }
173
174 var cb = function(err) {
175 if (err) {
176 if (callback) {
177 callback(err);
178 } else {
179 throw err;
180 }
181 }
182
183 var host;
184 if (typeof args[0] === 'string') {
185 host = args[0]; // UNIX socket
186 } else if (typeof args[0] === 'number') {
187 if (args.length > 1 && typeof args[1] === 'string') {
188 host = 'http://' + args[1] + ':' + args[0]; // host + port
189 } else {
190 host = 'http://127.0.0.1:' + args[0]; // just port
191 }
192 } else if (typeof args[0] === 'object' && args[0].fd) {
193 host = 'fd: ' + args[0].fd; // handle
194 } else {
195 host = '<unknown>';
196 }
197
198 self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);
199
200 if (callback) {
201 callback(err);
202 }
203 };
204
205 if (!callback) {
206 args.push(cb);
207 } else {
208 args[args.length - 1] = cb;
209 }
210
211 self.httpServer.listen.apply(self.httpServer, args);
212 });
213
214 return this;
215};
216
217// run scouts/apps init server but do not listening on http port
218Zetta.prototype._run = function(callback) {
219 var self = this;
220
221 if(!callback) {
222 callback = function(){};
223 }
224
225 if (!this._silent) {
226 Logger.ConsoleOutput(this.log);
227 }
228
229 async.series([
230 function(next) {
231 self.runtime.registry._init(next);
232 },
233 function(next) {
234 self.peerRegistry._init(next);
235 },
236 function(next) {
237 self._initScouts(next);
238 },
239 function(next) {
240 self._initApps(next);
241 },
242 function(next) {
243 self._initHttpServer(next);
244 },
245 function(next) {
246 self._cleanupPeers(next);
247 },
248 function(next) {
249 self._initPeers(self._peers, next);
250 self.link = function(peers, cb) {
251 self._initPeers(peers, (cb || function() {}) );
252 };
253 }
254 ], function(err){
255 setImmediate(function() {
256 callback(err);
257 });
258 });
259
260 return this;
261};
262
263Zetta.prototype._initScouts = function(callback) {
264 async.each(this._scouts, function(scout, next) {
265 scout.init(next);
266 }, function(err) {
267 callback(err);
268 });
269
270 return this;
271};
272
273Zetta.prototype._initApps = function(callback) {
274 var self = this;
275 this._apps.forEach(function(app) {
276 var args = app.args;
277 args.unshift(self.runtime);
278 app.app.apply(null, args);
279 });
280 callback();
281
282 return this;
283};
284
285Zetta.prototype._initHttpServer = function(callback) {
286 this.httpServer.init();
287 callback();
288
289 return this;
290};
291
292
293// set all peers to disconnected
294Zetta.prototype._cleanupPeers = function(callback) {
295 var self = this;
296 this.peerRegistry.find(Query.of('peers'), function(err, results) {
297 if(err) {
298 callback(err);
299 return;
300 }
301
302 async.forEach(results, function(peer, next) {
303 peer.status = 'disconnected';
304 self.peerRegistry.save(peer, next);
305 }, callback);
306 });
307};
308
309Zetta.prototype._initPeers = function(peers, callback) {
310 var self = this;
311 var existingUrls = [];
312 var allPeers = [];
313
314 if (!Array.isArray(peers)) {
315 peers = [peers];
316 }
317
318 this.peerRegistry.find(Query.of('peers'), function(err, results) {
319 if(err) {
320 callback(err);
321 return;
322 }
323
324 results.forEach(function(peer) {
325 peer.status = 'disconnected';
326 if (peer.direction === 'initiator' && peer.url) {
327 allPeers.push(peer);
328 existingUrls.push(peer.url);
329 return;
330 }
331 });
332
333 // peers added through js api to registry peers if they don't already exist
334 allPeers = allPeers.concat(peers.filter(function(peer) {
335 return existingUrls.indexOf(peer) === -1;
336 }));
337
338 allPeers.forEach(function(obj) {
339 var existing = (typeof obj === 'object');
340 if (existing) {
341 if(!obj.fromLink || peers.indexOf(obj.url) > -1) {
342 self.peerRegistry.save(obj, function() {
343 self._runPeer(obj);
344 });
345 } else {
346 //Delete
347 self.peerRegistry.remove(obj, function(err){
348 if(err) {
349 console.error(err);
350 }
351 });
352 }
353 } else {
354 var peerData = {
355 url: obj,
356 direction: 'initiator',
357 fromLink:true
358 };
359 self.peerRegistry.add(peerData, function(err, newPeer) {
360 self._runPeer(newPeer);
361 });
362 }
363
364
365 });
366
367 // end after db read
368 callback();
369 });
370
371 return this;
372};
373
374Zetta.prototype._extendPeerRequest = function(client) {
375 this.runtime.modifyPeerRequest(client.ws);
376};
377
378Zetta.prototype._extendPeerResponse = function(client) {
379 this.runtime.modifyPeerResponse(client.ws);
380};
381
382Zetta.prototype._runPeer = function(peer) {
383 var self = this;
384 var peerClient = new PeerClient(peer.url, self);
385 this._extendPeerRequest(peerClient);
386 this._extendPeerResponse(peerClient);
387
388 self._peerClients.push(peerClient);
389
390 // when websocket is established
391 peerClient.on('connecting', function() {
392 peer.status = 'connecting';
393 self.peerRegistry.save(peer);
394 });
395
396 // when peer handshake is made
397 peerClient.on('connected', function() {
398 peer.status = 'connected';
399 peer.connectionId = peerClient.connectionId;
400 self.peerRegistry.save(peer);
401
402 // peer-event
403 self.pubsub.publish('_peer/connect', { peer: peerClient});
404 });
405
406 peerClient.on('error', function(error) {
407 self.peerRegistry.get(peer.id, function(err, result) {
408 result.status = 'failed';
409 result.error = error;
410 self.peerRegistry.save(result);
411
412 // peer-event
413 self.pubsub.publish('_peer/disconnect', { peer: peerClient });
414 });
415 });
416
417 peerClient.on('closed', function() {
418 self.peerRegistry.get(peer.id, function(err, result) {
419 result.status = 'disconnected';
420
421 // peer-event
422 self.pubsub.publish('_peer/disconnect', { peer: peerClient });
423 self.peerRegistry.save(result, function() { });
424 });
425 });
426
427 peerClient.start();
428}