UNPKG

16.1 kBJavaScriptView Raw
1var http = require('http');
2var path = require('path');
3var url = require('url');
4var querystring = require('querystring');
5var async = require('async');
6var spdy = require('spdy');
7var argo = require('argo');
8var titan = require('titan');
9var WebSocketServer = require('ws').Server;
10var SpdyAgent = require('./spdy_agent');
11var EventBroker = require('./event_broker');
12var PeerSocket = require('./peer_socket');
13var EventSocket = require('./event_socket');
14var Siren = require('argo-formatter-siren');
15var DevicesResource = require('./api_resources/devices');
16var PeersManagementResource = require('./api_resources/peer_management');
17var RootResource = require('./api_resources/root');
18var ServersResource = require('./api_resources/servers');
19var deviceFormatter = require('./api_formats/siren/device.siren');
20var rels = require('zetta-rels');
21
22var querytopic = require('./query_topic');
23
/**
 * HTTP / WebSocket front end for a Zetta instance.
 *
 * Creates two spdy servers (`this.server`, the external one, and
 * `this.spdyServer`, the internal plain one), a detached WebSocket server
 * (`this.wss`) that is driven from the external server's `upgrade` event,
 * and the titan/argo application (`this.cloud`) that is built later in
 * `init()`.
 *
 * @param {Object} zettaInstance Zetta runtime; `peerRegistry` is read from it
 *   and it is handed to the EventBroker and to every API resource.
 * @param {Object} [options] Server options. `useXForwardedHostHeader`
 *   (default `true`) is forwarded to titan. Every key is also copied onto the
 *   options given to `spdy.createServer`; if any of `cert`, `key`, `pfx` or
 *   `ca` is present the external server is created with `ssl: true`.
 *   Non-object values (including `null`) are treated as "no options".
 */
