UNPKG

16.6 kBJavaScriptView Raw
1var http = require('http');
2var path = require('path');
3var url = require('url');
4var querystring = require('querystring');
5var async = require('async');
6var spdy = require('spdy');
7var argo = require('argo');
8var titan = require('titan');
9var WebSocketServer = require('ws').Server;
10var SpdyAgent = require('./spdy_agent');
11var EventBroker = require('./event_broker');
12var PeerSocket = require('./peer_socket');
13var EventSocket = require('./event_socket');
14var Siren = require('argo-formatter-siren');
15var DevicesResource = require('./api_resources/devices');
16var PeersManagementResource = require('./api_resources/peer_management');
17var RootResource = require('./api_resources/root');
18var ServersResource = require('./api_resources/servers');
19var deviceFormatter = require('./api_formats/siren/device.siren');
20var rels = require('zetta-rels');
21
22var querytopic = require('./query_topic');
23
/**
 * HTTP / WebSocket front end for a zetta instance.
 *
 * Creates two spdy servers (`this.server` for external traffic and
 * `this.spdyServer` for internal z2z traffic), a `ws` WebSocketServer that is
 * driven manually from the external server's 'upgrade' event, and the
 * titan/argo request pipeline stored on `this.cloud`.
 *
 * @param {Object} zettaInstance - zetta runtime; `peerRegistry` and `id` are
 *   read from it here, and it is handed to the EventBroker and API resources.
 * @param {Object} [options] - server options. Every key is copied into the
 *   options passed to `spdy.createServer`. Recognised here:
 *   - `useXForwardedHostHeader` (default true)
 *   - `useXForwardedPathHeader` (default true)
 *   - any of `cert`, `key`, `pfx`, `ca` switches the external server to SSL.
 */
