1 | var os = require('os');
|
2 | var AutoScout = require('zetta-auto-scout');
|
3 | var async = require('async');
|
4 | var HttpScout = require('./lib/http_scout');
|
5 | var HttpServer = require('./lib/http_server');
|
6 | var Logger = require('./lib/logger');
|
7 | var PeerClient = require('./lib/peer_client');
|
8 | var PeerRegistry = require('./lib/peer_registry');
|
9 | var PubSub = require('./lib/pubsub_service');
|
10 | var Runtime = require('./lib/runtime');
|
11 | var scientist = require('zetta-scientist');
|
12 | var Query = require('calypso').Query;
|
13 |
|
/**
 * Zetta server builder. Works with or without `new`.
 *
 * @param {Object} [opts] Optional overrides.
 * @param {Object} [opts.peerRegistry] Used instead of a new PeerRegistry.
 * @param {Object} [opts.pubsub] Used instead of a new PubSub.
 * @param {Object} [opts.log] Used instead of a new Logger bound to the pubsub.
 * @param {Object} [opts.registry] Passed to the Runtime as `registry` when truthy.
 */
var Zetta = module.exports = function(opts) {
  if (!(this instanceof Zetta)) {
    return new Zetta(opts);
  }

  var options = opts || {};
  var hostname = os.hostname();

  // The id mirrors the name; name() later overwrites both together.
  this._name = hostname;
  this.id = hostname;

  this._exposeQuery = '';
  this._scouts = [];
  this._apps = [];
  this._peers = [];
  this._peerClients = [];

  this.peerRegistry = options.peerRegistry || new PeerRegistry();

  this.pubsub = options.pubsub || new PubSub();
  this.log = options.log || new Logger({ pubsub: this.pubsub });
  this.log.init();
  this._silent = false;

  // HttpServer is handed this instance; it stays after the pubsub/log
  // assignments in case its constructor reads them (not visible here).
  this.httpServer = new HttpServer(this);

  var runtimeOptions = {
    pubsub: this.pubsub,
    log: this.log,
    httpServer: this.httpServer
  };
  if (options.registry) {
    runtimeOptions.registry = options.registry;
  }
  this.runtime = new Runtime(runtimeOptions);

  // A built-in HTTP scout is always registered first.
  var builtInScout = scientist.create.call(null, HttpScout);
  builtInScout.server = this.runtime;
  this.httpScout = builtInScout;
  this._scouts.push(builtInScout);
};
|
55 |
|
/**
 * Turns off the default console output: _run() only attaches
 * Logger.ConsoleOutput while `_silent` is false.
 *
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.silent = function() {
  var server = this;
  server._silent = true;
  return server;
};
|
60 |
|
61 |
|
/**
 * Hands the server's log object to a caller-supplied function and
 * silences the default console output.
 *
 * @param {Function} func Invoked synchronously with `this.log`.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.logger = function(func) {
  var server = this;

  // The flag is set before the user function runs.
  server._silent = true;
  func(server.log);

  return server;
};
|
67 |
|
/**
 * Sets the server name; the id is kept equal to the name.
 *
 * @param {*} name New name (stored as given).
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.name = function(name) {
  var server = this;
  server._name = name;
  server.id = name;
  return server;
};
|
73 |
|
/**
 * Registers a scout, a device driver, an HTTP device driver or an app.
 *
 * The kind is decided by walking the prototype chain of the first argument
 * and looking at the constructor name of each ancestor:
 *   - 'HttpDevice': the constructor is registered on the built-in HTTP scout
 *     under the type reported by scientist.config().
 *   - 'Device': an AutoScout is created for the device and added as a scout.
 *   - 'Scout': the scout is created through scientist.create() and added.
 *   - none of the above: the arguments are forwarded to load() as an app.
 *
 * @param {Function} constructor Scout/device constructor or app function.
 * @param {...*} [args] Extra arguments for the constructor or app.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.use = function() {
  var args = Array.prototype.slice.call(arguments);
  var constructor = args[0];
  var self = this;

  // Attach the runtime to a scout and queue it for _initScouts().
  function addScout(scout) {
    scout.server = self.runtime;
    self._scouts.push(scout);
  }

  // Build a throwaway instance so scientist can read the device config.
  function init() {
    var machine = Object.create(constructor.prototype);
    constructor.apply(machine, args.slice(1));
    machine._pubsub = self.pubsub;
    machine._log = self.log;
    machine._registry = self.runtime.registry;

    var config = scientist.config(machine);
    return { config: config, instance: machine };
  }

  function walk(proto) {
    // Object.getPrototypeOf replaces the legacy __proto__ accessor.
    var parent = proto ? Object.getPrototypeOf(proto) : null;
    if (!parent) {
      // Reached the end of the chain without a known base type: it is an app.
      self.load.apply(self, args);
      return;
    }

    var parentName = parent.constructor.name;
    var scout;

    if (parentName === 'HttpDevice') {
      var config = init().config;
      self.httpScout.driverFunctions[config._type] = constructor;
    } else if (parentName === 'Device') {
      var build = init();
      // AutoScout receives (type, constructor, ...original extra args).
      args.unshift(build.config._type);
      scout = Object.create(AutoScout.prototype);
      scout._deviceInstance = build;
      AutoScout.apply(scout, args);
      addScout(scout);
    } else if (parentName === 'Scout') {
      scout = scientist.create.apply(null, args);
      addScout(scout);
    } else {
      walk(parent);
    }
  }

  walk(constructor.prototype);

  return this;
};
|
120 |
|
/**
 * Records the expose query on the server and forwards it to the runtime.
 *
 * @param {*} query Query passed unchanged to runtime.expose().
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.expose = function(query) {
  var server = this;
  server._exposeQuery = query;
  server.runtime.expose(query);
  return server;
};
|
126 |
|
/**
 * Queues an app for _initApps(). The first argument is the app function;
 * any remaining arguments are stored and passed to it at start-up.
 *
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.load = function() {
  var appFunction = arguments[0];
  var extraArgs = Array.prototype.slice.call(arguments, 1);

  this._apps.push({ app: appFunction, args: extraArgs });

  return this;
};
|
137 |
|
/**
 * Queues one or more peers; _run() hands the queued list to _initPeers().
 *
 * @param {*|Array} peers A single peer or an array of peers.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.link = function(peers) {
  var list = Array.isArray(peers) ? peers : [peers];

  list.forEach(function(peer) {
    this._peers.push(peer);
  }, this);

  return this;
};
|
151 |
|
152 |
|
/**
 * Runs the start-up sequence (_run) and then starts the HTTP server.
 *
 * All arguments are forwarded to httpServer.listen(); if the last one is a
 * function it is treated as the completion callback and is replaced by an
 * internal wrapper that logs the listening address first.
 *
 * Errors from either phase are passed to the callback exactly once. Without
 * a callback they are thrown from inside the asynchronous continuation.
 *
 * @param {...*} [args] Arguments for httpServer.listen(), optionally ending
 *   with a callback `function(err)`.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype.listen = function() {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var last = args[args.length - 1];

  var callback;
  if (typeof last === 'function') {
    callback = last;
  }

  // Report a failure once: to the callback if there is one, else by throwing.
  function fail(err) {
    if (callback) {
      return callback(err);
    }
    throw err;
  }

  // Human-readable description of the listen target, for the log line only.
  function describeTarget() {
    var first = args[0];
    if (typeof first === 'string') {
      return first;
    }
    if (typeof first === 'number') {
      if (args.length > 1 && typeof args[1] === 'string') {
        return 'http://' + args[1] + ':' + first;
      }
      return 'http://127.0.0.1:' + first;
    }
    // typeof null is 'object', so guard before reading .fd
    if (typeof first === 'object' && first !== null && first.fd) {
      return 'fd: ' + first.fd;
    }
    return '<unknown>';
  }

  this._run(function(err) {
    if (err) {
      return fail(err);
    }

    var cb = function(err) {
      if (err) {
        // Return here so a failed listen is neither logged as listening
        // nor reported to the callback a second time.
        return fail(err);
      }

      self.log.emit('log', 'server', 'Server (' + self._name + ') ' + self.id + ' listening on ' + describeTarget());

      if (callback) {
        callback(err);
      }
    };

    // Swap the user's callback for the wrapper (or append the wrapper).
    if (!callback) {
      args.push(cb);
    } else {
      args[args.length - 1] = cb;
    }

    self.httpServer.listen.apply(self.httpServer, args);
  });

  return this;
};
|
216 |
|
217 |
|
/**
 * Runs the start-up sequence, one step after another:
 * device registry init, peer registry init, scouts, apps, HTTP server init,
 * peer clean-up, then peer connections for everything queued via link().
 *
 * Once the sequence reaches the last step, link() is replaced on this
 * instance so later calls connect immediately instead of queueing.
 *
 * @param {Function} [callback] `function(err)`, invoked via setImmediate
 *   after the series finishes or fails.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype._run = function(callback) {
  var self = this;

  if (!callback) {
    callback = function() {};
  }

  if (!this._silent) {
    Logger.ConsoleOutput(this.log);
  }

  async.series([
    function(next) {
      self.runtime.registry._init(next);
    },
    function(next) {
      self.peerRegistry._init(next);
    },
    function(next) {
      self._initScouts(next);
    },
    function(next) {
      self._initApps(next);
    },
    function(next) {
      self._initHttpServer(next);
    },
    function(next) {
      self._cleanupPeers(next);
    },
    function(next) {
      self._initPeers(self._peers, next);
      // From here on link() connects right away. It returns the server,
      // like the prototype's link(), so call chains keep working.
      self.link = function(peers, cb) {
        self._initPeers(peers, (cb || function() {}));
        return self;
      };
    }
  ], function(err) {
    setImmediate(function() {
      callback(err);
    });
  });

  return this;
};
|
262 |
|
/**
 * Calls init() on every registered scout through async.each and reports
 * the outcome.
 *
 * @param {Function} callback `function(err)` invoked when async.each finishes.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype._initScouts = function(callback) {
  function startScout(scout, done) {
    scout.init(done);
  }

  function finished(err) {
    callback(err);
  }

  async.each(this._scouts, startScout, finished);

  return this;
};
|
272 |
|
/**
 * Invokes every app queued by load(), passing the runtime as the first
 * argument followed by the app's stored arguments.
 *
 * The stored argument list is not modified, so running the start-up
 * sequence again hands each app the same arguments.
 *
 * @param {Function} callback Invoked with no arguments after all apps ran.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype._initApps = function(callback) {
  var self = this;

  this._apps.forEach(function(app) {
    // Build a fresh list instead of unshifting onto app.args.
    var callArgs = [self.runtime].concat(app.args);
    app.app.apply(null, callArgs);
  });
  callback();

  return this;
};
|
284 |
|
/**
 * Initializes the HTTP server and then signals completion.
 *
 * @param {Function} callback Invoked with no arguments once init() returns.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype._initHttpServer = function(callback) {
  var server = this.httpServer;
  server.init();

  callback();

  return this;
};
|
291 |
|
292 |
|
293 |
|
/**
 * Marks every peer found in the peer registry as 'disconnected' and saves
 * each one back.
 *
 * @param {Function} callback `function(err)`; receives the lookup error, or
 *   whatever async.forEach reports once the saves are done.
 */
