1 | var os = require('os');
|
2 | var AutoScout = require('zetta-auto-scout');
|
3 | var async = require('async');
|
4 | var HttpScout = require('./lib/http_scout');
|
5 | var HttpServer = require('./lib/http_server');
|
6 | var Logger = require('./lib/logger');
|
7 | var PeerClient = require('./lib/peer_client');
|
8 | var PeerRegistry = require('./lib/peer_registry');
|
9 | var PubSub = require('./lib/pubsub_service');
|
10 | var Runtime = require('./lib/runtime');
|
11 | var scientist = require('zetta-scientist');
|
12 | var Query = require('calypso').Query;
|
13 |
|
/**
 * Zetta server builder.
 *
 * Can be called with or without `new`. Sets up the pubsub service, logger,
 * peer registry, HTTP server and runtime, and registers the built-in HTTP
 * scout.
 *
 * @param {Object} [opts] Optional overrides.
 * @param {Object} [opts.peerRegistry] Used instead of a new PeerRegistry.
 * @param {Object} [opts.pubsub] Used instead of a new PubSub.
 * @param {Object} [opts.log] Used instead of a new Logger.
 * @param {Object} [opts.registry] Passed to the Runtime as `registry`.
 * @param {Object} [opts.tls] Every own enumerable key is copied onto the
 *   options handed to the HttpServer.
 * @param {*} [opts.useXForwardedHostHeader] Forwarded to the HttpServer
 *   options when defined.
 * @param {*} [opts.useXForwardedPathHeader] Forwarded to the HttpServer
 *   options when defined.
 */
var Zetta = module.exports = function(opts) {
  if (!(this instanceof Zetta)) {
    return new Zetta(opts);
  }

  opts = opts || {};

  // The host name is the default server name and id until name() is called.
  this._name = os.hostname();
  this.id = this._name;
  this._properties = {};

  this._exposeQuery = '';
  this._scouts = [];
  this._apps = [];
  this._peers = [];
  this._peerClients = [];

  this.peerRegistry = opts.peerRegistry || new PeerRegistry();

  this.pubsub = opts.pubsub || new PubSub();
  this.log = opts.log || new Logger({ pubsub: this.pubsub });
  this.log.init();
  this._silent = false;

  var httpOptions = {};
  if (typeof opts.useXForwardedHostHeader !== 'undefined') {
    httpOptions.useXForwardedHostHeader = opts.useXForwardedHostHeader;
  }
  if (typeof opts.useXForwardedPathHeader !== 'undefined') {
    httpOptions.useXForwardedPathHeader = opts.useXForwardedPathHeader;
  }

  // typeof null is 'object', so null has to be excluded explicitly;
  // otherwise Object.keys(null) throws.
  if (opts.tls !== null && typeof opts.tls === 'object') {
    Object.keys(opts.tls).forEach(function(key) {
      httpOptions[key] = opts.tls[key];
    });
  }
  this.httpServer = new HttpServer(this, httpOptions);

  var runtimeOptions = {
    pubsub: this.pubsub,
    log: this.log,
    httpServer: this.httpServer
  };

  if (opts.registry) {
    runtimeOptions.registry = opts.registry;
  }
  this.runtime = new Runtime(runtimeOptions);

  // The HTTP scout is always present; use() adds HttpDevice drivers to it.
  var httpScout = scientist.create.call(null, HttpScout);
  httpScout.server = this.runtime;
  this.httpScout = httpScout;
  this._scouts.push(httpScout);
};
|
69 |
|
/**
 * Marks the server as silent, so that _run() does not attach the console
 * output to the logger.
 *
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.silent = function() {
  var server = this;
  server._silent = true;
  return server;
};
|
74 |
|
75 |
|
/**
 * Hands the server's logger to a caller-supplied function and switches the
 * built-in console output off.
 *
 * @param {Function} func Called once, synchronously, with `this.log`.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.logger = function(func) {
  var server = this;
  // The flag is set before the hook runs, exactly as a call to silent() would.
  server._silent = true;
  func(server.log);
  return server;
};
|
81 |
|
/**
 * Sets the server name; the id is kept equal to the name.
 *
 * @param {*} name New name. The literal '*' is rejected.
 * @returns {Zetta} This server, for chaining.
 * @throws {Error} When `name` is exactly '*'.
 */