var ZettaHttpServer = module.exports = function(zettaInstance, options) {
  var self = this;
  // `typeof null` is 'object', so null has to be rejected explicitly;
  // otherwise the property reads below would throw.
  options = (options && typeof options === 'object') ? options : {};
  if (typeof options.useXForwardedHostHeader !== 'undefined') {
    this.useXForwardedHostHeader = options.useXForwardedHostHeader ? true : false;
  } else {
    this.useXForwardedHostHeader = true;
  }
  if (typeof options.useXForwardedPathHeader !== 'undefined') {
    this.useXForwardedPathHeader = options.useXForwardedPathHeader ? true : false;
  } else {
    this.useXForwardedPathHeader = true;
  }
  this.idCounter = 0;
  this.zetta = zettaInstance;
  this.peerRegistry = zettaInstance.peerRegistry;
  this.eventBroker = new EventBroker(zettaInstance);
  this.clients = {};
  this.peers = {}; // connected peers
  this.peerOptions = {}; // default empty options for PeerSocket

  this._deviceQueries = [];

  this._collectors = {};

  // WS hooks to be called before finishing upgrade
  this._wsHooks = {
    peerConnect: [],
    websocketConnect: []
  };

  // external http(s) server
  var httpOptions = {
    windowSize: 1024 * 1024
  };
  var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
  var usingSSL = false;
  Object.keys(options).forEach(function(k) {
    httpOptions[k] = options[k];
    if (tlsCheckOptions.indexOf(k) > -1) {
      usingSSL = true;
    }
  });

  // If any tls options were specified, use ssl and not plain
  httpOptions.plain = (usingSSL) ? false : true;
  httpOptions.ssl = (usingSSL) ? true : false;
  this.server = spdy.createServer(httpOptions);

  // internal server for z2z, always ssl: false, plain: true
  this.spdyServer = spdy.createServer({
    windowSize: 1024 * 1024,
    plain: true,
    ssl: false
  });

  // Request URLs that may be upgraded to a websocket.
  var ValidWSUrls = [
    /^\/events$/, // /events
    /^\/events\?.+$/, // /events?topic=query:where type="led"
    /^\/servers\/(.+)\/events/, // /servers/BC2832FD-9437-4473-A4A8-AC1D56B12C61/events
    /^\/peers\/(.+)$/, // /peers/123123...
    /^\/peer-management$/, // /peer-management
  ];

  // True when the request URL matches one of ValidWSUrls.
  function match(request) {
    return ValidWSUrls.some(function(re) {
      return re.test(request.url);
    });
  }

  this.wss = new WebSocketServer({ noServer: true });
  this.server.on('upgrade', function(request, socket, headers) {

    // Ends the socket with a bare HTTP status line for `code`.
    var sendError = function(code) {
      var finish = function() {
        var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
        socket.end(responseLine);
      };

      // Other 'upgrade' listeners (e.g. custom websocket paths from
      // extensions) may still handle this socket: wait 5 seconds and only
      // send the error if nothing has been written to it by then.
      if (self.server.listeners('upgrade').length > 1) {
        var timer = setTimeout(function() {
          if (socket.bytesWritten === 0) {
            finish();
          }
        }, 5000);
        socket.on('close', function() {
          clearTimeout(timer);
        });
      } else {
        finish();
      }
    };

    if (/^\/peers\/(.+)$/.exec(request.url)) {
      // Run the peerConnect hooks in order; any hook error aborts with a 500.
      async.eachSeries(self._wsHooks.peerConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        // Handle Peer Request
        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          self.setupPeerSocket(ws);
        });
      });
    } else if (match(request)) {
      // Run the websocketConnect hooks in order; any hook error aborts with a 500.
      async.eachSeries(self._wsHooks.websocketConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          if (ws.upgradeReq.url === '/peer-management') {
            // Peer management clients only receive this server's peer
            // connect / disconnect topics.
            var query = [
              { name: self.zetta.id, topic: '_peer/connect' },
              { name: self.zetta.id, topic: '_peer/disconnect' }];

            var client = new EventSocket(ws, query);
            self.eventBroker.client(client);
          } else {
            self.setupEventSocket(ws);
          }
        });
      });
    } else {
      // 404
      sendError(404);
    }

  });

  var titanOpts = {
    useXForwardedHostHeader: this.useXForwardedHostHeader,
    useXForwardedPathHeader: this.useXForwardedPathHeader
  };
  this.cloud = titan(titanOpts)
    .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
    .add(RootResource, zettaInstance)
    .add(PeersManagementResource, zettaInstance)
    .add(DevicesResource, zettaInstance)
    .add(ServersResource, zettaInstance)
    .allow({
      methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
      origins: ['*'],
      headers: ['accept', 'content-type'],
      maxAge: '432000'
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // Flag OPTIONS requests as routed before passing them on.
        if (env.request.method === 'OPTIONS') {
          env.argo._routed = true;
        }
        next(env);
      });
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // stop execution in argo for initiate peer requests, handled by peer_client
        if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
          next(env);
        }
      });
    });
};
192
/**
 * Finalises the request pipeline and attaches it to both servers.
 *
 * Adds the http registration middleware, builds `this.cloud`, and registers
 * the built app's `run` as the 'request' listener on the external server and
 * on the internal spdy server.
 *
 * @param {Function} [cb] - invoked with no arguments once wiring is done.
 */
ZettaHttpServer.prototype.init = function(cb) {
  // handle http registration of device
  var registration = this.httpRegistration.bind(this);
  this.cloud = this.cloud.use(registration);

  // The wildcard proxy route is currently disabled:
  //this.cloud = this.cloud.route('*', this.proxyToPeer.bind(this));

  // setup http servers request handler to argo routes
  var app = this.cloud.build();
  this.cloud = app;

  var servers = [this.server, this.spdyServer];
  for (var i = 0; i < servers.length; i++) {
    servers[i].on('request', app.run);
  }

  if (cb) {
    cb();
  }
};
210
/**
 * Starts the external server. All arguments are forwarded unchanged to
 * `this.server.listen`.
 *
 * @returns {ZettaHttpServer} this instance, for chaining.
 */
