UNPKG

17 kBJavaScriptView Raw
1var http = require('http');
2var path = require('path');
3var url = require('url');
4var querystring = require('querystring');
5var async = require('async');
6var spdy = require('spdy');
7var argo = require('argo');
8var titan = require('titan');
9var WebSocketServer = require('ws').Server;
10var SpdyAgent = require('./spdy_agent');
11var EventBroker = require('./event_broker');
12var PeerSocket = require('./peer_socket');
13var EventSocket = require('./event_socket');
14var Siren = require('argo-formatter-siren');
15var DevicesResource = require('./api_resources/devices');
16var PeersManagementResource = require('./api_resources/peer_management');
17var RootResource = require('./api_resources/root');
18var ServersResource = require('./api_resources/servers');
19var deviceFormatter = require('./api_formats/siren/device.siren');
20var rels = require('zetta-rels');
21
22var querytopic = require('./query_topic');
23
/**
 * HTTP / WebSocket front end of a Zetta instance.
 *
 * Builds the externally facing spdy/http(s) server, a second plain spdy
 * server, a `ws` server running in `noServer` mode that is fed from the
 * external server's 'upgrade' events, and the titan pipeline that serves the
 * API resources.
 *
 * @param {Object} zettaInstance Zetta runtime. `peerRegistry` is read from it
 *   here; it is also handed to the EventBroker and to every API resource.
 * @param {Object} [options] Every key is copied onto the options passed to
 *   `spdy.createServer` for the external server. `useXForwardedHostHeader`
 *   and `useXForwardedPathHeader` (both default to true) are forwarded to
 *   titan. Supplying any of `cert`, `key`, `pfx` or `ca` switches the
 *   external server from plain to ssl.
 */