var ZettaHttpServer = module.exports = function(zettaInstance, options) {
  var self = this;
  // `typeof null` is 'object', so null has to be excluded explicitly or the
  // property reads below would throw.
  options = (options !== null && typeof options === 'object') ? options : {};
  if (typeof options.useXForwardedHostHeader !== 'undefined') {
    this.useXForwardedHostHeader = options.useXForwardedHostHeader ? true : false;
  } else {
    this.useXForwardedHostHeader = true;
  }
  this.idCounter = 0;
  this.zetta = zettaInstance;
  this.peerRegistry = zettaInstance.peerRegistry;
  this.eventBroker = new EventBroker(zettaInstance);
  this.clients = {};
  this.peers = {}; // connected peers

  this._deviceQueries = [];

  this._collectors = {};

  // WS hooks to be called before finishing upgrade
  this._wsHooks = {
    peerConnect: [],
    websocketConnect: []
  };

  // external http(s) server
  var httpOptions = {
    windowSize: 1024 * 1024
  };
  var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
  var usingSSL = false;
  Object.keys(options).forEach(function(k) {
    httpOptions[k] = options[k];
    if (tlsCheckOptions.indexOf(k) > -1) {
      usingSSL = true;
    }
  });

  // If any tls options were specified, use ssl and not plain
  httpOptions.plain = (usingSSL) ? false : true;
  httpOptions.ssl = (usingSSL) ? true : false;
  this.server = spdy.createServer(httpOptions);

  // internal server for z2z, always ssl: false, plain: true
  this.spdyServer = spdy.createServer({
    windowSize: 1024 * 1024,
    plain: true,
    ssl: false
  });

  // Request URLs that are accepted for a websocket upgrade.
  var ValidWSUrls = [
    /^\/events$/, // /events
    /^\/events\?.+$/, // /events?topic=query:where type="led"
    /^\/servers\/(.+)\/events/, // /servers/BC2832FD-9437-4473-A4A8-AC1D56B12C61/events
    /^\/peers\/(.+)$/, // /peers/123123...
    /^\/peer-management$/ // /peer-management
  ];

  // True when the request url matches one of the accepted websocket URLs.
  function match(request) {
    return ValidWSUrls.some(function(re) {
      return re.test(request.url);
    });
  }

  this.wss = new WebSocketServer({ noServer: true });
  this.server.on('upgrade', function(request, socket, headers) {

    // Answers the raw socket with a bare HTTP status line and closes it.
    var sendError = function(code) {
      var finish = function() {
        var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
        socket.end(responseLine);
      };

      // Check any custom websocket paths from extensions: when other
      // 'upgrade' listeners exist, give them 5 seconds to write to the
      // socket before answering with the error ourselves.
      if (self.server.listeners('upgrade').length > 1) {
        var timer = setTimeout(function() {
          if (socket.bytesWritten === 0) {
            finish();
          }
        }, 5000);
        socket.on('close', function() {
          clearTimeout(timer);
        });
      } else {
        finish();
      }
    };

    if (/^\/peers\/(.+)$/.exec(request.url)) {
      // Run the registered peerConnect hooks in order; any error aborts with 500.
      async.eachSeries(self._wsHooks.peerConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        // Handle Peer Request
        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          self.setupPeerSocket(ws);
        });
      });
    } else if (match(request)) {
      // Run the registered websocketConnect hooks in order; any error aborts with 500.
      async.eachSeries(self._wsHooks.websocketConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          if (ws.upgradeReq.url === '/peer-management') {
            // Peer-management sockets get a fixed subscription to this
            // server's peer connect/disconnect topics.
            var query = [
              { name: self.zetta.id, topic: '_peer/connect' },
              { name: self.zetta.id, topic: '_peer/disconnect' }];

            var client = new EventSocket(ws, query);
            self.eventBroker.client(client);
          } else {
            self.setupEventSocket(ws);
          }
        });
      });
    } else {
      // 404
      sendError(404);
    }

  });

  this.cloud = titan({useXForwardedHostHeader: this.useXForwardedHostHeader})
    .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
    .add(RootResource, zettaInstance)
    .add(PeersManagementResource, zettaInstance)
    .add(DevicesResource, zettaInstance)
    .add(ServersResource, zettaInstance)
    .allow({
      methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
      origins: ['*'],
      headers: ['accept', 'content-type'],
      maxAge: '432000'
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // Flag OPTIONS requests as routed before passing them on.
        if (env.request.method === 'OPTIONS') {
          env.argo._routed = true;
        }
        next(env);
      });
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // stop execution in argo for initiate peer requests, handled by peer_client
        if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
          next(env);
        }
      });
    });
};
182
/**
 * Finalises the argo application and attaches it to both servers.
 *
 * Adds the http device-registration middleware, builds `this.cloud`, then
 * registers the built app's `run` function as the 'request' listener on the
 * external and the internal server.
 *
 * @param {Function} [cb] Called with no arguments once wiring is done.
 */
ZettaHttpServer.prototype.init = function(cb) {
  // handle http registration of device
  var withRegistration = this.cloud.use(this.httpRegistration.bind(this));
  this.cloud = withRegistration;

  // Proxying to peers through a catch-all route is disabled:
  //this.cloud = this.cloud.route('*', this.proxyToPeer.bind(this));

  // setup http servers request handler to argo routes
  var built = this.cloud.build();
  this.cloud = built;

  this.server.on('request', this.cloud.run);
  this.spdyServer.on('request', this.cloud.run);

  if (!cb) {
    return;
  }
  cb();
};
200
/**
 * Starts the external server, forwarding every argument unchanged to its
 * `listen` method.
 *
 * @returns {ZettaHttpServer} this, for chaining.
 */
ZettaHttpServer.prototype.listen = function() {
  var external = this.server;
  external.listen.apply(external, arguments);
  return this;
};
205
206
/**
 * Registers a hook that runs before a `/peers/...` websocket upgrade is
 * completed. Hooks are invoked as `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler Hook to append to the peerConnect list.
 * @throws {Error} When `handler` is not a function.
 */
