1 | var http = require('http');
|
2 | var path = require('path');
|
3 | var url = require('url');
|
4 | var querystring = require('querystring');
|
5 | var async = require('async');
|
6 | var spdy = require('spdy');
|
7 | var argo = require('argo');
|
8 | var titan = require('titan');
|
9 | var WebSocketServer = require('ws').Server;
|
10 | var SpdyAgent = require('./spdy_agent');
|
11 | var EventBroker = require('./event_broker');
|
12 | var PeerSocket = require('./peer_socket');
|
13 | var EventSocket = require('./event_socket');
|
14 | var Siren = require('argo-formatter-siren');
|
15 | var DevicesResource = require('./api_resources/devices');
|
16 | var PeersManagementResource = require('./api_resources/peer_management');
|
17 | var RootResource = require('./api_resources/root');
|
18 | var ServersResource = require('./api_resources/servers');
|
19 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
20 | var rels = require('zetta-rels');
|
21 |
|
22 | var querytopic = require('./query_topic');
|
23 |
|
24 | var ZettaHttpServer = module.exports = function(zettaInstance, options) {
|
25 | var self = this;
|
26 | options = (typeof options === 'object') ? options : {};
|
27 | if(typeof options.useXForwardedHostHeader !== 'undefined') {
|
28 | this.useXForwardedHostHeader = options.useXForwardedHostHeader ? true : false;
|
29 | } else {
|
30 | this.useXForwardedHostHeader = true;
|
31 | }
|
32 | if(typeof options.useXForwardedPathHeader !== 'undefined') {
|
33 | this.useXForwardedPathHeader = options.useXForwardedPathHeader ? true : false;
|
34 | } else {
|
35 | this.useXForwardedPathHeader = true;
|
36 | }
|
37 | this.idCounter = 0;
|
38 | this.zetta = zettaInstance;
|
39 | this.peerRegistry = zettaInstance.peerRegistry;
|
40 | this.eventBroker = new EventBroker(zettaInstance);
|
41 | this.clients = {};
|
42 | this.peers = {};
|
43 | this.peerOptions = {};
|
44 |
|
45 | this._deviceQueries = [];
|
46 |
|
47 | this._collectors = {};
|
48 |
|
49 |
|
50 | this._wsHooks = {
|
51 | peerConnect: [],
|
52 | websocketConnect: []
|
53 | };
|
54 |
|
55 |
|
56 | var httpOptions = {
|
57 | connection: {
|
58 | windowSize: 1024 * 1024,
|
59 | autoSpdy31: false
|
60 | },
|
61 | spdy: {
|
62 | plain: true,
|
63 | ssl: false
|
64 | }
|
65 | };
|
66 |
|
67 | var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
|
68 | var usingSSL = false;
|
69 | Object.keys(options).forEach(function(k) {
|
70 | httpOptions[k] = options[k];
|
71 | if (tlsCheckOptions.indexOf(k) > -1) {
|
72 | usingSSL = true;
|
73 | }
|
74 | });
|
75 |
|
76 |
|
77 | httpOptions.spdy.plain = (usingSSL) ? false : true;
|
78 | httpOptions.spdy.ssl = (usingSSL) ? true : false;
|
79 |
|
80 | var spdyServerOpts = {
|
81 | connection: {
|
82 | windowSize: 1024 * 1024,
|
83 | autoSpdy31: false
|
84 | },
|
85 | spdy: {
|
86 | plain: true,
|
87 | ssl: false
|
88 | }
|
89 | };
|
90 |
|
91 |
|
92 | this.server = spdy.createServer(httpOptions);
|
93 |
|
94 |
|
95 |
|
96 | this.spdyServer = spdy.createServer(spdyServerOpts);
|
97 | this.spdyServer.on('ping', function(socket) {
|
98 | socket.emit('spdyPing');
|
99 | })
|
100 |
|
101 | var ValidWSUrls = [
|
102 | /^\/events$/,
|
103 | /^\/events\?.+$/,
|
104 | /^\/servers\/(.+)\/events/,
|
105 | /^\/peers\/(.+)$/,
|
106 | /^\/peer-management$/,
|
107 | ];
|
108 |
|
109 | function match(request) {
|
110 | return ValidWSUrls.some(function(re) {
|
111 | return re.test(request.url);
|
112 | });
|
113 | }
|
114 |
|
115 | this.wss = new WebSocketServer({ noServer: true });
|
116 | this.server.on('upgrade', function(request, socket, headers) {
|
117 | var sendError = function(code) {
|
118 |
|
119 | var finish = function() {
|
120 | var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
|
121 | socket.end(responseLine);
|
122 | };
|
123 |
|
124 | if (self.server.listeners('upgrade').length > 1) {
|
125 | var timer = setTimeout(function() {
|
126 | if (socket.bytesWritten === 0) {
|
127 | finish();
|
128 | }
|
129 | }, 5000);
|
130 | socket.on('close', function() {
|
131 | clearTimeout(timer);
|
132 | });
|
133 | } else {
|
134 | finish();
|
135 | }
|
136 | };
|
137 |
|
138 | if (/^\/peers\/(.+)$/.exec(request.url)) {
|
139 | async.eachSeries(self._wsHooks.peerConnect, function(handler, next) {
|
140 | return handler(request, socket, headers, next);
|
141 | }, function(err) {
|
142 | if (err) {
|
143 | return sendError(500);
|
144 | }
|
145 |
|
146 |
|
147 | self.wss.handleUpgrade(request, socket, headers, function(ws) {
|
148 | self.setupPeerSocket(ws, request);
|
149 | });
|
150 | });
|
151 | } else if (match(request)) {
|
152 | async.eachSeries(self._wsHooks.websocketConnect, function(handler, next) {
|
153 | return handler(request, socket, headers, next);
|
154 | }, function(err) {
|
155 | if (err) {
|
156 | return sendError(500);
|
157 | }
|
158 |
|
159 | self.wss.handleUpgrade(request, socket, headers, function(ws) {
|
160 | if (request.url === '/peer-management') {
|
161 | var query = [
|
162 | { name: self.zetta.id, topic: '_peer/connect' },
|
163 | { name: self.zetta.id, topic: '_peer/disconnect' }];
|
164 |
|
165 | var client = new EventSocket(ws, query);
|
166 | self.eventBroker.client(client);
|
167 | } else {
|
168 | self.setupEventSocket(ws, request);
|
169 | }
|
170 | });
|
171 | });
|
172 | } else {
|
173 |
|
174 | sendError(404);
|
175 | }
|
176 |
|
177 | });
|
178 |
|
179 | var titanOpts = {
|
180 | useXForwardedHostHeader: this.useXForwardedHostHeader,
|
181 | useXForwardedPathHeader: this.useXForwardedPathHeader
|
182 | };
|
183 | this.cloud = titan(titanOpts)
|
184 | .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
|
185 | .add(RootResource, zettaInstance)
|
186 | .add(PeersManagementResource, zettaInstance)
|
187 | .add(DevicesResource, zettaInstance)
|
188 | .add(ServersResource, zettaInstance)
|
189 | .allow({
|
190 | methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
|
191 | origins: ['*'],
|
192 | headers: ['accept', 'content-type'],
|
193 | maxAge: '432000'
|
194 | })
|
195 | .use(function(handle) {
|
196 | handle('request', function(env, next) {
|
197 | if (env.request.method === 'OPTIONS') {
|
198 | env.argo._routed = true;
|
199 | }
|
200 | next(env);
|
201 | });
|
202 | })
|
203 | .use(function(handle) {
|
204 | handle('request', function(env, next) {
|
205 |
|
206 | if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
|
207 | next(env);
|
208 | }
|
209 | });
|
210 | });
|
211 | };
|
212 |
|
213 | ZettaHttpServer.prototype.init = function(cb) {
|
214 | var self = this;
|
215 |
|
216 |
|
217 | this.cloud = this.cloud.use(this.httpRegistration.bind(this));
|
218 |
|
219 |
|
220 |
|
221 | this.cloud = this.cloud.build();
|
222 |
|
223 | this.server.on('request', this.cloud.run);
|
224 | this.spdyServer.on('request', this.cloud.run);
|
225 |
|
226 | if (cb) {
|
227 | cb();
|
228 | }
|
229 | };
|
230 |
|
231 | ZettaHttpServer.prototype.listen = function() {
|
232 | this.server.listen.apply(this.server,arguments);
|
233 | return this;
|
234 | };
|
235 |
|
236 |
|
237 | ZettaHttpServer.prototype.onPeerConnect = function(handler) {
|
238 | if (typeof handler !== 'function') {
|
239 | throw new Error('Must supply function as a hook');
|
240 | }
|
241 | this._wsHooks.peerConnect.push(handler);
|
242 | };
|
243 |
|
244 | ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
|
245 | if (typeof handler !== 'function') {
|
246 | throw new Error('Must supply function as a hook');
|
247 | }
|
248 | this._wsHooks.websocketConnect.push(handler);
|
249 | };
|
250 |
|
251 | ZettaHttpServer.prototype.collector = function(name, collector) {
|
252 | if(typeof name === 'function'){
|
253 | collector = name;
|
254 | name = '_logs';
|
255 | }
|
256 |
|
257 | if(!this._collectors[name]) {
|
258 | this._collectors[name] = [];
|
259 | }
|
260 |
|
261 | this._collectors[name].push(collector);
|
262 | return this;
|
263 | };
|
264 |
|
265 | function getCurrentProtocol(req) {
|
266 | var xfp = req.headers['x-forwarded-proto'];
|
267 | var protocol;
|
268 |
|
269 | if (xfp && xfp.length) {
|
270 | protocol = xfp.replace(/\s*/, '').split(',')[0];
|
271 | } else {
|
272 | protocol = req.connection.encrypted ? 'https' : 'http';
|
273 | }
|
274 |
|
275 | return protocol;
|
276 | }
|
277 |
|
278 | ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, request, host, p) {
|
279 | ws._env = { helpers: {}};
|
280 | ws._loader = { path: p };
|
281 |
|
282 | ws._env.uri = function() {
|
283 | var protocol = getCurrentProtocol(request);
|
284 |
|
285 | if (!host) {
|
286 | var address = request.connection.address();
|
287 | host = address.address;
|
288 | if (address.port) {
|
289 | if (!(protocol === 'https' && address.port === 443) &&
|
290 | !(protocol === 'http' && address.port === 80)) {
|
291 | host += ':' + address.port
|
292 | }
|
293 | }
|
294 | }
|
295 | return (protocol + '://' + path.join(host, request.url)).replace(/\\/g, '/');
|
296 | };
|
297 |
|
298 | ws._env.helpers.url = {};
|
299 | ws._env.helpers.url.path = function(pathname) {
|
300 | var parsed = url.parse(ws._env.uri());
|
301 | parsed.search = null;
|
302 | parsed.pathname = pathname;
|
303 | return url.format(parsed);
|
304 | };
|
305 | };
|
306 |
|
307 | ZettaHttpServer.prototype.setupPeerSocket = function(ws, request) {
|
308 | var self = this;
|
309 | var name = /^\/peers\/(.+)$/.exec(url.parse(request.url, true).pathname)[1];
|
310 | name = decodeURI(name);
|
311 | self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');
|
312 |
|
313 |
|
314 | var host = request.headers['host']
|
315 | self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + name);
|
316 |
|
317 | if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
|
318 |
|
319 | ws.close(4000, 'peer already connected');
|
320 | } else if (self.peers[name]) {
|
321 |
|
322 | self.peers[name].init(ws, request);
|
323 | } else {
|
324 | var peer = new PeerSocket(ws, request, name, self.peerRegistry, self.peerOptions);
|
325 | self.peers[name] = peer;
|
326 |
|
327 |
|
328 | peer.on('zetta-events', function(topic, data) {
|
329 | self.zetta.pubsub.publish(name + '/' + topic, data, true);
|
330 | });
|
331 |
|
332 | peer.on('connected', function() {
|
333 | self.eventBroker.peer(peer);
|
334 | self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
|
335 | self.zetta.pubsub.publish('_peer/connect', { peer: peer });
|
336 | });
|
337 |
|
338 | peer.on('error', function(err) {
|
339 | self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '": ' + err.message + '.');
|
340 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, err: err });
|
341 | });
|
342 |
|
343 | peer.on('end', function() {
|
344 | self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
|
345 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
|
346 | });
|
347 | }
|
348 | };
|
349 |
|
350 | ZettaHttpServer.prototype.setupEventSocket = function(ws, request) {
|
351 | var self = this;
|
352 | var host = request.headers['host'];
|
353 |
|
354 | if (/^\/events/.exec(request.url)) {
|
355 | self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + self.zetta._name);
|
356 | var parsed = url.parse(request.url, true);
|
357 | var query = parsed.query;
|
358 |
|
359 | if(!query.topic) {
|
360 | var client = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) });
|
361 | self.eventBroker.client(client);
|
362 | return;
|
363 | }
|
364 |
|
365 | function copy(q) {
|
366 | var c = {};
|
367 | Object.keys(q).forEach(function(k) {
|
368 | c[k] = q[k];
|
369 | });
|
370 |
|
371 | return c;
|
372 | }
|
373 |
|
374 | [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
|
375 | var q = copy(query);
|
376 | q.name = serverId;
|
377 |
|
378 | if (q.topic) {
|
379 | var qt = querytopic.parse(query.topic);
|
380 | if (qt) {
|
381 | q.topic = querytopic.format(qt);
|
382 | }
|
383 | var client = new EventSocket(ws, q, { streamEnabled: false });
|
384 | self.eventBroker.client(client);
|
385 | }
|
386 | });
|
387 |
|
388 | function subscribeOnPeerConnect(e, obj) {
|
389 | var q = copy(query);
|
390 | q.name = obj.peer.name;
|
391 |
|
392 | if (q.topic) {
|
393 | var qt = querytopic.parse(query.topic);
|
394 | if (qt) {
|
395 | q.topic = querytopic.format(qt);
|
396 | }
|
397 |
|
398 | var client = new EventSocket(ws, q);
|
399 | self.eventBroker.client(client);
|
400 | }
|
401 | }
|
402 |
|
403 | ws.on('close', function() {
|
404 | self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
|
405 | });
|
406 |
|
407 | self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
|
408 | } else {
|
409 | var match = /^\/servers\/(.+)\/events/.exec(request.url);
|
410 | if(!match) {
|
411 | ws.close(1001);
|
412 | return;
|
413 | }
|
414 |
|
415 | var query = querystring.parse(url.parse(request.url).query);
|
416 | query.serverId = match[1];
|
417 |
|
418 | self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + query.serverId);
|
419 |
|
420 | var query = querystring.parse(url.parse(request.url).query);
|
421 | query.name = decodeURI(match[1]);
|
422 |
|
423 | if (query.topic) {
|
424 | var qt = querytopic.parse(query.topic);
|
425 | if (qt) {
|
426 | query.topic = querytopic.format(qt);
|
427 | }
|
428 | var client = new EventSocket(ws, query);
|
429 | self.eventBroker.client(client);
|
430 | }
|
431 | }
|
432 | };
|
433 |
|
434 | ZettaHttpServer.prototype.httpRegistration = function(handle) {
|
435 | handle('request', function(env, next) {
|
436 | if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
|
437 | return next(env);
|
438 | }
|
439 |
|
440 | env.request.getBody(function(err, body) {
|
441 | body = JSON.parse(body.toString());
|
442 | var peer = self.peers[body.target];
|
443 |
|
444 | if (!peer.agent) {
|
445 | env.response.statusCode = 404;
|
446 | return next(env);
|
447 | }
|
448 |
|
449 | env.request.body = new Buffer(JSON.stringify(body.device));
|
450 | env.zettaAgent = peer.agent;
|
451 | next(env);
|
452 | });
|
453 | });
|
454 | };
|
455 |
|
456 | ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
|
457 | var self = this;
|
458 |
|
459 | var req = env.request;
|
460 | var res = env.response;
|
461 | var protocol = getCurrentProtocol(req);
|
462 |
|
463 | var messageId = ++self.idCounter;
|
464 | self.clients[messageId] = res;
|
465 |
|
466 | var tasks = peers.map(function(p) {
|
467 | var name = p.id;
|
468 | return function(callback) {
|
469 |
|
470 | var reqUrl = req.templateUrl.replace('{{peerName}}', encodeURIComponent(name));
|
471 | var headers = {};
|
472 |
|
473 | Object.keys(req.headers).forEach(function(key) {
|
474 | headers[key] = req.headers[key];
|
475 | });
|
476 |
|
477 | if (!req.isSpdy) {
|
478 | headers['x-forwarded-proto'] = protocol;
|
479 | }
|
480 |
|
481 | var peer = self.peers[name];
|
482 | if (!peer || peer.state !== PeerSocket.CONNECTED){
|
483 | callback(null, { err: new Error('Peer does not exist.') });
|
484 | return;
|
485 | }
|
486 |
|
487 | var agent = env.zettaAgent || peer.agent;
|
488 |
|
489 | var opts = { method: req.method, headers: headers, path: reqUrl, agent: agent };
|
490 | var request = http.request(opts, function(response) {
|
491 | response.getBody(function(err, body) {
|
492 | callback(null, { res: response, err: err, body: body });
|
493 | });
|
494 | }).on('error', function(err) {
|
495 | return callback(null, { err: err });
|
496 | });
|
497 |
|
498 | if (req.body) {
|
499 | request.end(req.body);
|
500 | } else {
|
501 | req.pipe(request);
|
502 | }
|
503 | };
|
504 | });
|
505 |
|
506 | async.parallelLimit(tasks, 5, function(err, results) {
|
507 | cb(err, results, messageId);
|
508 | });
|
509 | };
|
510 |
|
511 | ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
|
512 | var self = this;
|
513 | var req = env.request;
|
514 | var res = env.response;
|
515 |
|
516 | var parsed = url.parse(req.url);
|
517 | var name = decodeURIComponent(parsed.pathname.split('/')[2]);
|
518 |
|
519 | if (!req.isSpdy) {
|
520 | req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
|
521 | }
|
522 |
|
523 | var peer = self.peers[name];
|
524 | if (!peer || peer.state !== PeerSocket.CONNECTED){
|
525 | res.statusCode = 404;
|
526 | res.end();
|
527 | return;
|
528 | }
|
529 |
|
530 | var agent = env.zettaAgent || peer.agent;
|
531 |
|
532 | var opts = {
|
533 | method: req.method,
|
534 | headers: req.headers,
|
535 | path: req.url,
|
536 | agent: agent,
|
537 | pipe: true
|
538 | };
|
539 | if (typeof env.proxyOpts === 'object') {
|
540 | Object.keys(env.proxyOpts).forEach(function(k) {
|
541 | opts[k] = env.proxyOpts[k];
|
542 | });
|
543 | }
|
544 |
|
545 | var request = http.request(opts, function(response) {
|
546 |
|
547 | Object.keys(response.headers).forEach(function(header) {
|
548 | res.setHeader(header, response.headers[header]);
|
549 | });
|
550 |
|
551 | res.statusCode = response.statusCode;
|
552 |
|
553 | if (!opts.pipe) {
|
554 | var body = null;
|
555 | var buf = [];
|
556 | var len = 0;
|
557 |
|
558 | response.on('readable', function() {
|
559 | var chunk;
|
560 |
|
561 | while ((chunk = response.read()) != null) {
|
562 | buf.push(chunk);
|
563 | len += chunk.length;
|
564 | }
|
565 |
|
566 | if (!buf.length) {
|
567 | return;
|
568 | }
|
569 |
|
570 | body = new Buffer(len);
|
571 | var i = 0;
|
572 | buf.forEach(function(chunk) {
|
573 | chunk.copy(body, i, 0, chunk.length);
|
574 | i += chunk.length;
|
575 | });
|
576 | });
|
577 |
|
578 | response.on('end', function() {
|
579 | env.response.body = body;
|
580 | next(env);
|
581 | });
|
582 | } else {
|
583 | env.response.body = response;
|
584 | next(env);
|
585 | }
|
586 | }).on('error', function(err) {
|
587 | env.response.statusCode = 502;
|
588 | return next(env);
|
589 | });
|
590 |
|
591 | if (req.body) {
|
592 | request.end(req.body);
|
593 | } else {
|
594 | req.pipe(request);
|
595 | }
|
596 | };
|