ZettaHttpServer.prototype.listen = function() {
  var externalServer = this.server;
  externalServer.listen.apply(externalServer, arguments);
  return this;
};
215
216
/**
 * Registers a hook that runs before a `/peers/...` websocket upgrade is
 * completed.
 *
 * @param {Function} handler - called as handler(request, socket, headers, next).
 * @throws {Error} when `handler` is not a function.
 */
ZettaHttpServer.prototype.onPeerConnect = function(handler) {
  var isCallable = typeof handler === 'function';
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var peerHooks = this._wsHooks.peerConnect;
  peerHooks.push(handler);
};
223
/**
 * Registers a hook that runs before a non-peer websocket upgrade (events,
 * peer-management) is completed.
 *
 * @param {Function} handler - called as handler(request, socket, headers, next).
 * @throws {Error} when `handler` is not a function.
 */
ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
  var isCallable = typeof handler === 'function';
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var websocketHooks = this._wsHooks.websocketConnect;
  websocketHooks.push(handler);
};
230
/**
 * Adds a collector to the list stored under `name`.
 *
 * May also be called as `collector(fn)`; in that case the collector is stored
 * under the '_logs' name.
 *
 * @param {string|Function} name - list name, or the collector itself.
 * @param {*} [collector] - collector to store when `name` is given.
 * @returns {ZettaHttpServer} this instance, for chaining.
 */
ZettaHttpServer.prototype.collector = function(name, collector) {
  var nameOmitted = typeof name === 'function';
  var key = nameOmitted ? '_logs' : name;
  var entry = nameOmitted ? name : collector;

  var registry = this._collectors;
  if (!registry[key]) {
    registry[key] = [];
  }
  registry[key].push(entry);

  return this;
};
244
/**
 * Determines the protocol a request was made with.
 *
 * The first entry of the comma-separated `x-forwarded-proto` header wins when
 * it is present and non-blank; otherwise the protocol is derived from whether
 * the underlying connection is encrypted.
 *
 * @param {Object} req - request exposing `headers` and `connection`.
 * @returns {string} the forwarded protocol, or 'https' / 'http'.
 */