ZettaHttpServer.prototype.onPeerConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var hooks = this._wsHooks.peerConnect;
  hooks.push(handler);
};
213
/**
 * Registers a hook that runs before a non-peer websocket upgrade is
 * completed. Hooks are invoked as `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler Hook to append to the websocketConnect list.
 * @throws {Error} When `handler` is not a function.
 */
ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var hooks = this._wsHooks.websocketConnect;
  hooks.push(handler);
};
220
/**
 * Stores a collector function under a name in `this._collectors`.
 *
 * May be called as `collector(fn)`, in which case the name is `'_logs'`.
 *
 * @param {string|Function} name Collector name, or the collector itself.
 * @param {Function} [collector] The collector when a name is given.
 * @returns {ZettaHttpServer} this, for chaining.
 */
ZettaHttpServer.prototype.collector = function(name, collector) {
  // Single-argument form: shift the function into place.
  if (typeof name === 'function') {
    collector = name;
    name = '_logs';
  }

  var registry = this._collectors;
  var bucket = registry[name] || (registry[name] = []);
  bucket.push(collector);

  return this;
};
234
/**
 * Determines the protocol of a request.
 *
 * Uses the first entry of the `x-forwarded-proto` header when it is present
 * and non-blank; otherwise falls back to whether the underlying connection
 * is encrypted.
 *
 * @param {Object} req Request with `headers` and `connection`.
 * @returns {string} The forwarded protocol, or 'https' / 'http'.
 */
function getCurrentProtocol(req) {
  var xfp = req.headers['x-forwarded-proto'];

  if (xfp && xfp.length) {
    // Strip ALL whitespace (the regex must be global) so that values such as
    // "https , http" yield "https" rather than "https ".
    var forwarded = xfp.replace(/\s+/g, '').split(',')[0];
    if (forwarded) {
      return forwarded;
    }
  }

  return req.connection.encrypted ? 'https' : 'http';
}
247
/**
 * Attaches `_env` and `_loader` objects to a websocket, providing
 * `_env.uri()` and `_env.helpers.url.path()` for it.
 *
 * @param {Object} ws Websocket; its `upgradeReq` is read lazily by the helpers.
 * @param {string} [host] Host (and port) to use in generated URLs. When
 *   falsy it is derived from the local socket address on first use.
 * @param {string} p Path stored as `ws._loader.path`.
 */
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
  ws._env = { helpers: {} };
  ws._loader = { path: p };

  // Absolute URL of the upgrade request.
  ws._env.uri = function() {
    var protocol = getCurrentProtocol(ws.upgradeReq);

    if (!host) {
      var address = ws.upgradeReq.connection.address();
      host = address.address;
      if (address.port) {
        // Omit the port when it is the default for the protocol.
        if (!(protocol === 'https' && address.port === 443) &&
            !(protocol === 'http' && address.port === 80)) {
          host += ':' + address.port;
        }
      }
    }

    // Plain concatenation instead of path.join: path.join is a filesystem
    // helper that normalises '..' and '//' (also inside the query string)
    // and emits backslashes on Windows, which corrupts URLs.
    var requestUrl = ws.upgradeReq.url;
    if (requestUrl.charAt(0) !== '/') {
      requestUrl = '/' + requestUrl;
    }

    return protocol + '://' + host + requestUrl;
  };

  ws._env.helpers.url = {};
  // Same origin as uri(), with the given pathname and no query string.
  ws._env.helpers.url.path = function(pathname) {
    var parsed = url.parse(ws._env.uri());
    parsed.search = null;
    parsed.pathname = pathname;
    return url.format(parsed);
  };
};
277
/**
 * Handles an upgraded `/peers/<name>` websocket.
 *
 * Extracts the peer name from the request path, then either rejects the
 * socket (peer already connected/connecting), re-initialises a previously
 * seen peer, or creates a new PeerSocket and wires its events to pubsub.
 * Sockets whose path carries no decodable peer name are closed with 1008.
 *
 * @param {Object} ws Upgraded websocket with an `upgradeReq`.
 */
ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
  var self = this;

  // The upgrade handler matches on the full url, but the name is taken from
  // the pathname only, so e.g. "/peers/?x" yields no match here. decodeURI
  // also throws URIError on malformed percent escapes. Both cases must not
  // take the process down.
  var nameMatch = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname);
  var name = null;
  if (nameMatch) {
    try {
      name = decodeURI(nameMatch[1]);
    } catch (err) {
      name = null;
    }
  }
  if (!name) {
    ws.close(1008, 'invalid peer name');
    return;
  }

  self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

  // Include ._env and ._loader on websocket, allows argo formatters to work used in virtual_device build actions.
  var host = ws.upgradeReq.headers['host'];
  self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);

  if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
    // peer already connected or connecting
    ws.close(4000, 'peer already connected');
  } else if (self.peers[name]) {
    // peer has been disconnected but has connected before.
    self.peers[name].init(ws);
  } else {
    var peer = new PeerSocket(ws, name, self.peerRegistry);
    self.peers[name] = peer;

    // Events coming from the peers pubsub using push streams
    peer.on('zetta-events', function(topic, data) {
      self.zetta.pubsub.publish(name + '/' + topic, data, true); // Set fromRemote flag to true
    });

    peer.on('connected', function() {
      self.eventBroker.peer(peer);
      self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
      self.zetta.pubsub.publish('_peer/connect', { peer: peer });
    });

    peer.on('error', function(err) {
      self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '".');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
    });

    peer.on('end', function() {
      self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
    });
  }
};
320
/**
 * Handles an upgraded event websocket.
 *
 * For urls starting with `/events`: without a query string a single
 * EventSocket is registered with the broker; with one, an EventSocket is
 * registered per known server (this one plus every entry in `this.peers`)
 * when a `topic` is given, and again for each peer that connects later.
 * For `/servers/<id>/events` urls a single EventSocket is registered for
 * that server when a `topic` is given. Anything else is closed with 1001.
 *
 * @param {Object} ws Upgraded websocket with an `upgradeReq`.
 */
