UNPKG

11.3 kBJavaScriptView Raw
1var os = require('os');
2var AutoScout = require('zetta-auto-scout');
3var async = require('async');
4var HttpScout = require('./lib/http_scout');
5var HttpServer = require('./lib/http_server');
6var Logger = require('./lib/logger');
7var PeerClient = require('./lib/peer_client');
8var PeerRegistry = require('./lib/peer_registry');
9var PubSub = require('./lib/pubsub_service');
10var Runtime = require('./lib/runtime');
11var scientist = require('zetta-scientist');
12var Query = require('calypso').Query;
13
14var Zetta = module.exports = function(opts) {
15 if (!(this instanceof Zetta)) {
16 return new Zetta(opts);
17 }
18
19 opts = opts || {};
20
21 this._name = os.hostname(); // optional name, defaults to OS hostname
22 this.id = this._name;
23 this._properties = {}; // custom properties
24
25 this._exposeQuery = '';
26 this._scouts = [];
27 this._apps = [];
28 this._peers = [];
29 this._peerClients = [];
30
31 this.peerRegistry = opts.peerRegistry || new PeerRegistry();
32
33 this.pubsub = opts.pubsub || new PubSub();
34 this.log = opts.log || new Logger({ pubsub: this.pubsub });
35 this.log.init();
36 this._silent = false;
37
38 var httpOptions = {};
39 if(typeof opts.useXForwardedHostHeader !== 'undefined') {
40 httpOptions.useXForwardedHostHeader = opts.useXForwardedHostHeader;
41 }
42
43 if (typeof opts.tls === 'object') {
44 Object.keys(opts.tls).forEach(function(k) {
45 httpOptions[k] = opts.tls[k];
46 });
47 }
48 this.httpServer = new HttpServer(this, httpOptions);
49
50 var runtimeOptions = {
51 pubsub: this.pubsub,
52 log: this.log,
53 httpServer: this.httpServer
54 };
55
56 if (opts && opts.registry) {
57 runtimeOptions.registry = opts.registry;
58 }
59 this.runtime = new Runtime(runtimeOptions);
60
61 var httpScout = scientist.create.apply(null, [HttpScout]);
62 httpScout.server = this.runtime;
63 this.httpScout = httpScout;
64 this._scouts.push(httpScout);
65};
66
67Zetta.prototype.silent = function() {
68 this._silent = true;
69 return this;
70};
71
72// pass in a custom logging
73Zetta.prototype.logger = function(func) {
74 this._silent = true;
75 func(this.log);
76 return this;
77};
78
79Zetta.prototype.name = function(name) {
80 if (name === '*') {
81 throw new Error('Cannot set name to *');
82 }
83
84 this._name = name;
85 this.id = this._name;
86 return this;
87};
88
89Zetta.prototype.properties = function(props) {
90 var self = this;
91 if (typeof props === 'object') {
92 delete props.name; // cannot overide name
93 this._properties = props;
94 }
95 return this;
96};
97
98Zetta.prototype.getProperties = function() {
99 var self = this;
100 var ret = { name: this._name };
101 Object.keys(this._properties).forEach(function(k) {
102 ret[k] = self._properties[k];
103 });
104 return ret;
105};
106
107Zetta.prototype.use = function() {
108 var args = Array.prototype.slice.call(arguments);
109 var constructor = args[0];
110
111 var self = this;
112 function addScout(scout) {
113 scout.server = self.runtime;
114 self._scouts.push(scout);
115 }
116
117 function init() {
118 var machine = Object.create(constructor.prototype);
119 constructor.apply(machine, args.slice(1));
120 machine._pubsub = self.pubsub;
121 machine._log = self.log;
122 machine._registry = self.runtime.registry;
123
124 var config = scientist.config(machine);
125 return { config: config, instance: machine };
126 }
127
128 function walk(proto) {
129 if (!proto || !proto.__proto__) {
130 self.load.apply(self, args);
131 } else if (proto.__proto__.constructor.name === 'HttpDevice') {
132 var config = init().config;
133 self.httpScout.driverFunctions[config._type] = constructor;
134 } else if (proto.__proto__.constructor.name === 'Device') {
135 var build = init();
136 args.unshift(build.config._type);
137 var scout = Object.create(AutoScout.prototype);
138 scout._deviceInstance = build; // pass both machine and config to autoscout need to _generate device
139 AutoScout.apply(scout, args);
140 addScout(scout);
141 } else if (proto.__proto__.constructor.name === 'Scout') {
142 var scout = scientist.create.apply(null, args);
143 addScout(scout);
144 } else {
145 walk(proto.__proto__);
146 }
147 }
148
149 walk(constructor.prototype);
150
151 return this;
152};
153
154Zetta.prototype.expose = function(query) {
155 this._exposeQuery = query;
156 this.runtime.expose(query);
157 return this;
158};
159
160Zetta.prototype.load = function() {
161 var args = Array.prototype.slice.call(arguments);
162 var appArgs = args.slice(1, args.length);
163 var app = {
164 app: args[0],
165 args: appArgs
166 };
167 this._apps.push(app);
168 return this;
169};
170
171Zetta.prototype.link = function(peers) {
172 var self = this;
173 if(!Array.isArray(peers)) {
174 peers = [peers];
175 }
176
177 peers.forEach(function(peer) {
178 //self._peers.push(new PeerClient(peer, self));
179 self._peers.push(peer);
180 });
181
182 return this;
183};
184
185
186Zetta.prototype.listen = function() {
187 var self = this;
188
189 var args = Array.prototype.slice.call(arguments);
190
191 var last = args[args.length - 1];
192
193 var callback;
194 if (typeof last === 'function') {
195 callback = last;
196 }
197
198 this._run(function(err){
199 if(err) {
200 if (callback) {
201 return callback(err);
202 } else {
203 throw err;
204 }
205 }
206
207 var cb = function(err) {
208 if (err) {
209 if (callback) {
210 callback(err);
211 } else {
212 throw err;
213 }
214 }
215
216 var host;
217 if (typeof args[0] === 'string') {
218 host = args[0]; // UNIX socket
219 } else if (typeof args[0] === 'number') {
220 if (args.length > 1 && typeof args[1] === 'string') {
221 host = 'http://' + args[1] + ':' + args[0]; // host + port
222 } else {
223 host = 'http://127.0.0.1:' + args[0]; // just port
224 }
225 } else if (typeof args[0] === 'object' && args[0].fd) {
226 host = 'fd: ' + args[0].fd; // handle
227 } else {
228 host = '<unknown>';
229 }
230
231 self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);
232
233 if (callback) {
234 callback(err);
235 }
236 };
237
238 if (!callback) {
239 args.push(cb);
240 } else {
241 args[args.length - 1] = cb;
242 }
243
244 self.httpServer.listen.apply(self.httpServer, args);
245 });
246
247 return this;
248};
249
250// run scouts/apps init server but do not listening on http port
251Zetta.prototype._run = function(callback) {
252 var self = this;
253
254 if(!callback) {
255 callback = function(){};
256 }
257
258 if (!this._silent) {
259 Logger.ConsoleOutput(this.log);
260 }
261
262 async.series([
263 function(next) {
264 self.runtime.registry._init(next);
265 },
266 function(next) {
267 self.peerRegistry._init(next);
268 },
269 function(next) {
270 self._initScouts(next);
271 },
272 function(next) {
273 self._initApps(next);
274 },
275 function(next) {
276 self._initHttpServer(next);
277 },
278 function(next) {
279 self._cleanupPeers(next);
280 },
281 function(next) {
282 self._initPeers(self._peers, next);
283 self.link = function(peers, cb) {
284 self._initPeers(peers, (cb || function() {}) );
285 };
286 }
287 ], function(err){
288 setImmediate(function() {
289 callback(err);
290 });
291 });
292
293 return this;
294};
295
296Zetta.prototype._initScouts = function(callback) {
297 async.each(this._scouts, function(scout, next) {
298 scout.init(next);
299 }, function(err) {
300 callback(err);
301 });
302
303 return this;
304};
305
306Zetta.prototype._initApps = function(callback) {
307 var self = this;
308 this._apps.forEach(function(app) {
309 var args = app.args;
310 args.unshift(self.runtime);
311 app.app.apply(null, args);
312 });
313 callback();
314
315 return this;
316};
317
318Zetta.prototype._initHttpServer = function(callback) {
319 this.httpServer.init();
320 callback();
321
322 return this;
323};
324
325
326// set all peers to disconnected
327Zetta.prototype._cleanupPeers = function(callback) {
328 var self = this;
329 this.peerRegistry.find(Query.of('peers'), function(err, results) {
330 if(err) {
331 callback(err);
332 return;
333 }
334
335 async.forEach(results, function(peer, next) {
336 peer.status = 'disconnected';
337 self.peerRegistry.save(peer, next);
338 }, callback);
339 });
340};
341
342Zetta.prototype._initPeers = function(peers, callback) {
343 var self = this;
344 var existingUrls = [];
345 var allPeers = [];
346
347 if (!Array.isArray(peers)) {
348 peers = [peers];
349 }
350
351 this.peerRegistry.find(Query.of('peers'), function(err, results) {
352 if(err) {
353 callback(err);
354 return;
355 }
356
357 results.forEach(function(peer) {
358 peer.status = 'disconnected';
359 if (peer.direction === 'initiator' && peer.url) {
360 allPeers.push(peer);
361 existingUrls.push(peer.url);
362 return;
363 }
364 });
365
366 // peers added through js api to registry peers if they don't already exist
367 allPeers = allPeers.concat(peers.filter(function(peer) {
368 return existingUrls.indexOf(peer) === -1;
369 }));
370
371 allPeers.forEach(function(obj) {
372 var existing = (typeof obj === 'object');
373 if (existing) {
374 if(!obj.fromLink || peers.indexOf(obj.url) > -1) {
375 self.peerRegistry.save(obj, function() {
376 self._runPeer(obj);
377 });
378 } else {
379 //Delete
380 self.peerRegistry.remove(obj, function(err){
381 if(err) {
382 console.error(err);
383 }
384 });
385 }
386 } else {
387 var peerData = {
388 url: obj,
389 direction: 'initiator',
390 fromLink:true
391 };
392 self.peerRegistry.add(peerData, function(err, newPeer) {
393 self._runPeer(newPeer);
394 });
395 }
396
397
398 });
399
400 // end after db read
401 callback();
402 });
403
404 return this;
405};
406
407Zetta.prototype._extendPeerRequest = function(client) {
408 this.runtime.modifyPeerRequest(client.ws);
409};
410
411Zetta.prototype._extendPeerResponse = function(client) {
412 this.runtime.modifyPeerResponse(client.ws);
413};
414
415Zetta.prototype._runPeer = function(peer) {
416 var self = this;
417 var peerClient = new PeerClient(peer.url, self);
418 this._extendPeerRequest(peerClient);
419 this._extendPeerResponse(peerClient);
420
421 self._peerClients.push(peerClient);
422
423 // when websocket is established
424 peerClient.on('connecting', function() {
425 self.peerRegistry.get(peer.id, function(err, result) {
426 result.status = 'connecting';
427 result.connectionId = peerClient.connectionId;
428 self.peerRegistry.save(result);
429 });
430 });
431
432 // when peer handshake is made
433 peerClient.on('connected', function() {
434 self.peerRegistry.get(peer.id, function(err, result) {
435 result.status = 'connected';
436 result.connectionId = peerClient.connectionId;
437 self.peerRegistry.save(result);
438
439 // peer-event
440 self.pubsub.publish('_peer/connect', { peer: peerClient});
441 });
442 });
443
444 peerClient.on('error', function(error) {
445
446 self.peerRegistry.get(peer.id, function(err, result) {
447 result.status = 'failed';
448 result.error = error;
449 result.connectionId = peerClient.connectionId;
450 self.peerRegistry.save(result);
451
452 // peer-event
453 self.pubsub.publish('_peer/disconnect', { peer: peerClient, error: error });
454 });
455 });
456
457 peerClient.on('closed', function() {
458 self.peerRegistry.get(peer.id, function(err, result) {
459 result.status = 'disconnected';
460 result.connectionId = peerClient.connectionId;
461
462 // peer-event
463 self.pubsub.publish('_peer/disconnect', { peer: peerClient });
464 self.peerRegistry.save(result);
465 });
466 });
467
468 peerClient.start();
469
470 // update initial connectionId in db
471 peer.connectionId = peerClient.connectionId;
472 self.peerRegistry.save(peer);
473}