Zetta.prototype.name = function(name) {
  var isWildcard = (name === '*');
  if (isWildcard) {
    throw new Error('Cannot set name to *');
  }

  this.id = this._name = name;
  return this;
};
|
91 |
|
/**
 * Replaces the server's custom properties.
 *
 * A `name` key is never stored, because getProperties() always reports the
 * server's own name. Non-object input (including null) is ignored.
 *
 * @param {Object} props Properties to store. The object itself is left
 *   untouched; its own enumerable keys are copied.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.properties = function(props) {
  // typeof null is 'object'; without the explicit check a null argument
  // would throw instead of being ignored like other non-objects.
  if (props !== null && typeof props === 'object') {
    var stored = {};
    Object.keys(props).forEach(function(key) {
      if (key !== 'name') {
        stored[key] = props[key];
      }
    });
    this._properties = stored;
  }
  return this;
};
|
100 |
|
/**
 * Builds a fresh object holding the server name plus every stored custom
 * property.
 *
 * @returns {Object} `{ name: <server name>, ...custom properties }`.
 */
Zetta.prototype.getProperties = function() {
  var custom = this._properties;
  var seed = { name: this._name };

  return Object.keys(custom).reduce(function(result, key) {
    result[key] = custom[key];
    return result;
  }, seed);
};
|
109 |
|
/**
 * Registers a device driver, scout or app with the server.
 *
 * The first argument is a constructor (or app function); the remaining
 * arguments are kept for it. The kind is decided by walking up the
 * prototype chain of `constructor.prototype` and looking at the name of each
 * ancestor's constructor:
 *   - 'HttpDevice': the constructor is stored in httpScout.driverFunctions
 *     under the device's configured type;
 *   - 'Device': an AutoScout is created for the device and added as a scout;
 *   - 'Scout': the scout is created through scientist and added;
 *   - none of the above: the arguments are passed to load() as an app.
 *
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.use = function() {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var constructor = args[0];

  // Binds a scout to the runtime and queues it for _initScouts().
  function register(scout) {
    scout.server = self.runtime;
    self._scouts.push(scout);
  }

  // Creates a device instance with the remaining arguments and reads its
  // configuration through scientist.
  function instantiate() {
    var machine = Object.create(constructor.prototype);
    constructor.apply(machine, args.slice(1));
    machine._pubsub = self.pubsub;
    machine._log = self.log;
    machine._registry = self.runtime.registry;

    return { config: scientist.config(machine), instance: machine };
  }

  for (var proto = constructor.prototype; ; proto = proto.__proto__) {
    var parent = proto ? proto.__proto__ : null;
    if (!parent) {
      // Reached the end of the chain without a known base: treat as an app.
      self.load.apply(self, args);
      break;
    }

    var baseName = parent.constructor.name;

    if (baseName === 'HttpDevice') {
      var httpConfig = instantiate().config;
      self.httpScout.driverFunctions[httpConfig._type] = constructor;
      break;
    }

    if (baseName === 'Device') {
      var built = instantiate();
      // AutoScout receives the device type followed by the original arguments.
      args.unshift(built.config._type);
      var autoScout = Object.create(AutoScout.prototype);
      autoScout._deviceInstance = built;
      AutoScout.apply(autoScout, args);
      register(autoScout);
      break;
    }

    if (baseName === 'Scout') {
      register(scientist.create.apply(null, args));
      break;
    }
  }

  return this;
};
|
156 |
|
/**
 * Remembers the expose query on the server and forwards it to the runtime.
 *
 * @param {*} query Query handed to `runtime.expose`.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.expose = function(query) {
  var server = this;
  server._exposeQuery = query;
  server.runtime.expose(query);
  return server;
};
|
162 |
|
/**
 * Queues an app to be started by _initApps().
 *
 * The first argument is the app function; every further argument is stored
 * and later passed to the app.
 *
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.load = function() {
  var received = Array.prototype.slice.call(arguments);

  this._apps.push({
    app: received[0],
    args: received.slice(1)
  });

  return this;
};
|
173 |
|
/**
 * Queues one or more peers to be connected once the server runs.
 *
 * @param {*|Array} peers A single peer or an array of peers; a non-array
 *   value is treated as a one-element list.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.link = function(peers) {
  var server = this;
  var list = Array.isArray(peers) ? peers : [peers];

  list.forEach(function(entry) {
    server._peers.push(entry);
  });

  return this;
};
|
187 |
|
188 |
|
/**
 * Runs the start-up sequence (_run) and then starts the HTTP server.
 *
 * All arguments are forwarded to `httpServer.listen`. When the last argument
 * is a function it is treated as the completion callback and is replaced by
 * an internal wrapper before forwarding; otherwise the wrapper is appended.
 *
 * On success a "listening" line is emitted on the logger and the callback
 * (if any) is invoked. On failure the callback is invoked exactly once with
 * the error; without a callback the error is thrown.
 *
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype.listen = function() {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var last = args[args.length - 1];
  var callback = (typeof last === 'function') ? last : undefined;

  // Reports a failure to the caller's callback, or throws when there is none.
  function fail(err) {
    if (callback) {
      return callback(err);
    }
    throw err;
  }

  // Describes the listen target for the log line. `args` has already had
  // the wrapper appended/substituted by the time this runs.
  function describeHost() {
    var target = args[0];

    if (typeof target === 'string') {
      return target;
    }
    if (typeof target === 'number') {
      if (args.length > 1 && typeof args[1] === 'string') {
        return 'http://' + args[1] + ':' + target;
      }
      return 'http://127.0.0.1:' + target;
    }
    // typeof null is 'object'; guard before reading `.fd`.
    if (target !== null && typeof target === 'object' && target.fd) {
      return 'fd: ' + target.fd;
    }
    return '<unknown>';
  }

  this._run(function(err) {
    if (err) {
      return fail(err);
    }

    var onListening = function(err) {
      if (err) {
        // Stop here: the callback must fire only once, and a failed listen
        // must not be logged as "listening".
        return fail(err);
      }

      self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + describeHost());

      if (callback) {
        callback(err);
      }
    };

    if (!callback) {
      args.push(onListening);
    } else {
      args[args.length - 1] = onListening;
    }

    self.httpServer.listen.apply(self.httpServer, args);
  });

  return this;
};
|
252 |
|
253 |
|
/**
 * Runs the start-up sequence, one step after the other: device registry,
 * peer registry, scouts, apps, HTTP server, peer clean-up and finally the
 * queued peers.
 *
 * Unless the server is silent, the console output is attached to the logger
 * first. Once the queued peers have been handed to _initPeers(), `link` is
 * replaced on this instance so that later calls connect peers immediately
 * instead of queueing them.
 *
 * @param {Function} [callback] Called with the first error (if any) of the
 *   sequence. The call is deferred with setImmediate.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype._run = function(callback) {
  var self = this;
  var done = callback || function() {};

  if (!this._silent) {
    Logger.ConsoleOutput(this.log);
  }

  var steps = [
    function(next) {
      self.runtime.registry._init(next);
    },
    function(next) {
      self.peerRegistry._init(next);
    },
    function(next) {
      self._initScouts(next);
    },
    function(next) {
      self._initApps(next);
    },
    function(next) {
      self._initHttpServer(next);
    },
    function(next) {
      self._cleanupPeers(next);
    },
    function(next) {
      self._initPeers(self._peers, next);
      // From now on link() connects right away. It keeps returning the
      // server so that it stays chainable like the prototype version.
      self.link = function(peers, cb) {
        self._initPeers(peers, (cb || function() {}));
        return self;
      };
    }
  ];

  async.series(steps, function(err) {
    setImmediate(function() {
      done(err);
    });
  });

  return this;
};
|
298 |
|
/**
 * Calls init() on every registered scout through async.each and reports the
 * outcome.
 *
 * @param {Function} callback Called with the error passed on by async.each.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype._initScouts = function(callback) {
  function startScout(scout, done) {
    scout.init(done);
  }

  function finished(err) {
    callback(err);
  }

  async.each(this._scouts, startScout, finished);

  return this;
};
|
308 |
|
/**
 * Starts every app queued through load(), synchronously and in order.
 *
 * Each app is called with the runtime as first argument, followed by the
 * arguments stored for it.
 *
 * @param {Function} callback Called without arguments after all apps ran.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype._initApps = function(callback) {
  var runtime = this.runtime;

  this._apps.forEach(function(entry) {
    // Build a fresh argument list instead of unshifting onto entry.args:
    // the stored list must stay as it is, otherwise a second start-up would
    // pass the runtime twice.
    entry.app.apply(null, [runtime].concat(entry.args));
  });
  callback();

  return this;
};
|
320 |
|
/**
 * Initialises the HTTP server and signals completion right away.
 *
 * @param {Function} callback Called without arguments after
 *   `httpServer.init()` returned.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype._initHttpServer = function(callback) {
  var server = this;
  server.httpServer.init();
  callback();

  return server;
};
|
327 |
|
328 |
|
329 |
|
/**
 * Marks every peer found in the peer registry as 'disconnected' and saves
 * it back.
 *
 * @param {Function} callback Called with the lookup error, or handed to
 *   async.forEach as its completion callback.
 */
