1 | var os = require('os');
|
2 | var AutoScout = require('zetta-auto-scout');
|
3 | var async = require('async');
|
4 | var HttpScout = require('./lib/http_scout');
|
5 | var HttpServer = require('./lib/http_server');
|
6 | var Logger = require('./lib/logger');
|
7 | var PeerClient = require('./lib/peer_client');
|
8 | var PeerRegistry = require('./lib/peer_registry');
|
9 | var PubSub = require('./lib/pubsub_service');
|
10 | var Runtime = require('./lib/runtime');
|
11 | var scientist = require('zetta-scientist');
|
12 | var Query = require('calypso').Query;
|
13 |
|
14 | var Zetta = module.exports = function(opts) {
|
15 | if (!(this instanceof Zetta)) {
|
16 | return new Zetta(opts);
|
17 | }
|
18 |
|
19 | opts = opts || {};
|
20 |
|
21 | this._name = os.hostname();
|
22 | this.id = this._name;
|
23 | this._properties = {};
|
24 |
|
25 | this._exposeQuery = '';
|
26 | this._scouts = [];
|
27 | this._apps = [];
|
28 | this._peers = [];
|
29 | this._peerClients = [];
|
30 |
|
31 | this.peerRegistry = opts.peerRegistry || new PeerRegistry();
|
32 |
|
33 | this.pubsub = opts.pubsub || new PubSub();
|
34 | this.log = opts.log || new Logger({ pubsub: this.pubsub });
|
35 | this.log.init();
|
36 | this._silent = false;
|
37 |
|
38 | var httpOptions = {};
|
39 | if(typeof opts.useXForwardedHostHeader !== 'undefined') {
|
40 | httpOptions.useXForwardedHostHeader = opts.useXForwardedHostHeader;
|
41 | }
|
42 |
|
43 | if (typeof opts.tls === 'object') {
|
44 | Object.keys(opts.tls).forEach(function(k) {
|
45 | httpOptions[k] = opts.tls[k];
|
46 | });
|
47 | }
|
48 | this.httpServer = new HttpServer(this, httpOptions);
|
49 |
|
50 | var runtimeOptions = {
|
51 | pubsub: this.pubsub,
|
52 | log: this.log,
|
53 | httpServer: this.httpServer
|
54 | };
|
55 |
|
56 | if (opts && opts.registry) {
|
57 | runtimeOptions.registry = opts.registry;
|
58 | }
|
59 | this.runtime = new Runtime(runtimeOptions);
|
60 |
|
61 | var httpScout = scientist.create.apply(null, [HttpScout]);
|
62 | httpScout.server = this.runtime;
|
63 | this.httpScout = httpScout;
|
64 | this._scouts.push(httpScout);
|
65 | };
|
66 |
|
67 | Zetta.prototype.silent = function() {
|
68 | this._silent = true;
|
69 | return this;
|
70 | };
|
71 |
|
72 |
|
73 | Zetta.prototype.logger = function(func) {
|
74 | this._silent = true;
|
75 | func(this.log);
|
76 | return this;
|
77 | };
|
78 |
|
79 | Zetta.prototype.name = function(name) {
|
80 | if (name === '*') {
|
81 | throw new Error('Cannot set name to *');
|
82 | }
|
83 |
|
84 | this._name = name;
|
85 | this.id = this._name;
|
86 | return this;
|
87 | };
|
88 |
|
89 | Zetta.prototype.properties = function(props) {
|
90 | var self = this;
|
91 | if (typeof props === 'object') {
|
92 | delete props.name;
|
93 | this._properties = props;
|
94 | }
|
95 | return this;
|
96 | };
|
97 |
|
98 | Zetta.prototype.getProperties = function() {
|
99 | var self = this;
|
100 | var ret = { name: this._name };
|
101 | Object.keys(this._properties).forEach(function(k) {
|
102 | ret[k] = self._properties[k];
|
103 | });
|
104 | return ret;
|
105 | };
|
106 |
|
107 | Zetta.prototype.use = function() {
|
108 | var args = Array.prototype.slice.call(arguments);
|
109 | var constructor = args[0];
|
110 |
|
111 | var self = this;
|
112 | function addScout(scout) {
|
113 | scout.server = self.runtime;
|
114 | self._scouts.push(scout);
|
115 | }
|
116 |
|
117 | function init() {
|
118 | var machine = Object.create(constructor.prototype);
|
119 | constructor.apply(machine, args.slice(1));
|
120 | machine._pubsub = self.pubsub;
|
121 | machine._log = self.log;
|
122 | machine._registry = self.runtime.registry;
|
123 |
|
124 | var config = scientist.config(machine);
|
125 | return { config: config, instance: machine };
|
126 | }
|
127 |
|
128 | function walk(proto) {
|
129 | if (!proto || !proto.__proto__) {
|
130 | self.load.apply(self, args);
|
131 | } else if (proto.__proto__.constructor.name === 'HttpDevice') {
|
132 | var config = init().config;
|
133 | self.httpScout.driverFunctions[config._type] = constructor;
|
134 | } else if (proto.__proto__.constructor.name === 'Device') {
|
135 | var build = init();
|
136 | args.unshift(build.config._type);
|
137 | var scout = Object.create(AutoScout.prototype);
|
138 | scout._deviceInstance = build;
|
139 | AutoScout.apply(scout, args);
|
140 | addScout(scout);
|
141 | } else if (proto.__proto__.constructor.name === 'Scout') {
|
142 | var scout = scientist.create.apply(null, args);
|
143 | addScout(scout);
|
144 | } else {
|
145 | walk(proto.__proto__);
|
146 | }
|
147 | }
|
148 |
|
149 | walk(constructor.prototype);
|
150 |
|
151 | return this;
|
152 | };
|
153 |
|
154 | Zetta.prototype.expose = function(query) {
|
155 | this._exposeQuery = query;
|
156 | this.runtime.expose(query);
|
157 | return this;
|
158 | };
|
159 |
|
160 | Zetta.prototype.load = function() {
|
161 | var args = Array.prototype.slice.call(arguments);
|
162 | var appArgs = args.slice(1, args.length);
|
163 | var app = {
|
164 | app: args[0],
|
165 | args: appArgs
|
166 | };
|
167 | this._apps.push(app);
|
168 | return this;
|
169 | };
|
170 |
|
171 | Zetta.prototype.link = function(peers) {
|
172 | var self = this;
|
173 | if(!Array.isArray(peers)) {
|
174 | peers = [peers];
|
175 | }
|
176 |
|
177 | peers.forEach(function(peer) {
|
178 |
|
179 | self._peers.push(peer);
|
180 | });
|
181 |
|
182 | return this;
|
183 | };
|
184 |
|
185 |
|
186 | Zetta.prototype.listen = function() {
|
187 | var self = this;
|
188 |
|
189 | var args = Array.prototype.slice.call(arguments);
|
190 |
|
191 | var last = args[args.length - 1];
|
192 |
|
193 | var callback;
|
194 | if (typeof last === 'function') {
|
195 | callback = last;
|
196 | }
|
197 |
|
198 | this._run(function(err){
|
199 | if(err) {
|
200 | if (callback) {
|
201 | return callback(err);
|
202 | } else {
|
203 | throw err;
|
204 | }
|
205 | }
|
206 |
|
207 | var cb = function(err) {
|
208 | if (err) {
|
209 | if (callback) {
|
210 | callback(err);
|
211 | } else {
|
212 | throw err;
|
213 | }
|
214 | }
|
215 |
|
216 | var host;
|
217 | if (typeof args[0] === 'string') {
|
218 | host = args[0];
|
219 | } else if (typeof args[0] === 'number') {
|
220 | if (args.length > 1 && typeof args[1] === 'string') {
|
221 | host = 'http://' + args[1] + ':' + args[0];
|
222 | } else {
|
223 | host = 'http://127.0.0.1:' + args[0];
|
224 | }
|
225 | } else if (typeof args[0] === 'object' && args[0].fd) {
|
226 | host = 'fd: ' + args[0].fd;
|
227 | } else {
|
228 | host = '<unknown>';
|
229 | }
|
230 |
|
231 | self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + host);
|
232 |
|
233 | if (callback) {
|
234 | callback(err);
|
235 | }
|
236 | };
|
237 |
|
238 | if (!callback) {
|
239 | args.push(cb);
|
240 | } else {
|
241 | args[args.length - 1] = cb;
|
242 | }
|
243 |
|
244 | self.httpServer.listen.apply(self.httpServer, args);
|
245 | });
|
246 |
|
247 | return this;
|
248 | };
|
249 |
|
250 |
|
251 | Zetta.prototype._run = function(callback) {
|
252 | var self = this;
|
253 |
|
254 | if(!callback) {
|
255 | callback = function(){};
|
256 | }
|
257 |
|
258 | if (!this._silent) {
|
259 | Logger.ConsoleOutput(this.log);
|
260 | }
|
261 |
|
262 | async.series([
|
263 | function(next) {
|
264 | self.runtime.registry._init(next);
|
265 | },
|
266 | function(next) {
|
267 | self.peerRegistry._init(next);
|
268 | },
|
269 | function(next) {
|
270 | self._initScouts(next);
|
271 | },
|
272 | function(next) {
|
273 | self._initApps(next);
|
274 | },
|
275 | function(next) {
|
276 | self._initHttpServer(next);
|
277 | },
|
278 | function(next) {
|
279 | self._cleanupPeers(next);
|
280 | },
|
281 | function(next) {
|
282 | self._initPeers(self._peers, next);
|
283 | self.link = function(peers, cb) {
|
284 | self._initPeers(peers, (cb || function() {}) );
|
285 | };
|
286 | }
|
287 | ], function(err){
|
288 | setImmediate(function() {
|
289 | callback(err);
|
290 | });
|
291 | });
|
292 |
|
293 | return this;
|
294 | };
|
295 |
|
296 | Zetta.prototype._initScouts = function(callback) {
|
297 | async.each(this._scouts, function(scout, next) {
|
298 | scout.init(next);
|
299 | }, function(err) {
|
300 | callback(err);
|
301 | });
|
302 |
|
303 | return this;
|
304 | };
|
305 |
|
306 | Zetta.prototype._initApps = function(callback) {
|
307 | var self = this;
|
308 | this._apps.forEach(function(app) {
|
309 | var args = app.args;
|
310 | args.unshift(self.runtime);
|
311 | app.app.apply(null, args);
|
312 | });
|
313 | callback();
|
314 |
|
315 | return this;
|
316 | };
|
317 |
|
318 | Zetta.prototype._initHttpServer = function(callback) {
|
319 | this.httpServer.init();
|
320 | callback();
|
321 |
|
322 | return this;
|
323 | };
|
324 |
|
325 |
|
326 |
|
327 | Zetta.prototype._cleanupPeers = function(callback) {
|
328 | var self = this;
|
329 | this.peerRegistry.find(Query.of('peers'), function(err, results) {
|
330 | if(err) {
|
331 | callback(err);
|
332 | return;
|
333 | }
|
334 |
|
335 | async.forEach(results, function(peer, next) {
|
336 | peer.status = 'disconnected';
|
337 | self.peerRegistry.save(peer, next);
|
338 | }, callback);
|
339 | });
|
340 | };
|
341 |
|
342 | Zetta.prototype._initPeers = function(peers, callback) {
|
343 | var self = this;
|
344 | var existingUrls = [];
|
345 | var allPeers = [];
|
346 |
|
347 | if (!Array.isArray(peers)) {
|
348 | peers = [peers];
|
349 | }
|
350 |
|
351 | this.peerRegistry.find(Query.of('peers'), function(err, results) {
|
352 | if(err) {
|
353 | callback(err);
|
354 | return;
|
355 | }
|
356 |
|
357 | results.forEach(function(peer) {
|
358 | peer.status = 'disconnected';
|
359 | if (peer.direction === 'initiator' && peer.url) {
|
360 | allPeers.push(peer);
|
361 | existingUrls.push(peer.url);
|
362 | return;
|
363 | }
|
364 | });
|
365 |
|
366 |
|
367 | allPeers = allPeers.concat(peers.filter(function(peer) {
|
368 | return existingUrls.indexOf(peer) === -1;
|
369 | }));
|
370 |
|
371 | allPeers.forEach(function(obj) {
|
372 | var existing = (typeof obj === 'object');
|
373 | if (existing) {
|
374 | if(!obj.fromLink || peers.indexOf(obj.url) > -1) {
|
375 | self.peerRegistry.save(obj, function() {
|
376 | self._runPeer(obj);
|
377 | });
|
378 | } else {
|
379 |
|
380 | self.peerRegistry.remove(obj, function(err){
|
381 | if(err) {
|
382 | console.error(err);
|
383 | }
|
384 | });
|
385 | }
|
386 | } else {
|
387 | var peerData = {
|
388 | url: obj,
|
389 | direction: 'initiator',
|
390 | fromLink:true
|
391 | };
|
392 | self.peerRegistry.add(peerData, function(err, newPeer) {
|
393 | self._runPeer(newPeer);
|
394 | });
|
395 | }
|
396 |
|
397 |
|
398 | });
|
399 |
|
400 |
|
401 | callback();
|
402 | });
|
403 |
|
404 | return this;
|
405 | };
|
406 |
|
407 | Zetta.prototype._extendPeerRequest = function(client) {
|
408 | this.runtime.modifyPeerRequest(client.ws);
|
409 | };
|
410 |
|
411 | Zetta.prototype._extendPeerResponse = function(client) {
|
412 | this.runtime.modifyPeerResponse(client.ws);
|
413 | };
|
414 |
|
415 | Zetta.prototype._runPeer = function(peer) {
|
416 | var self = this;
|
417 | var peerClient = new PeerClient(peer.url, self);
|
418 | this._extendPeerRequest(peerClient);
|
419 | this._extendPeerResponse(peerClient);
|
420 |
|
421 | self._peerClients.push(peerClient);
|
422 |
|
423 |
|
424 | peerClient.on('connecting', function() {
|
425 | self.peerRegistry.get(peer.id, function(err, result) {
|
426 | result.status = 'connecting';
|
427 | result.connectionId = peerClient.connectionId;
|
428 | self.peerRegistry.save(result);
|
429 | });
|
430 | });
|
431 |
|
432 |
|
433 | peerClient.on('connected', function() {
|
434 | self.peerRegistry.get(peer.id, function(err, result) {
|
435 | result.status = 'connected';
|
436 | result.connectionId = peerClient.connectionId;
|
437 | self.peerRegistry.save(result);
|
438 |
|
439 |
|
440 | self.pubsub.publish('_peer/connect', { peer: peerClient});
|
441 | });
|
442 | });
|
443 |
|
444 | peerClient.on('error', function(error) {
|
445 |
|
446 | self.peerRegistry.get(peer.id, function(err, result) {
|
447 | result.status = 'failed';
|
448 | result.error = error;
|
449 | result.connectionId = peerClient.connectionId;
|
450 | self.peerRegistry.save(result);
|
451 |
|
452 |
|
453 | self.pubsub.publish('_peer/disconnect', { peer: peerClient, error: error });
|
454 | });
|
455 | });
|
456 |
|
457 | peerClient.on('closed', function() {
|
458 | self.peerRegistry.get(peer.id, function(err, result) {
|
459 | result.status = 'disconnected';
|
460 | result.connectionId = peerClient.connectionId;
|
461 |
|
462 |
|
463 | self.pubsub.publish('_peer/disconnect', { peer: peerClient });
|
464 | self.peerRegistry.save(result);
|
465 | });
|
466 | });
|
467 |
|
468 | peerClient.start();
|
469 |
|
470 |
|
471 | peer.connectionId = peerClient.connectionId;
|
472 | self.peerRegistry.save(peer);
|
473 | }
|