var ZettaHttpServer = module.exports = function(zettaInstance, options) {
  var self = this;

  // `typeof null` is 'object' as well, so null has to be excluded explicitly;
  // otherwise the property reads below would throw.
  options = (options !== null && typeof options === 'object') ? options : {};

  // Both X-Forwarded-* switches are coerced to booleans and default to true.
  this.useXForwardedHostHeader = (typeof options.useXForwardedHostHeader !== 'undefined')
    ? !!options.useXForwardedHostHeader
    : true;
  this.useXForwardedPathHeader = (typeof options.useXForwardedPathHeader !== 'undefined')
    ? !!options.useXForwardedPathHeader
    : true;

  this.idCounter = 0;
  this.zetta = zettaInstance;
  this.peerRegistry = zettaInstance.peerRegistry;
  this.eventBroker = new EventBroker(zettaInstance);
  this.clients = {};
  this.peers = {}; // connected peers
  this.peerOptions = {}; // default empty options for PeerSocket

  this._deviceQueries = [];

  this._collectors = {};

  // WS hooks to be called before finishing upgrade
  this._wsHooks = {
    peerConnect: [],
    websocketConnect: []
  };

  // Options for the external http(s) server; caller options override defaults.
  var httpOptions = {
    connection: {
      windowSize: 1024 * 1024,
      autoSpdy31: false
    },
    spdy: {
      plain: true,
      ssl: false
    }
  };

  var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
  var usingSSL = false;
  Object.keys(options).forEach(function(k) {
    httpOptions[k] = options[k];
    if (tlsCheckOptions.indexOf(k) > -1) {
      usingSSL = true;
    }
  });

  // The plain/ssl flags are always derived from the tls options. They are
  // written to a private copy so that a caller-supplied `spdy` object is not
  // modified, and a missing/non-object `spdy` value cannot make this throw.
  var spdySettings = {};
  if (httpOptions.spdy !== null && typeof httpOptions.spdy === 'object') {
    Object.keys(httpOptions.spdy).forEach(function(k) {
      spdySettings[k] = httpOptions.spdy[k];
    });
  }
  spdySettings.plain = !usingSSL;
  spdySettings.ssl = usingSSL;
  httpOptions.spdy = spdySettings;

  var spdyServerOpts = {
    connection: {
      windowSize: 1024 * 1024,
      autoSpdy31: false
    },
    spdy: {
      plain: true,
      ssl: false
    }
  };

  // Outside http server
  this.server = spdy.createServer(httpOptions);

  // internal server for z2z, always ssl: false, plain: true
  // TODO: remove this as it is unneeded now.
  this.spdyServer = spdy.createServer(spdyServerOpts);
  this.spdyServer.on('ping', function(socket) {
    socket.emit('spdyPing');
  });

  // Request urls that are accepted as event websockets.
  var ValidWSUrls = [
    /^\/events$/, // /events
    /^\/events\?.+$/, // /events?topic=query:where type="led"
    /^\/servers\/(.+)\/events/, // /servers/BC2832FD-9437-4473-A4A8-AC1D56B12C61/events
    /^\/peers\/(.+)$/, // /peers/123123...
    /^\/peer-management$/ // /peer-management
  ];

  function isValidWsUrl(request) {
    return ValidWSUrls.some(function(re) {
      return re.test(request.url);
    });
  }

  this.wss = new WebSocketServer({ noServer: true });
  this.server.on('upgrade', function(request, socket, headers) {
    // Answers the raw socket with a bare HTTP status line.
    function sendError(code) {
      function finish() {
        var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
        socket.end(responseLine);
      }

      // Without additional 'upgrade' listeners nobody else can answer.
      if (self.server.listeners('upgrade').length <= 1) {
        finish();
        return;
      }

      // Other listeners (custom websocket paths from extensions) get five
      // seconds; the error is only sent if nothing was written by then.
      var timer = setTimeout(function() {
        if (socket.bytesWritten === 0) {
          finish();
        }
      }, 5000);
      socket.on('close', function() {
        clearTimeout(timer);
      });
    }

    // Runs the hooks in series; any hook error answers with a 500, otherwise
    // the websocket upgrade is completed and handed to `onSocket`.
    function upgradeAfterHooks(hooks, onSocket) {
      async.eachSeries(hooks, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }
        self.wss.handleUpgrade(request, socket, headers, onSocket);
      });
    }

    if (/^\/peers\/(.+)$/.exec(request.url)) {
      // Handle Peer Request
      upgradeAfterHooks(self._wsHooks.peerConnect, function(ws) {
        self.setupPeerSocket(ws, request);
      });
    } else if (isValidWsUrl(request)) {
      upgradeAfterHooks(self._wsHooks.websocketConnect, function(ws) {
        if (request.url === '/peer-management') {
          // Fixed subscription to this server's peer connect/disconnect topics.
          var query = [
            { name: self.zetta.id, topic: '_peer/connect' },
            { name: self.zetta.id, topic: '_peer/disconnect' }
          ];
          self.eventBroker.client(new EventSocket(ws, query));
        } else {
          self.setupEventSocket(ws, request);
        }
      });
    } else {
      // 404
      sendError(404);
    }
  });

  var titanOpts = {
    useXForwardedHostHeader: this.useXForwardedHostHeader,
    useXForwardedPathHeader: this.useXForwardedPathHeader
  };
  this.cloud = titan(titanOpts)
    .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
    .add(RootResource, zettaInstance)
    .add(PeersManagementResource, zettaInstance)
    .add(DevicesResource, zettaInstance)
    .add(ServersResource, zettaInstance)
    .allow({
      methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
      origins: ['*'],
      headers: ['accept', 'content-type'],
      maxAge: '432000'
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // OPTIONS requests are flagged as already routed.
        if (env.request.method === 'OPTIONS') {
          env.argo._routed = true;
        }
        next(env);
      });
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // stop execution in argo for initiate peer requests, handled by peer_client
        if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url))) {
          next(env);
        }
      });
    });
};
212
/**
 * Completes the request pipeline and attaches it to both servers.
 *
 * Adds the http registration middleware, builds the pipeline and registers
 * its `run` function as 'request' listener on the external and on the spdy
 * server.
 *
 * @param {Function} [cb] Called synchronously, without arguments, once the
 *   listeners are attached.
 */