Zetta.prototype._cleanupPeers = function(callback) {
  var server = this;

  function markDisconnected(peer, done) {
    peer.status = 'disconnected';
    server.peerRegistry.save(peer, done);
  }

  this.peerRegistry.find(Query.of('peers'), function(err, results) {
    if (err) {
      callback(err);
    } else {
      async.forEach(results, markDisconnected, callback);
    }
  });
};
|
344 |
|
/**
 * Starts peer clients for stored initiator peers and for newly linked URLs.
 *
 * Every record found in the peer registry gets its status set to
 * 'disconnected'. Records with direction 'initiator' and a url are then
 * handled as follows:
 *   - not created through link (`fromLink` falsy), or url present in
 *     `peers`: saved and started;
 *   - created through link but no longer listed in `peers`: removed.
 * Entries of `peers` whose url is not stored yet are added to the registry
 * (direction 'initiator', fromLink true) and started.
 *
 * `callback` fires as soon as the work has been dispatched; it does not
 * wait for the registry writes. Failures of those writes are reported with
 * console.error.
 *
 * @param {*|Array} peers Peer URL or array of peer URLs.
 * @param {Function} callback Called with the lookup error, or without
 *   arguments once all peers have been dispatched.
 * @returns {Zetta} This server, for chaining.
 */