function getCurrentProtocol(req) {
  var xfp = req.headers['x-forwarded-proto'];

  if (xfp && xfp.length) {
    // Trim the first list entry on both sides. The previous non-global
    // /\s*/ replace only removed leading whitespace, so "https , http"
    // produced "https " (with a trailing space).
    var forwarded = String(xfp).split(',')[0].trim();
    if (forwarded) {
      return forwarded;
    }
  }

  // No usable header value: fall back to the socket state.
  return req.connection.encrypted ? 'https' : 'http';
}
257
/**
 * Attaches `_env` and `_loader` objects to a websocket, giving it `uri()` and
 * `helpers.url.path()` functions based on the websocket's upgrade request.
 *
 * @param {Object} ws - websocket exposing `upgradeReq`.
 * @param {string} [host] - host (optionally with port) for generated URLs.
 *   When falsy it is derived from the local address of the upgrade request's
 *   connection the first time `uri()` is called, and reused afterwards.
 * @param {string} p - stored as `ws._loader.path`.
 */
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
  ws._env = { helpers: {} };
  ws._loader = { path: p };

  // Absolute URI of the upgrade request.
  ws._env.uri = function() {
    var protocol = getCurrentProtocol(ws.upgradeReq);

    if (!host) {
      var address = ws.upgradeReq.connection.address();
      host = address.address;
      // An IPv6 literal must be bracketed before a port or path is appended.
      if (typeof host === 'string' && host.indexOf(':') !== -1) {
        host = '[' + host + ']';
      }
      if (address.port) {
        // Leave out the port when it is the default one for the protocol.
        if (!(protocol === 'https' && address.port === 443) &&
            !(protocol === 'http' && address.port === 80)) {
          host += ':' + address.port;
        }
      }
    }

    // Plain concatenation instead of path.join: path.join is a filesystem
    // helper that uses the platform separator and normalises '//' and '..'
    // segments, which would rewrite the request URL and its query string.
    var requestUrl = ws.upgradeReq.url;
    if (requestUrl.charAt(0) !== '/') {
      requestUrl = '/' + requestUrl;
    }

    return protocol + '://' + host + requestUrl;
  };

  ws._env.helpers.url = {};
  // Same origin as uri(), with the path replaced and the query string dropped.
  ws._env.helpers.url.path = function(pathname) {
    var parsed = url.parse(ws._env.uri());
    parsed.search = null;
    parsed.pathname = pathname;
    return url.format(parsed);
  };
};
287
/**
 * Handles an upgraded `/peers/<name>` websocket.
 *
 * A name that is already connected or connecting is rejected with close code
 * 4000. A previously seen, now disconnected peer is re-initialised with the
 * new socket. Otherwise a new PeerSocket is created, stored in `this.peers`
 * and wired to the event broker, the log and pubsub.
 *
 * @param {Object} ws - upgraded websocket exposing `upgradeReq`.
 */
ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
  var self = this;

  // The upgrade handler tests the full request URL, while the name is taken
  // from the pathname only; a URL such as "/peers/?x=1" therefore has no
  // name here and must not be dereferenced.
  var nameMatch = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname);
  var name = null;
  if (nameMatch) {
    try {
      name = decodeURI(nameMatch[1]);
    } catch (err) {
      // decodeURI throws URIError on malformed percent-encoding.
      name = null;
    }
  }

  if (name === null) {
    self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer rejected: invalid peer name.');
    ws.close(1008, 'invalid peer name');
    return;
  }

  self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

  // Include ._env and ._loader on websocket, allows argo formatters to work used in virtual_device build actions.
  var host = ws.upgradeReq.headers['host'];
  self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);

  if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
    // peer already connected or connecting
    ws.close(4000, 'peer already connected');
  } else if (self.peers[name]) {
    // peer has been disconnected but has connected before.
    self.peers[name].init(ws);
  } else {
    var peer = new PeerSocket(ws, name, self.peerRegistry, self.peerOptions);
    self.peers[name] = peer;

    // Events coming from the peers pubsub using push streams
    peer.on('zetta-events', function(topic, data) {
      self.zetta.pubsub.publish(name + '/' + topic, data, true); // Set fromRemote flag to true
    });

    peer.on('connected', function() {
      self.eventBroker.peer(peer);
      self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
      self.zetta.pubsub.publish('_peer/connect', { peer: peer });
    });

    peer.on('error', function(err) {
      self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '": ' + err.message + '.');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, err: err });
    });

    peer.on('end', function() {
      self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
    });
  }
};
330
/**
 * Handles an upgraded event websocket.
 *
 * - `/events` without a `topic` query: registers one stream-enabled
 *   EventSocket with the broker.
 * - `/events?topic=...`: registers one EventSocket per known server (this
 *   server plus every entry in `this.peers`) and one more for each peer that
 *   connects while the socket stays open.
 * - `/servers/<name>/events?topic=...`: registers one EventSocket for the
 *   named server. Without a `topic` nothing is registered.
 *
 * @param {Object} ws - upgraded websocket exposing `upgradeReq`.
 */
