UNPKG

11.5 kBJavaScriptView Raw
1var os = require('os');
2var AutoScout = require('zetta-auto-scout');
3var async = require('async');
4var HttpScout = require('./lib/http_scout');
5var HttpServer = require('./lib/http_server');
6var Logger = require('./lib/logger');
7var PeerClient = require('./lib/peer_client');
8var PeerRegistry = require('./lib/peer_registry');
9var PubSub = require('./lib/pubsub_service');
10var Runtime = require('./lib/runtime');
11var scientist = require('zetta-scientist');
12var Query = require('calypso').Query;
13
// Zetta server constructor (also the module export). Can be called with or
// without `new`.
//
// Recognised options (all optional):
//   peerRegistry, pubsub, log  - replacements for the default PeerRegistry,
//                                PubSub and Logger instances
//   registry                   - forwarded to the Runtime as `registry`
//   useXForwardedHostHeader,
//   useXForwardedPathHeader    - forwarded to the HttpServer when defined
//   tls                        - object whose own enumerable keys are copied
//                                onto the HttpServer options
var Zetta = module.exports = function(opts) {
  // Support factory-style invocation: Zetta(opts).
  if (!(this instanceof Zetta)) {
    return new Zetta(opts);
  }

  opts = opts || {};

  this._name = os.hostname(); // optional name, defaults to OS hostname
  this.id = this._name;
  this._properties = {}; // custom properties

  this._exposeQuery = '';
  this._scouts = [];
  this._apps = [];
  this._peers = [];
  this._peerClients = [];

  this.peerRegistry = opts.peerRegistry || new PeerRegistry();

  this.pubsub = opts.pubsub || new PubSub();
  this.log = opts.log || new Logger({ pubsub: this.pubsub });
  this.log.init();
  this._silent = false;

  var httpOptions = {};
  if (typeof opts.useXForwardedHostHeader !== 'undefined') {
    httpOptions.useXForwardedHostHeader = opts.useXForwardedHostHeader;
  }
  if (typeof opts.useXForwardedPathHeader !== 'undefined') {
    httpOptions.useXForwardedPathHeader = opts.useXForwardedPathHeader;
  }

  // `typeof null` is 'object', so null must be excluded explicitly or
  // Object.keys would throw.
  if (opts.tls !== null && typeof opts.tls === 'object') {
    Object.keys(opts.tls).forEach(function(k) {
      httpOptions[k] = opts.tls[k];
    });
  }
  this.httpServer = new HttpServer(this, httpOptions);

  var runtimeOptions = {
    pubsub: this.pubsub,
    log: this.log,
    httpServer: this.httpServer
  };

  if (opts.registry) {
    runtimeOptions.registry = opts.registry;
  }
  this.runtime = new Runtime(runtimeOptions);

  // The HTTP scout is always registered first; `use` adds HttpDevice
  // drivers to it through `this.httpScout`.
  var httpScout = scientist.create.apply(null, [HttpScout]);
  httpScout.server = this.runtime;
  this.httpScout = httpScout;
  this._scouts.push(httpScout);
};
69
// Turn on the silent flag. `_run` only attaches console log output while the
// flag is false. Returns the server for chaining.
Zetta.prototype.silent = function() {
  var server = this;
  server._silent = true;
  return server;
};
74
// Install custom logging: sets the silent flag (so `_run` skips the default
// console output) and hands the server's log object to `func`.
// Returns the server for chaining.
Zetta.prototype.logger = function(func) {
  var server = this;
  server._silent = true;
  func(server.log);
  return server;
};
81
// Set the server name; `id` is kept equal to the name.
// Throws an Error for the name '*'. Returns the server for chaining.
Zetta.prototype.name = function(name) {
  if (name !== '*') {
    this._name = name;
    this.id = name;
    return this;
  }

  throw new Error('Cannot set name to *');
};
91
// Replace the server's custom properties.
//
// `props` is used only when it is a non-null object; any other value leaves
// the current properties untouched. The own enumerable keys are copied into
// a fresh object, so the caller's object is never modified, and the `name`
// key is dropped because the name cannot be overridden this way.
// Returns the server for chaining.
Zetta.prototype.properties = function(props) {
  // `typeof null` is 'object', so null has to be excluded explicitly.
  if (props !== null && typeof props === 'object') {
    var copy = {};
    Object.keys(props).forEach(function(key) {
      if (key !== 'name') { // cannot override name
        copy[key] = props[key];
      }
    });
    this._properties = copy;
  }
  return this;
};
100
// Build a new object holding the server name under `name` followed by every
// own enumerable key of the custom properties.
Zetta.prototype.getProperties = function() {
  var custom = this._properties;

  return Object.keys(custom).reduce(function(out, key) {
    out[key] = custom[key];
    return out;
  }, { name: this._name });
};
109
// Register a device driver, a scout, or an app with the server.
//
// The first argument is a constructor (or app function); the remaining
// arguments are forwarded to it. The kind of thing being registered is found
// by walking up the prototype chain of `constructor.prototype` and comparing
// each ancestor's constructor name:
//   'HttpDevice' - the constructor is stored on the HTTP scout's
//                  `driverFunctions` under the device's configured type
//   'Device'     - an AutoScout is created for the device and added
//   'Scout'      - the scout is created through scientist and added
//   none of them - the arguments are passed to `load` as an app
// Returns the server for chaining.
Zetta.prototype.use = function() {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var constructor = args[0];

  // Point the scout at this server's runtime and queue it for `_initScouts`.
  function addScout(scout) {
    scout.server = self.runtime;
    self._scouts.push(scout);
  }

  // Build a device instance from the constructor and the remaining
  // arguments, attach the server's pubsub/log/registry, and read its config.
  function init() {
    var machine = Object.create(constructor.prototype);
    constructor.apply(machine, args.slice(1));
    machine._pubsub = self.pubsub;
    machine._log = self.log;
    machine._registry = self.runtime.registry;

    return { config: scientist.config(machine), instance: machine };
  }

  var proto = constructor.prototype;
  var parent;
  var scout;

  // Object.getPrototypeOf replaces the deprecated `__proto__` accessor.
  while (proto && (parent = Object.getPrototypeOf(proto))) {
    var ancestor = parent.constructor.name;

    if (ancestor === 'HttpDevice') {
      var config = init().config;
      self.httpScout.driverFunctions[config._type] = constructor;
      return this;
    }

    if (ancestor === 'Device') {
      var build = init();
      // AutoScout receives (type, constructor, ...constructor args); the
      // unshift must happen after init(), which reads args.slice(1).
      args.unshift(build.config._type);
      scout = Object.create(AutoScout.prototype);
      scout._deviceInstance = build; // pass both machine and config to autoscout need to _generate device
      AutoScout.apply(scout, args);
      addScout(scout);
      return this;
    }

    if (ancestor === 'Scout') {
      scout = scientist.create.apply(null, args);
      addScout(scout);
      return this;
    }

    proto = parent;
  }

  // Top of the chain reached without a known ancestor: treat it as an app.
  self.load.apply(self, args);

  return this;
};
156
// Remember the expose query on the server and pass it to the runtime's
// `expose`. Returns the server for chaining.
Zetta.prototype.expose = function(query) {
  var server = this;
  server._exposeQuery = query;
  server.runtime.expose(query);
  return server;
};
162
// Queue an app for `_initApps`: the first argument is the app function, the
// rest are stored as the arguments to call it with.
// Returns the server for chaining.
Zetta.prototype.load = function() {
  var appFn = arguments[0];
  var extra = Array.prototype.slice.call(arguments, 1);

  this._apps.push({ app: appFn, args: extra });
  return this;
};
173
// Queue one peer or an array of peers; the queued list is handed to
// `_initPeers` when `_run` executes. Returns the server for chaining.
Zetta.prototype.link = function(peers) {
  var queued = this._peers;
  var list = Array.isArray(peers) ? peers : [peers];

  list.forEach(function(peer) {
    queued.push(peer);
  });

  return this;
};
187
188
// Initialise the server through `_run`, then start the HTTP server.
//
// All arguments are forwarded to `httpServer.listen`. If the last argument
// is a function it is treated as the completion callback: it is called
// exactly once, with the error on failure or with a falsy value once the
// server is listening. Without a callback, errors are thrown from the async
// completion handler instead. Returns the server for chaining.
Zetta.prototype.listen = function() {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var last = args[args.length - 1];

  var callback;
  if (typeof last === 'function') {
    callback = last;
  }

  // Report a failure exactly once: to the callback if there is one,
  // otherwise by throwing.
  function fail(err) {
    if (callback) {
      return callback(err);
    }
    throw err;
  }

  this._run(function(err) {
    if (err) {
      return fail(err);
    }

    var cb = function(err) {
      if (err) {
        // Return here so the "listening" message is not logged and the
        // callback is not invoked a second time further down.
        return fail(err);
      }

      var target = args[0];
      var host;
      if (typeof target === 'string') {
        host = target; // UNIX socket
      } else if (typeof target === 'number') {
        if (args.length > 1 && typeof args[1] === 'string') {
          host = 'http://' + args[1] + ':' + target; // host + port
        } else {
          host = 'http://127.0.0.1:' + target; // just port
        }
      } else if (target !== null && typeof target === 'object' && target.fd) {
        host = 'fd: ' + target.fd; // handle
      } else {
        host = '<unknown>';
      }

      self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);

      if (callback) {
        callback(err);
      }
    };

    // Make `cb` the final argument so httpServer.listen reports back through
    // it: append it, or swap it in for the caller's callback.
    if (!callback) {
      args.push(cb);
    } else {
      args[args.length - 1] = cb;
    }

    self.httpServer.listen.apply(self.httpServer, args);
  });

  return this;
};
252
// Run the scouts and apps and initialise the HTTP server, without listening
// on an HTTP port.
//
// Steps run in series: device registry init, peer registry init, scouts,
// apps, HTTP server init, peer cleanup, peer connections. `callback` is
// optional and is invoked via setImmediate with the first error, if any.
// Console log output is attached first unless the silent flag is set.
// Returns the server for chaining.
Zetta.prototype._run = function(callback) {
  var self = this;

  if (!callback) {
    callback = function() {};
  }

  if (!this._silent) {
    Logger.ConsoleOutput(this.log);
  }

  async.series([
    function(next) {
      self.runtime.registry._init(next);
    },
    function(next) {
      self.peerRegistry._init(next);
    },
    function(next) {
      self._initScouts(next);
    },
    function(next) {
      self._initApps(next);
    },
    function(next) {
      self._initHttpServer(next);
    },
    function(next) {
      self._cleanupPeers(next);
    },
    function(next) {
      self._initPeers(self._peers, next);
      // From here on `link` connects peers immediately instead of queueing
      // them. It returns the server, like the prototype `link`, so chained
      // calls keep working after startup.
      self.link = function(peers, cb) {
        self._initPeers(peers, (cb || function() {}));
        return self;
      };
    }
  ], function(err) {
    setImmediate(function() {
      callback(err);
    });
  });

  return this;
};
298
// Call `init` on every registered scout via async.each, then invoke
// `callback` with the resulting error, if any.
// Returns the server for chaining.
Zetta.prototype._initScouts = function(callback) {
  function startScout(scout, done) {
    scout.init(done);
  }

  function finished(err) {
    callback(err);
  }

  async.each(this._scouts, startScout, finished);

  return this;
};
308
// Invoke every queued app with the runtime as first argument followed by the
// arguments stored by `load`, then call `callback`.
//
// The argument list is built fresh for each call so the stored `app.args`
// stays untouched; running this more than once therefore passes the same
// arguments every time. Returns the server for chaining.
Zetta.prototype._initApps = function(callback) {
  var runtime = this.runtime;

  this._apps.forEach(function(app) {
    app.app.apply(null, [runtime].concat(app.args));
  });
  callback();

  return this;
};
320
// Call `init` on the HTTP server, then invoke `callback`.
// Returns the server for chaining.
Zetta.prototype._initHttpServer = function(callback) {
  var http = this.httpServer;

  http.init();
  callback();

  return this;
};
327
328
// Set the status of every peer found in the peer registry to 'disconnected'
// and save each one. `callback` receives the lookup error, or whatever
// async.forEach reports once the saves are done.
Zetta.prototype._cleanupPeers = function(callback) {
  var server = this;

  function markDisconnected(peer, done) {
    peer.status = 'disconnected';
    server.peerRegistry.save(peer, done);
  }

  this.peerRegistry.find(Query.of('peers'), function(err, results) {
    if (err) {
      callback(err);
    } else {
      async.forEach(results, markDisconnected, callback);
    }
  });
};
344
// Start connections to peers: the initiator peers already stored in the
// registry plus the ones passed in `peers` (one peer or an array) that are
// not stored yet.
//
// Stored initiator peers are saved and started, except those that were
// created through the JS API (`fromLink`) and are no longer in `peers`;
// those are removed from the registry. New peers are added to the registry
// and started once the add succeeds; a failed add is logged and that peer is
// skipped. `callback` fires right after the registry lookup, without waiting
// for the individual saves/adds. Returns the server for chaining.
Zetta.prototype._initPeers = function(peers, callback) {
  var self = this;
  var existingUrls = [];
  var allPeers = [];

  if (!Array.isArray(peers)) {
    peers = [peers];
  }

  this.peerRegistry.find(Query.of('peers'), function(err, results) {
    if (err) {
      callback(err);
      return;
    }

    results.forEach(function(peer) {
      peer.status = 'disconnected';
      if (peer.direction === 'initiator' && peer.url) {
        allPeers.push(peer);
        existingUrls.push(peer.url);
      }
    });

    // peers added through js api to registry peers if they don't already exist
    allPeers = allPeers.concat(peers.filter(function(peer) {
      return existingUrls.indexOf(peer) === -1;
    }));

    allPeers.forEach(function(obj) {
      // Objects are handled as registry records; anything else is used as
      // the url of a new peer.
      var existing = (typeof obj === 'object');
      if (existing) {
        if (!obj.fromLink || peers.indexOf(obj.url) > -1) {
          self.peerRegistry.save(obj, function() {
            self._runPeer(obj);
          });
        } else {
          // Linked through the JS API earlier but not listed any more.
          self.peerRegistry.remove(obj, function(err) {
            if (err) {
              console.error(err);
            }
          });
        }
      } else {
        var peerData = {
          url: obj,
          direction: 'initiator',
          fromLink: true
        };
        self.peerRegistry.add(peerData, function(err, newPeer) {
          if (err) {
            // Without a stored record there is nothing to run; report the
            // failure instead of passing undefined to _runPeer.
            console.error(err);
            return;
          }
          self._runPeer(newPeer);
        });
      }
    });

    // end after db read
    callback();
  });

  return this;
};
409
// Pass the peer client's `ws` object to the runtime's `modifyPeerRequest`.
Zetta.prototype._extendPeerRequest = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerRequest(client.ws);
};
413
// Pass the peer client's `ws` object to the runtime's `modifyPeerResponse`.
Zetta.prototype._extendPeerResponse = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerResponse(client.ws);
};
417
// Create and start a PeerClient for a stored peer record (`peer.url`,
// `peer.id`) and keep the record's status in step with the client's
// 'connecting', 'connected', 'error' and 'closed' events.
//
// 'connected' publishes '_peer/connect'; 'error' and 'closed' publish
// '_peer/disconnect'. If the record cannot be read back from the registry
// the failure is logged and the registry update is skipped, but the pubsub
// event is still published.
Zetta.prototype._runPeer = function(peer) {
  var self = this;
  var peerClient = new PeerClient(peer.url, self);
  this._extendPeerRequest(peerClient);
  this._extendPeerResponse(peerClient);

  self._peerClients.push(peerClient);

  // Read the peer's record, copy `fields` and the client's current
  // connectionId onto it, and hand it to `next`. `next` receives null when
  // the record could not be loaded.
  function update(fields, next) {
    self.peerRegistry.get(peer.id, function(err, record) {
      if (err || !record) {
        console.error(err || new Error('Peer ' + peer.id + ' not found in registry'));
        next(null);
        return;
      }

      Object.keys(fields).forEach(function(key) {
        record[key] = fields[key];
      });
      record.connectionId = peerClient.connectionId;
      next(record);
    });
  }

  // Persist a record produced by `update`; no-op when the lookup failed.
  function save(record) {
    if (record) {
      self.peerRegistry.save(record);
    }
  }

  // when websocket is established
  peerClient.on('connecting', function() {
    update({ status: 'connecting' }, save);
  });

  // when peer handshake is made
  peerClient.on('connected', function() {
    update({ status: 'connected' }, function(record) {
      save(record);

      // peer-event
      self.pubsub.publish('_peer/connect', { peer: peerClient });
    });
  });

  peerClient.on('error', function(error) {
    update({ status: 'failed', error: error }, function(record) {
      save(record);

      // peer-event
      self.pubsub.publish('_peer/disconnect', { peer: peerClient, error: error });
    });
  });

  peerClient.on('closed', function() {
    update({ status: 'disconnected' }, function(record) {
      // peer-event
      self.pubsub.publish('_peer/disconnect', { peer: peerClient });
      save(record);
    });
  });

  peerClient.start();

  // update initial connectionId in db
  peer.connectionId = peerClient.connectionId;
  self.peerRegistry.save(peer);
};
474 peer.connectionId = peerClient.connectionId;
475 self.peerRegistry.save(peer);
476}