ZettaHttpServer.prototype.init = function(cb) {
  // handle http registration of device
  var registration = this.httpRegistration.bind(this);
  this.cloud = this.cloud.use(registration);

  // A wildcard route proxying to peers (proxyToPeer) is currently not installed.

  // setup http servers request handler to argo routes
  this.cloud = this.cloud.build();

  var servers = [this.server, this.spdyServer];
  for (var i = 0; i < servers.length; i++) {
    servers[i].on('request', this.cloud.run);
  }

  if (cb) {
    cb();
  }
};
230
/**
 * Starts the external server; all arguments are passed through unchanged to
 * its `listen` method.
 *
 * @returns {ZettaHttpServer} this instance, for chaining
 */
ZettaHttpServer.prototype.listen = function() {
  var external = this.server;
  external.listen.apply(external, arguments);
  return this;
};
235
236
/**
 * Registers a hook that runs before a `/peers/...` websocket upgrade is
 * completed. Hooks are invoked as `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler
 * @throws {Error} when `handler` is not a function
 */
ZettaHttpServer.prototype.onPeerConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var hooks = this._wsHooks.peerConnect;
  hooks.push(handler);
};
243
/**
 * Registers a hook that runs before an event websocket upgrade (any accepted
 * non-peer websocket url) is completed. Hooks are invoked as
 * `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler
 * @throws {Error} when `handler` is not a function
 */
ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var hooks = this._wsHooks.websocketConnect;
  hooks.push(handler);
};
250
/**
 * Stores a collector under a name. May be called as `collector(fn)`, in
 * which case the name defaults to '_logs'.
 *
 * @param {string|Function} name Collector name, or the collector itself.
 * @param {*} [collector] Value appended to the list kept for `name`.
 * @returns {ZettaHttpServer} this instance, for chaining
 */
ZettaHttpServer.prototype.collector = function(name, collector) {
  var key = name;
  var entry = collector;

  // Single-argument form: the first argument is the collector.
  if (typeof name === 'function') {
    entry = name;
    key = '_logs';
  }

  var registry = this._collectors;
  if (!registry[key]) {
    registry[key] = [];
  }
  registry[key].push(entry);

  return this;
};
264
/**
 * Determines the protocol the client used for a request.
 *
 * The first entry of a comma separated `x-forwarded-proto` header wins, with
 * surrounding whitespace removed. If the header is absent, or its first entry
 * is blank, the protocol is derived from whether the connection is encrypted.
 *
 * @param {Object} req Request with `headers` and `connection`.
 * @returns {string} Forwarded protocol, or 'https' / 'http'.
 */
function getCurrentProtocol(req) {
  var xfp = req.headers['x-forwarded-proto'];

  if (xfp && xfp.length) {
    // Trim the selected entry on both sides; stripping only leading
    // whitespace of the whole header would leave e.g. "https " behind.
    var forwarded = String(xfp).split(',')[0].trim();
    if (forwarded) {
      return forwarded;
    }
  }

  return req.connection.encrypted ? 'https' : 'http';
}
277
/**
 * Attaches `_env` and `_loader` to a websocket so that url helpers can be
 * used with it.
 *
 * `ws._env.uri()` returns the absolute url of the upgrade request and
 * `ws._env.helpers.url.path(pathname)` returns that url with the pathname
 * replaced and the query string removed.
 *
 * @param {Object} ws Websocket to decorate.
 * @param {Object} request Upgrade request.
 * @param {string} [host] Host to use; when falsy it is taken from the
 *   connection's local address on the first `uri()` call and then reused.
 * @param {string} p Value stored as `ws._loader.path`.
 */
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, request, host, p) {
  var resolvedHost = host;

  ws._env = { helpers: { url: {} } };
  ws._loader = { path: p };

  ws._env.uri = function() {
    var protocol = getCurrentProtocol(request);

    if (!resolvedHost) {
      var local = request.connection.address();
      var isDefaultPort = (protocol === 'https' && local.port === 443) ||
                          (protocol === 'http' && local.port === 80);

      resolvedHost = local.address;
      // The port is only appended when it is not the protocol's default.
      if (local.port && !isDefaultPort) {
        resolvedHost += ':' + local.port;
      }
    }

    // path.join may produce backslashes; they are turned back into slashes.
    var joined = path.join(resolvedHost, request.url);
    return (protocol + '://' + joined).replace(/\\/g, '/');
  };

  ws._env.helpers.url.path = function(pathname) {
    var parts = url.parse(ws._env.uri());
    parts.search = null;
    parts.pathname = pathname;
    return url.format(parts);
  };
};
306
/**
 * Binds an upgraded `/peers/<name>` websocket to a PeerSocket.
 *
 * The peer name is taken from the request's pathname and URI-decoded. A peer
 * that is already connected or connecting is rejected with close code 4000, a
 * known but disconnected peer is re-initialised with the new socket, and an
 * unknown peer gets a new PeerSocket whose events are forwarded to the
 * instance's pubsub and log.
 *
 * @param {Object} ws Upgraded websocket.
 * @param {Object} request Upgrade request.
 */