ZettaHttpServer.prototype.setupEventSocket = function(ws) {
  var self = this;
  var host = ws.upgradeReq.headers['host'];

  // Rewrites target.topic through querytopic when rawTopic parses as one.
  var normalizeTopic = function(target, rawTopic) {
    var parsedTopic = querytopic.parse(rawTopic);
    if (parsedTopic) {
      target.topic = querytopic.format(parsedTopic);
    }
  };

  if (!/^\/events/.exec(ws.upgradeReq.url)) {
    var serverMatch = /^\/servers\/(.+)\/events/.exec(ws.upgradeReq.url);
    if (!serverMatch) {
      ws.close(1001); // go away status code
      return;
    }

    // The raw (still encoded) server id is used for the loader path.
    self.wireUpWebSocketForEvent(ws, host, '/servers/' + serverMatch[1]);

    var serverQuery = querystring.parse(url.parse(ws.upgradeReq.url).query);
    serverQuery.name = decodeURI(serverMatch[1]);

    if (serverQuery.topic) {
      normalizeTopic(serverQuery, serverQuery.topic);
      var serverClient = new EventSocket(ws, serverQuery);
      self.eventBroker.client(serverClient);
    }
    return;
  }

  self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
  var queryString = url.parse(ws.upgradeReq.url).query;

  if (!queryString) {
    var plainClient = new EventSocket(ws, null, true);
    self.eventBroker.client(plainClient);
    return;
  }

  var query = querystring.parse(queryString);

  // Shallow copy of the parsed query, tagged with the server it targets.
  var queryFor = function(serverName) {
    var duplicate = {};
    var keys = Object.keys(query);
    for (var i = 0; i < keys.length; i++) {
      duplicate[keys[i]] = query[keys[i]];
    }
    duplicate.name = serverName;
    return duplicate;
  };

  // Subscribe on this server and on every peer known right now.
  var serverIds = [self.zetta._name].concat(Object.keys(self.peers));
  serverIds.forEach(function(serverId) {
    var q = queryFor(serverId);
    if (!q.topic) {
      return;
    }
    normalizeTopic(q, query.topic);
    var client = new EventSocket(ws, q, false);
    self.eventBroker.client(client);
  });

  // Subscribe on peers that connect while this socket is open.
  var subscribeOnPeerConnect = function(e, obj) {
    var q = queryFor(obj.peer.name);
    if (!q.topic) {
      return;
    }
    normalizeTopic(q, query.topic);
    var client = new EventSocket(ws, q);
    self.eventBroker.client(client);
  };

  ws.on('close', function() {
    self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
  });

  self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
};
405
/**
 * Argo middleware for `POST /registration`.
 *
 * Reads the JSON request body, looks up the peer named by `body.target`,
 * replaces the request body with the serialised `body.device` and stores the
 * peer's agent on `env.zettaAgent` before continuing the pipeline.
 * Responds with status 400 when the body cannot be read or parsed, and with
 * 404 when the target peer is unknown or has no agent. All other requests
 * are passed through untouched.
 *
 * @param {Function} handle Argo `handle(event, fn)` registration function.
 */