Zetta.prototype._cleanupPeers = function(callback) {
  var self = this;

  function markDisconnected(peer, done) {
    peer.status = 'disconnected';
    self.peerRegistry.save(peer, done);
  }

  function onPeersFound(err, results) {
    if (err) {
      callback(err);
      return;
    }

    async.forEach(results, markDisconnected, callback);
  }

  this.peerRegistry.find(Query.of('peers'), onPeersFound);
};
|
308 |
|
/**
 * Starts peer connections for the given URLs and for stored peers.
 *
 * Stored peers with direction 'initiator' and a url are reconsidered:
 *   - kept (saved, then started) when they were not created by link(), or
 *     when their url is in `peers`;
 *   - removed from the registry otherwise.
 * URLs in `peers` that have no stored record are added to the registry with
 * `fromLink: true` and then started.
 *
 * `callback` fires as soon as the work has been dispatched; it does not wait
 * for the individual registry writes.
 *
 * @param {string|Array<string>} peers Peer URL(s) requested via link().
 * @param {Function} callback `function(err)`; err is set only when the
 *   registry lookup fails.
 * @returns {Zetta} this, for chaining.
 */
Zetta.prototype._initPeers = function(peers, callback) {
  var self = this;
  var existingUrls = [];
  var allPeers = [];

  if (!Array.isArray(peers)) {
    peers = [peers];
  }

  this.peerRegistry.find(Query.of('peers'), function(err, results) {
    if (err) {
      callback(err);
      return;
    }

    results.forEach(function(peer) {
      peer.status = 'disconnected';
      if (peer.direction === 'initiator' && peer.url) {
        allPeers.push(peer);
        existingUrls.push(peer.url);
      }
    });

    // Stored records (objects) first, then URLs (strings) not yet stored.
    allPeers = allPeers.concat(peers.filter(function(peer) {
      return existingUrls.indexOf(peer) === -1;
    }));

    allPeers.forEach(function(obj) {
      var existing = (typeof obj === 'object');

      if (!existing) {
        var peerData = {
          url: obj,
          direction: 'initiator',
          fromLink: true
        };
        self.peerRegistry.add(peerData, function(err, newPeer) {
          if (err) {
            // Without a stored record there is nothing to start.
            console.error(err);
            return;
          }
          self._runPeer(newPeer);
        });
        return;
      }

      if (!obj.fromLink || peers.indexOf(obj.url) > -1) {
        self.peerRegistry.save(obj, function(err) {
          if (err) {
            console.error(err);
          }
          // The connection is still attempted if the status write failed.
          self._runPeer(obj);
        });
      } else {
        // Created by an earlier link() that is no longer requested.
        self.peerRegistry.remove(obj, function(err) {
          if (err) {
            console.error(err);
          }
        });
      }
    });

    callback();
  });

  return this;
};
|
373 |
|
/**
 * Passes the client's `ws` property to runtime.modifyPeerRequest().
 *
 * @param {Object} client Peer client exposing a `ws` property.
 */