ZettaHttpServer.prototype.setupPeerSocket = function(ws, request) {
  var self = this;

  // The pathname (without query string) may not contain a name even though
  // the full url matched, e.g. "/peers/?x=1", and decodeURI throws on
  // malformed escapes. Either case closes the socket instead of throwing.
  var matched = /^\/peers\/(.+)$/.exec(url.parse(request.url, true).pathname);
  var name = null;
  if (matched) {
    try {
      name = decodeURI(matched[1]);
    } catch (e) {
      name = null;
    }
  }
  if (name === null) {
    ws.close(1008, 'invalid peer name');
    return;
  }

  self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

  // Include ._env and ._loader on websocket, allows argo formatters to work used in virtual_device build actions.
  var host = request.headers['host'];
  self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + name);

  var known = self.peers[name];

  if (known && known.state !== PeerSocket.DISCONNECTED) {
    // peer already connected or connecting
    ws.close(4000, 'peer already connected');
    return;
  }

  if (known) {
    // peer has been disconnected but has connected before.
    known.init(ws, request);
    return;
  }

  var peer = new PeerSocket(ws, request, name, self.peerRegistry, self.peerOptions);
  self.peers[name] = peer;

  // Events coming from the peers pubsub using push streams
  peer.on('zetta-events', function(topic, data) {
    self.zetta.pubsub.publish(name + '/' + topic, data, true); // Set fromRemote flag to true
  });

  peer.on('connected', function() {
    self.eventBroker.peer(peer);
    self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
    self.zetta.pubsub.publish('_peer/connect', { peer: peer });
  });

  peer.on('error', function(err) {
    self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '": ' + err.message + '.');
    self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, err: err });
  });

  peer.on('end', function() {
    self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
    self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
  });
};
349
/**
 * Binds an upgraded event websocket to the event broker.
 *
 * Urls starting with `/events`:
 *   - without a `topic` query parameter one stream-enabled EventSocket is
 *     registered;
 *   - with a `topic` one EventSocket is registered for this server and for
 *     every entry in `this.peers`, and another one each time a
 *     '_peer/connect' message is published, until the websocket closes.
 * Urls of the form `/servers/<name>/events`: one EventSocket is registered
 * for that server when a `topic` is given. Anything else, including a server
 * name with malformed percent-encoding, is closed with code 1001.
 *
 * @param {Object} ws Upgraded websocket.
 * @param {Object} request Upgrade request.
 */