ZettaHttpServer.prototype.setupEventSocket = function(ws) {
  var self = this;
  var host = ws.upgradeReq.headers['host'];

  if (/^\/events/.exec(ws.upgradeReq.url)) {
    self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
    var parsed = url.parse(ws.upgradeReq.url, true);
    var query = parsed.query;

    if (!query.topic) {
      var streamClient = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) });
      self.eventBroker.client(streamClient);
      return;
    }

    // Builds a per-server copy of the request query. `topic` is always set on
    // this path; it is replaced by its normalised form when it parses.
    var buildQuery = function(serverName) {
      var q = {};
      Object.keys(query).forEach(function(k) {
        q[k] = query[k];
      });
      q.name = serverName;

      var qt = querytopic.parse(query.topic);
      if (qt) {
        q.topic = querytopic.format(qt);
      }
      return q;
    };

    [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
      var client = new EventSocket(ws, buildQuery(serverId), { streamEnabled: false });
      self.eventBroker.client(client);
    });

    // Peers connecting later get their own subscription on the same socket.
    var subscribeOnPeerConnect = function(e, obj) {
      var client = new EventSocket(ws, buildQuery(obj.peer.name));
      self.eventBroker.client(client);
    };

    ws.on('close', function() {
      self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
    });

    self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
  } else {
    // Match on the pathname only: the capture group is greedy, so a query
    // string containing "/events" would otherwise end up in the server name.
    var requestUrl = url.parse(ws.upgradeReq.url);
    var serverMatch = /^\/servers\/(.+)\/events/.exec(requestUrl.pathname || '');
    if (!serverMatch) {
      ws.close(1001); // go away status code
      return;
    }

    var serverName;
    try {
      serverName = decodeURI(serverMatch[1]);
    } catch (err) {
      // decodeURI throws URIError on malformed percent-encoding.
      ws.close(1008, 'invalid server name');
      return;
    }

    self.wireUpWebSocketForEvent(ws, host, '/servers/' + serverMatch[1]);

    var serverQuery = querystring.parse(requestUrl.query);
    serverQuery.name = serverName;

    if (serverQuery.topic) {
      var qt = querytopic.parse(serverQuery.topic);
      if (qt) {
        serverQuery.topic = querytopic.format(qt);
      }
      var client = new EventSocket(ws, serverQuery);
      self.eventBroker.client(client);
    }
  }
};
414
/**
 * Argo middleware for `POST /registration`.
 *
 * The JSON request body is expected to carry `target` (a key of
 * `this.peers`) and `device`. On success the request body is replaced by the
 * serialised `device` and the peer's agent is stored on `env.zettaAgent`.
 * Any other request is passed through untouched.
 *
 * Responses set here: 400 when the body cannot be read or parsed or has no
 * `device`; 404 when the target peer is unknown or has no agent.
 *
 * Must be invoked with `this` bound to the server instance.
 *
 * @param {Function} handle - argo handler registration function.
 */