Zetta.prototype._extendPeerRequest = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerRequest(client.ws);
};
|
377 |
|
/**
 * Passes the client's `ws` property to runtime.modifyPeerResponse().
 *
 * @param {Object} client Peer client exposing a `ws` property.
 */
Zetta.prototype._extendPeerResponse = function(client) {
  var runtime = this.runtime;
  runtime.modifyPeerResponse(client.ws);
};
|
381 |
|
/**
 * Creates a PeerClient for a stored peer, wires its lifecycle events to the
 * peer registry and pubsub, and starts it.
 *
 * Events handled:
 *   - 'connecting': status 'connecting' is saved.
 *   - 'connected':  status 'connected' and the connection id are saved, then
 *                   '_peer/connect' is published.
 *   - 'error':      the stored record is marked 'failed' with the error, then
 *                   '_peer/disconnect' is published.
 *   - 'closed':     '_peer/disconnect' is published and the stored record is
 *                   marked 'disconnected'.
 *
 * For 'error' and 'closed', if the stored record cannot be loaded the
 * disconnect is still published but no record is written.
 *
 * @param {Object} peer Stored peer record; `url` and `id` are read here.
 */
Zetta.prototype._runPeer = function(peer) {
  var self = this;
  var peerClient = new PeerClient(peer.url, self);
  this._extendPeerRequest(peerClient);
  this._extendPeerResponse(peerClient);

  self._peerClients.push(peerClient);

  // Load the current record for this peer. Yields null (after logging any
  // lookup error) when it is unavailable, so handlers never touch undefined.
  function withStoredPeer(handler) {
    self.peerRegistry.get(peer.id, function(err, result) {
      if (err) {
        console.error(err);
      }
      handler(err || !result ? null : result);
    });
  }

  peerClient.on('connecting', function() {
    peer.status = 'connecting';
    self.peerRegistry.save(peer);
  });

  peerClient.on('connected', function() {
    peer.status = 'connected';
    peer.connectionId = peerClient.connectionId;
    self.peerRegistry.save(peer);

    self.pubsub.publish('_peer/connect', { peer: peerClient });
  });

  peerClient.on('error', function(error) {
    withStoredPeer(function(result) {
      if (result) {
        result.status = 'failed';
        result.error = error;
        self.peerRegistry.save(result);
      }

      self.pubsub.publish('_peer/disconnect', { peer: peerClient });
    });
  });

  peerClient.on('closed', function() {
    withStoredPeer(function(result) {
      if (result) {
        result.status = 'disconnected';
      }

      self.pubsub.publish('_peer/disconnect', { peer: peerClient });

      if (result) {
        self.peerRegistry.save(result, function() { });
      }
    });
  });

  peerClient.start();
};
|