Zetta.prototype._initPeers = function(peers, callback) {
  var self = this;
  var existingUrls = [];
  var allPeers = [];

  if (!Array.isArray(peers)) {
    peers = [peers];
  }

  this.peerRegistry.find(Query.of('peers'), function(err, results) {
    if (err) {
      callback(err);
      return;
    }

    results.forEach(function(peer) {
      peer.status = 'disconnected';
      if (peer.direction === 'initiator' && peer.url) {
        allPeers.push(peer);
        existingUrls.push(peer.url);
      }
    });

    // Stored records come first, followed by the URLs that are new.
    allPeers = allPeers.concat(peers.filter(function(url) {
      return existingUrls.indexOf(url) === -1;
    }));

    allPeers.forEach(function(obj) {
      var existing = (typeof obj === 'object');

      if (!existing) {
        var peerData = {
          url: obj,
          direction: 'initiator',
          fromLink: true
        };
        self.peerRegistry.add(peerData, function(err, newPeer) {
          if (err) {
            // Without a stored record there is nothing to start.
            console.error(err);
            return;
          }
          self._runPeer(newPeer);
        });
        return;
      }

      if (!obj.fromLink || peers.indexOf(obj.url) > -1) {
        self.peerRegistry.save(obj, function(err) {
          if (err) {
            console.error(err);
          }
          // The connection is attempted even when the save failed.
          self._runPeer(obj);
        });
      } else {
        self.peerRegistry.remove(obj, function(err) {
          if (err) {
            console.error(err);
          }
        });
      }
    });

    callback();
  });

  return this;
};
|
409 |
|
/**
 * Passes the peer client's `ws` object to `runtime.modifyPeerRequest`.
 *
 * @param {Object} client Peer client whose `ws` property is handed over.
 */