ZettaHttpServer.prototype.httpRegistration = function(handle) {
  // Needed by the callbacks below to reach this.peers.
  var self = this;

  handle('request', function(env, next) {
    if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
      return next(env);
    }

    env.request.getBody(function(err, body) {
      var registration = null;

      if (!err) {
        try {
          registration = JSON.parse(body.toString());
        } catch (parseErr) {
          err = parseErr;
        }
      }

      // Unreadable body, invalid JSON, or JSON that is not an object.
      if (err || registration === null || typeof registration !== 'object') {
        env.response.statusCode = 400;
        return next(env);
      }

      var peer = self.peers[registration.target];

      if (!peer || !peer.agent) {
        env.response.statusCode = 404;
        return next(env);
      }

      env.request.body = new Buffer(JSON.stringify(registration.device));
      env.zettaAgent = peer.agent;
      next(env);
    });
  });
};
427
/**
 * Sends the current request to several peers and collects the outcomes.
 *
 * For each entry in `peers` the request's `templateUrl` has `{{peerName}}`
 * replaced by the url-encoded peer id and is issued through the peer's agent
 * (or `env.zettaAgent` when set). At most 5 requests run at a time. Failures
 * are reported per peer inside the results rather than as an overall error.
 *
 * @param {Array} peers Objects with an `id` naming an entry of `this.peers`.
 * @param {Object} env Argo environment (`request`, `response`, optional `zettaAgent`).
 * @param {Function} cb Called as `cb(err, results, messageId)`; each result is
 *   `{ res, err, body }` or `{ err }`. The response object is stored in
 *   `this.clients[messageId]`.
 */
ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
  var server = this;

  var incoming = env.request;
  var outgoing = env.response;
  var protocol = getCurrentProtocol(incoming);

  // Remember the pending response under a fresh message id.
  var messageId = ++server.idCounter;
  server.clients[messageId] = outgoing;

  // Builds the async task that proxies the request to one peer.
  var buildTask = function(peerInfo) {
    var peerName = peerInfo.id;

    return function(done) {
      var targetPath = incoming.templateUrl.replace('{{peerName}}', encodeURIComponent(peerName));

      // Copy the headers so the incoming request is left untouched.
      var forwardedHeaders = {};
      var headerNames = Object.keys(incoming.headers);
      for (var i = 0; i < headerNames.length; i++) {
        forwardedHeaders[headerNames[i]] = incoming.headers[headerNames[i]];
      }

      if (!incoming.isSpdy) {
        forwardedHeaders['x-forwarded-proto'] = protocol;
      }

      var target = server.peers[peerName];
      var isConnected = target && target.state === PeerSocket.CONNECTED;
      if (!isConnected) {
        done(null, { err: new Error('Peer does not exist.') });
        return;
      }

      var requestOptions = {
        method: incoming.method,
        headers: forwardedHeaders,
        path: targetPath,
        agent: env.zettaAgent || target.agent
      };

      var upstream = http.request(requestOptions, function(response) {
        response.getBody(function(err, body) {
          done(null, { res: response, err: err, body: body });
        });
      });

      upstream.on('error', function(err) {
        return done(null, { err: err });
      });

      // Send a pre-read body directly, otherwise stream the incoming request.
      if (incoming.body) {
        upstream.end(incoming.body);
      } else {
        incoming.pipe(upstream);
      }
    };
  };

  var tasks = [];
  peers.forEach(function(peerInfo) {
    tasks.push(buildTask(peerInfo));
  });

  async.parallelLimit(tasks, 5, function(err, results) {
    cb(err, results, messageId);
  });
};
482
/**
 * Proxies the current request to a single peer.
 *
 * The peer name is the url-decoded third segment of the request path. The
 * request is issued through the peer's agent (or `env.zettaAgent` when set);
 * response headers and status are copied onto `env.response`, and the body is
 * either the upstream response stream (`pipe: true`, the default) or a
 * buffered copy of it (when `env.proxyOpts.pipe` is falsy).
 *
 * Ends the response directly with 400 when the path cannot be decoded and
 * with 404 when the peer is unknown or not connected. Upstream errors set
 * status 502. `next(env)` is invoked at most once.
 *
 * @param {Object} env Argo environment (`request`, `response`, optional
 *   `zettaAgent` and `proxyOpts`; every key of `proxyOpts` overrides the
 *   corresponding `http.request` option).
 * @param {Function} next Argo continuation.
 */
ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
  var self = this;
  var req = env.request;
  var res = env.response;

  var parsed = url.parse(req.url);
  var name;
  try {
    // Throws on malformed percent escapes (or a missing pathname).
    name = decodeURIComponent(parsed.pathname.split('/')[2]);
  } catch (decodeErr) {
    res.statusCode = 400;
    res.end();
    return;
  }

  if (!req.isSpdy) {
    req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
  }

  var peer = self.peers[name];
  if (!peer || peer.state !== PeerSocket.CONNECTED) {
    res.statusCode = 404;
    res.end();
    return;
  }

  var agent = env.zettaAgent || peer.agent;

  var opts = {
    method: req.method,
    headers: req.headers,
    path: req.url,
    agent: agent,
    pipe: true
  };
  if (typeof env.proxyOpts === 'object') {
    Object.keys(env.proxyOpts).forEach(function(k) {
      opts[k] = env.proxyOpts[k];
    });
  }

  // An 'error' can still fire after the response callback has run; make sure
  // the pipeline is continued only once.
  var continued = false;
  function proceed() {
    if (continued) {
      return;
    }
    continued = true;
    next(env);
  }

  var request = http.request(opts, function(response) {

    Object.keys(response.headers).forEach(function(header) {
      res.setHeader(header, response.headers[header]);
    });

    res.statusCode = response.statusCode;

    if (!opts.pipe) {
      var chunks = [];
      var len = 0;

      response.on('readable', function() {
        var chunk;

        while ((chunk = response.read()) != null) {
          chunks.push(chunk);
          len += chunk.length;
        }
      });

      response.on('end', function() {
        // Assemble once at the end; body stays null when nothing was received.
        env.response.body = chunks.length ? Buffer.concat(chunks, len) : null;
        proceed();
      });
    } else {
      env.response.body = response;
      proceed();
    }
  }).on('error', function(err) {
    if (continued) {
      return;
    }
    env.response.statusCode = 502;
    proceed();
  });

  // Send a pre-read body directly, otherwise stream the incoming request.
  if (req.body) {
    request.end(req.body);
  } else {
    req.pipe(request);
  }
};