ZettaHttpServer.prototype.setupEventSocket = function(ws, request) {
  var self = this;
  var host = request.headers['host'];
  var query;

  // Shallow copy, so every subscription gets its own query object.
  function copy(q) {
    var c = {};
    Object.keys(q).forEach(function(k) {
      c[k] = q[k];
    });
    return c;
  }

  // Re-formats a topic through querytopic when it parses; otherwise the
  // topic is returned unchanged.
  function normalizeTopic(topic) {
    var qt = querytopic.parse(topic);
    return qt ? querytopic.format(qt) : topic;
  }

  if (/^\/events/.exec(request.url)) {
    self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + self.zetta._name);
    query = url.parse(request.url, true).query;

    if (!query.topic) {
      var streamClient = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) });
      self.eventBroker.client(streamClient);
      return;
    }

    // One subscription for this server and for each peer known right now.
    [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
      var q = copy(query);
      q.name = serverId;
      q.topic = normalizeTopic(q.topic);

      var client = new EventSocket(ws, q, { streamEnabled: false });
      self.eventBroker.client(client);
    });

    // Subscription for a peer announced on '_peer/connect' later on.
    var subscribeOnPeerConnect = function(e, obj) {
      var q = copy(query);
      q.name = obj.peer.name;
      q.topic = normalizeTopic(q.topic);

      var client = new EventSocket(ws, q);
      self.eventBroker.client(client);
    };

    ws.on('close', function() {
      self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
    });

    self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
    return;
  }

  var match = /^\/servers\/(.+)\/events/.exec(request.url);
  if (!match) {
    ws.close(1001); // go away status code
    return;
  }

  // decodeURI throws a URIError on malformed escapes such as "%"; such a
  // request is turned away like a non-matching url instead of throwing.
  var serverName;
  try {
    serverName = decodeURI(match[1]);
  } catch (e) {
    ws.close(1001);
    return;
  }

  // The loader path keeps the server id exactly as it appears in the url.
  self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + match[1]);

  query = querystring.parse(url.parse(request.url).query);
  query.name = serverName;

  if (query.topic) {
    query.topic = normalizeTopic(query.topic);
    var client = new EventSocket(ws, query);
    self.eventBroker.client(client);
  }
};
433
/**
 * Middleware handling `POST /registration`.
 *
 * The JSON body names a `target` peer and carries a `device`. For a known
 * peer with an agent the request body is replaced by the serialized device
 * and the peer's agent is stored on `env.zettaAgent`. All other requests are
 * passed on untouched.
 *
 * Status codes set before passing the request on:
 *   400 - the body could not be read, is not a JSON object, or has no `device`
 *   404 - the target peer is unknown or has no agent
 *
 * Must run with `this` being the server instance (`init` binds it).
 *
 * @param {Function} handle Registration function of the pipeline.
 */