Zetta.prototype._extendPeerRequest = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerRequest(client.ws);
};
|
413 |
|
/**
 * Passes the peer client's `ws` object to `runtime.modifyPeerResponse`.
 *
 * @param {Object} client Peer client whose `ws` property is handed over.
 */
Zetta.prototype._extendPeerResponse = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerResponse(client.ws);
};
|
417 |
|
/**
 * Creates and starts a PeerClient for a stored peer and keeps the peer's
 * registry record in step with the client's events.
 *
 * Event handling:
 *   - 'connecting': record status becomes 'connecting';
 *   - 'connected':  status 'connected', then '_peer/connect' is published;
 *   - 'error':      status 'failed' with the error stored on the record,
 *                   then '_peer/disconnect' is published with the error;
 *   - 'closed':     '_peer/disconnect' is published, status 'disconnected'.
 * Each update also copies the client's connectionId onto the record.
 *
 * When the registry lookup for an event fails, the problem is reported with
 * console.error and the record is left alone; the pubsub event is still
 * published, since it describes the connection rather than the record.
 *
 * @param {Object} peer Registry record of the peer; `url` and `id` are read,
 *   `connectionId` is written.
 */
Zetta.prototype._runPeer = function(peer) {
  var self = this;
  var peerClient = new PeerClient(peer.url, self);
  this._extendPeerRequest(peerClient);
  this._extendPeerResponse(peerClient);

  self._peerClients.push(peerClient);

  // Tells whether a registry lookup produced a usable record; reports the
  // problem when it did not, so handlers never dereference a missing record.
  function hasRecord(err, result) {
    if (err || !result) {
      console.error(err || new Error('Peer ' + peer.id + ' not found in peer registry'));
      return false;
    }
    return true;
  }

  peerClient.on('connecting', function() {
    self.peerRegistry.get(peer.id, function(err, result) {
      if (!hasRecord(err, result)) {
        return;
      }
      result.status = 'connecting';
      result.connectionId = peerClient.connectionId;
      self.peerRegistry.save(result);
    });
  });

  peerClient.on('connected', function() {
    self.peerRegistry.get(peer.id, function(err, result) {
      if (hasRecord(err, result)) {
        result.status = 'connected';
        result.connectionId = peerClient.connectionId;
        self.peerRegistry.save(result);
      }

      self.pubsub.publish('_peer/connect', { peer: peerClient });
    });
  });

  peerClient.on('error', function(error) {
    self.peerRegistry.get(peer.id, function(err, result) {
      if (hasRecord(err, result)) {
        result.status = 'failed';
        result.error = error;
        result.connectionId = peerClient.connectionId;
        self.peerRegistry.save(result);
      }

      self.pubsub.publish('_peer/disconnect', { peer: peerClient, error: error });
    });
  });

  peerClient.on('closed', function() {
    self.peerRegistry.get(peer.id, function(err, result) {
      var found = hasRecord(err, result);
      if (found) {
        result.status = 'disconnected';
        result.connectionId = peerClient.connectionId;
      }

      // Subscribers are notified before the record is written back.
      self.pubsub.publish('_peer/disconnect', { peer: peerClient });
      if (found) {
        self.peerRegistry.save(result);
      }
    });
  });

  peerClient.start();

  // Record the connection id of the attempt that was just started.
  peer.connectionId = peerClient.connectionId;
  self.peerRegistry.save(peer);
};
|