ZettaHttpServer.prototype.httpRegistration = function(handle) {
  // The callbacks below need the server instance; `self` was previously
  // referenced without ever being defined in this function.
  var self = this;

  handle('request', function(env, next) {
    if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
      return next(env);
    }

    env.request.getBody(function(err, body) {
      if (err) {
        env.response.statusCode = 400;
        return next(env);
      }

      var payload;
      try {
        payload = JSON.parse(body.toString());
      } catch (parseErr) {
        // Missing or malformed JSON body.
        env.response.statusCode = 400;
        return next(env);
      }

      var peer = payload ? self.peers[payload.target] : undefined;
      if (!peer || !peer.agent) {
        env.response.statusCode = 404;
        return next(env);
      }

      // JSON.stringify(undefined) is not a string and cannot be buffered.
      if (typeof payload.device === 'undefined') {
        env.response.statusCode = 400;
        return next(env);
      }

      env.request.body = new Buffer(JSON.stringify(payload.device));
      env.zettaAgent = peer.agent;
      next(env);
    });
  });
};
436
/**
 * Sends the incoming request to several peers, at most five at a time, and
 * collects one result object per peer.
 *
 * Each result is `{ res, err, body }` for a completed request or `{ err }`
 * when the peer is missing, not connected, or the request failed. Individual
 * failures are reported in the results, never through the callback's error.
 *
 * The response object is stored in `this.clients` under a freshly allocated
 * message id, which is passed on to the callback.
 *
 * @param {Array} peers - objects whose `id` is the peer name.
 * @param {Object} env - argo environment; `request.templateUrl` must contain
 *   the '{{peerName}}' placeholder.
 * @param {Function} cb - called as cb(err, results, messageId).
 */
ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
  var self = this;
  var incoming = env.request;
  var outgoing = env.response;
  var protocol = getCurrentProtocol(incoming);

  self.idCounter += 1;
  var messageId = self.idCounter;
  self.clients[messageId] = outgoing;

  // Creates the async task that proxies the request to one peer.
  function createTask(peerInfo) {
    var peerName = peerInfo.id;

    return function(done) {
      var targetPath = incoming.templateUrl.replace('{{peerName}}', encodeURIComponent(peerName));

      // Work on a copy so the incoming request headers stay untouched.
      var forwardedHeaders = {};
      var headerNames = Object.keys(incoming.headers);
      for (var i = 0; i < headerNames.length; i++) {
        forwardedHeaders[headerNames[i]] = incoming.headers[headerNames[i]];
      }

      if (!incoming.isSpdy) {
        forwardedHeaders['x-forwarded-proto'] = protocol;
      }

      var peer = self.peers[peerName];
      var isConnected = !!peer && peer.state === PeerSocket.CONNECTED;
      if (!isConnected) {
        done(null, { err: new Error('Peer does not exist.') });
        return;
      }

      var requestOptions = {
        method: incoming.method,
        headers: forwardedHeaders,
        path: targetPath,
        agent: env.zettaAgent || peer.agent
      };

      var proxied = http.request(requestOptions, function(peerResponse) {
        peerResponse.getBody(function(bodyErr, body) {
          done(null, { res: peerResponse, err: bodyErr, body: body });
        });
      });

      proxied.on('error', function(requestErr) {
        return done(null, { err: requestErr });
      });

      // An already buffered body is sent as-is; otherwise the incoming
      // request is streamed through.
      if (incoming.body) {
        proxied.end(incoming.body);
      } else {
        incoming.pipe(proxied);
      }
    };
  }

  async.parallelLimit(peers.map(createTask), 5, function(err, results) {
    cb(err, results, messageId);
  });
};
491
/**
 * Proxies the incoming request to the peer named by the second path segment
 * of the request URL.
 *
 * Responds 404 directly when the peer is unknown or not connected. Otherwise
 * the peer's status code and headers are copied onto the response and the
 * body is handed to argo through `env.response.body`: the peer response
 * stream itself when `pipe` is truthy (the default), or a fully buffered
 * Buffer (null for an empty body) when `env.proxyOpts.pipe` is falsy.
 * A request error results in status 502.
 *
 * @param {Object} env - argo environment. An optional `proxyOpts` object
 *   overrides the options passed to `http.request`; an optional `zettaAgent`
 *   overrides the peer's agent.
 * @param {Function} next - argo continuation, called with `env`.
 */
ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
  var self = this;
  var req = env.request;
  var res = env.response;

  var parsed = url.parse(req.url);
  var name = decodeURIComponent(parsed.pathname.split('/')[2]);

  if (!req.isSpdy) {
    req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
  }

  var peer = self.peers[name];
  if (!peer || peer.state !== PeerSocket.CONNECTED) {
    res.statusCode = 404;
    res.end();
    return;
  }

  var agent = env.zettaAgent || peer.agent;

  var opts = {
    method: req.method,
    headers: req.headers,
    path: req.url,
    agent: agent,
    pipe: true
  };
  if (typeof env.proxyOpts === 'object') {
    Object.keys(env.proxyOpts).forEach(function(k) {
      opts[k] = env.proxyOpts[k];
    });
  }

  var request = http.request(opts, function(response) {

    Object.keys(response.headers).forEach(function(header) {
      res.setHeader(header, response.headers[header]);
    });

    res.statusCode = response.statusCode;

    if (!opts.pipe) {
      var chunks = [];
      var total = 0;

      // Only collect chunks here; the body is assembled once on 'end'
      // instead of being re-allocated and re-copied on every 'readable'.
      response.on('readable', function() {
        var chunk;

        while ((chunk = response.read()) != null) {
          chunks.push(chunk);
          total += chunk.length;
        }
      });

      response.on('end', function() {
        // Keep null (not an empty Buffer) for an empty body.
        env.response.body = chunks.length ? Buffer.concat(chunks, total) : null;
        next(env);
      });
    } else {
      env.response.body = response;
      next(env);
    }
  }).on('error', function() {
    env.response.statusCode = 502;
    return next(env);
  });

  if (req.body) {
    request.end(req.body);
  } else {
    req.pipe(request);
  }
};