ZettaHttpServer.prototype.httpRegistration = function(handle) {
  // Captured for the callbacks below, which look peers up on the instance.
  var self = this;

  handle('request', function(env, next) {
    var isRegistration = env.request.method === 'POST' && env.request.url === '/registration';
    if (!isRegistration) {
      return next(env);
    }

    env.request.getBody(function(err, body) {
      var payload = null;

      if (!err && body) {
        try {
          payload = JSON.parse(body.toString());
        } catch (e) {
          payload = null;
        }
      }

      // A missing device would serialize to undefined and break the Buffer
      // construction below, so it is rejected together with unusable bodies.
      if (payload === null || typeof payload !== 'object' || typeof payload.device === 'undefined') {
        env.response.statusCode = 400;
        return next(env);
      }

      var peer = self.peers[payload.target];

      if (!peer || !peer.agent) {
        env.response.statusCode = 404;
        return next(env);
      }

      env.request.body = new Buffer(JSON.stringify(payload.device));
      env.zettaAgent = peer.agent;
      next(env);
    });
  });
};
455
/**
 * Sends the current request to several peers and collects the outcomes.
 *
 * For every peer the request's `templateUrl` is used with '{{peerName}}'
 * replaced by the encoded peer id. At most five requests run at a time.
 * Per-peer failures do not abort the batch; each outcome is an object that
 * holds `err` and, when a response arrived, `res` and `body`.
 *
 * @param {Array} peers Objects whose `id` is the peer name.
 * @param {Object} env Pipeline environment (`request`, `response`, optional `zettaAgent`).
 * @param {Function} cb Called as `cb(err, results, messageId)`; the response
 *   object is stored in `this.clients` under `messageId`.
 */
ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
  var self = this;
  var req = env.request;
  var protocol = getCurrentProtocol(req);

  var messageId = ++self.idCounter;
  self.clients[messageId] = env.response;

  // Creates the async task proxying the request to a single peer.
  function taskFor(p) {
    var name = p.id;

    return function(callback) {
      var reqUrl = req.templateUrl.replace('{{peerName}}', encodeURIComponent(name));

      // Each outgoing request gets its own copy of the incoming headers.
      var headers = {};
      var headerNames = Object.keys(req.headers);
      for (var i = 0; i < headerNames.length; i++) {
        headers[headerNames[i]] = req.headers[headerNames[i]];
      }

      if (!req.isSpdy) {
        headers['x-forwarded-proto'] = protocol;
      }

      var peer = self.peers[name];
      var isReachable = !!peer && peer.state === PeerSocket.CONNECTED;
      if (!isReachable) {
        callback(null, { err: new Error('Peer does not exist.') });
        return;
      }

      var opts = {
        method: req.method,
        headers: headers,
        path: reqUrl,
        agent: env.zettaAgent || peer.agent
      };

      var request = http.request(opts, function(response) {
        response.getBody(function(err, body) {
          callback(null, { res: response, err: err, body: body });
        });
      });
      request.on('error', function(err) {
        return callback(null, { err: err });
      });

      // A body that was already read is sent as is, otherwise it is streamed.
      if (req.body) {
        request.end(req.body);
      } else {
        req.pipe(request);
      }
    };
  }

  var tasks = peers.map(function(p) {
    return taskFor(p);
  });

  async.parallelLimit(tasks, 5, function(err, results) {
    cb(err, results, messageId);
  });
};
510
/**
 * Proxies the current request to the peer named by the third segment of the
 * request path.
 *
 * The peer's response headers and status code are copied to the response.
 * With `pipe` enabled (the default) the peer's response stream becomes
 * `env.response.body`; otherwise the response is buffered first. Keys of
 * `env.proxyOpts`, when it is an object, override the request options,
 * including `pipe`.
 *
 * Outcomes that do not reach the peer:
 *   400 - the peer name has malformed percent-encoding (response is ended)
 *   404 - the peer is unknown or not connected (response is ended)
 *   502 - the request to the peer failed (passed on through `next`)
 *
 * @param {Object} env Pipeline environment.
 * @param {Function} next Continuation, called with `env`.
 */
ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
  var self = this;
  var req = env.request;
  var res = env.response;

  var segment = url.parse(req.url).pathname.split('/')[2];
  var name;
  try {
    name = decodeURIComponent(segment);
  } catch (e) {
    // decodeURIComponent throws a URIError on malformed escapes.
    res.statusCode = 400;
    res.end();
    return;
  }

  if (!req.isSpdy) {
    req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
  }

  var peer = self.peers[name];
  if (!peer || peer.state !== PeerSocket.CONNECTED) {
    res.statusCode = 404;
    res.end();
    return;
  }

  var opts = {
    method: req.method,
    headers: req.headers,
    path: req.url,
    agent: env.zettaAgent || peer.agent,
    pipe: true
  };
  if (typeof env.proxyOpts === 'object') {
    Object.keys(env.proxyOpts).forEach(function(k) {
      opts[k] = env.proxyOpts[k];
    });
  }

  var request = http.request(opts, function(response) {
    Object.keys(response.headers).forEach(function(header) {
      res.setHeader(header, response.headers[header]);
    });

    res.statusCode = response.statusCode;

    if (opts.pipe) {
      // Streaming mode: hand the peer's response stream on as the body.
      env.response.body = response;
      next(env);
      return;
    }

    // Buffered mode: gather all chunks; the body stays null when the peer
    // sent no data.
    var chunks = [];
    var total = 0;
    var body = null;

    response.on('readable', function() {
      var chunk;
      while ((chunk = response.read()) != null) {
        chunks.push(chunk);
        total += chunk.length;
      }

      if (chunks.length) {
        body = Buffer.concat(chunks, total);
      }
    });

    response.on('end', function() {
      env.response.body = body;
      next(env);
    });
  });

  request.on('error', function(err) {
    env.response.statusCode = 502;
    return next(env);
  });

  // A body that was already read is sent as is, otherwise it is streamed.
  if (req.body) {
    request.end(req.body);
  } else {
    req.pipe(request);
  }
};