1 | var http = require('http');
|
2 | var path = require('path');
|
3 | var url = require('url');
|
4 | var querystring = require('querystring');
|
5 | var async = require('async');
|
6 | var spdy = require('spdy');
|
7 | var argo = require('argo');
|
8 | var titan = require('titan');
|
9 | var WebSocketServer = require('ws').Server;
|
10 | var SpdyAgent = require('./spdy_agent');
|
11 | var EventBroker = require('./event_broker');
|
12 | var PeerSocket = require('./peer_socket');
|
13 | var EventSocket = require('./event_socket');
|
14 | var Siren = require('argo-formatter-siren');
|
15 | var DevicesResource = require('./api_resources/devices');
|
16 | var PeersManagementResource = require('./api_resources/peer_management');
|
17 | var RootResource = require('./api_resources/root');
|
18 | var ServersResource = require('./api_resources/servers');
|
19 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
20 | var rels = require('zetta-rels');
|
21 |
|
22 | var querytopic = require('./query_topic');
|
23 |
|
24 | var ZettaHttpServer = module.exports = function(zettaInstance, options) {
|
25 | var self = this;
|
26 | options = (typeof options === 'object') ? options : {};
|
27 | if(typeof options.useXForwardedHostHeader !== 'undefined') {
|
28 | this.useXForwardedHostHeader = options.useXForwardedHostHeader ? true : false;
|
29 | } else {
|
30 | this.useXForwardedHostHeader = true;
|
31 | }
|
32 | if(typeof options.useXForwardedPathHeader !== 'undefined') {
|
33 | this.useXForwardedPathHeader = options.useXForwardedPathHeader ? true : false;
|
34 | } else {
|
35 | this.useXForwardedPathHeader = true;
|
36 | }
|
37 | this.idCounter = 0;
|
38 | this.zetta = zettaInstance;
|
39 | this.peerRegistry = zettaInstance.peerRegistry;
|
40 | this.eventBroker = new EventBroker(zettaInstance);
|
41 | this.clients = {};
|
42 | this.peers = {};
|
43 | this.peerOptions = {};
|
44 |
|
45 | this._deviceQueries = [];
|
46 |
|
47 | this._collectors = {};
|
48 |
|
49 |
|
50 | this._wsHooks = {
|
51 | peerConnect: [],
|
52 | websocketConnect: []
|
53 | };
|
54 |
|
55 |
|
56 | var httpOptions = {
|
57 | windowSize: 1024 * 1024
|
58 | };
|
59 | var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
|
60 | var usingSSL = false;
|
61 | Object.keys(options).forEach(function(k) {
|
62 | httpOptions[k] = options[k];
|
63 | if (tlsCheckOptions.indexOf(k) > -1) {
|
64 | usingSSL = true;
|
65 | }
|
66 | });
|
67 |
|
68 |
|
69 | httpOptions.plain = (usingSSL) ? false : true;
|
70 | httpOptions.ssl = (usingSSL) ? true : false;
|
71 | this.server = spdy.createServer(httpOptions);
|
72 |
|
73 |
|
74 | this.spdyServer = spdy.createServer({
|
75 | windowSize: 1024 * 1024,
|
76 | plain: true,
|
77 | ssl: false
|
78 | });
|
79 |
|
80 | var ValidWSUrls = [
|
81 | /^\/events$/,
|
82 | /^\/events\?.+$/,
|
83 | /^\/servers\/(.+)\/events/,
|
84 | /^\/peers\/(.+)$/,
|
85 | /^\/peer-management$/,
|
86 | ];
|
87 |
|
88 | function match(request) {
|
89 | return ValidWSUrls.some(function(re) {
|
90 | return re.test(request.url);
|
91 | });
|
92 | }
|
93 |
|
94 | this.wss = new WebSocketServer({ noServer: true });
|
95 | this.server.on('upgrade', function(request, socket, headers) {
|
96 |
|
97 | var sendError = function(code) {
|
98 |
|
99 | var finish = function() {
|
100 | var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
|
101 | socket.end(responseLine);
|
102 | };
|
103 |
|
104 | if (self.server.listeners('upgrade').length > 1) {
|
105 | var timer = setTimeout(function() {
|
106 | if (socket.bytesWritten === 0) {
|
107 | finish();
|
108 | }
|
109 | }, 5000);
|
110 | socket.on('close', function() {
|
111 | clearTimeout(timer);
|
112 | });
|
113 | } else {
|
114 | finish();
|
115 | }
|
116 | };
|
117 |
|
118 | if (/^\/peers\/(.+)$/.exec(request.url)) {
|
119 | async.eachSeries(self._wsHooks.peerConnect, function(handler, next) {
|
120 | return handler(request, socket, headers, next);
|
121 | }, function(err) {
|
122 | if (err) {
|
123 | return sendError(500);
|
124 | }
|
125 |
|
126 |
|
127 | self.wss.handleUpgrade(request, socket, headers, function(ws) {
|
128 | self.setupPeerSocket(ws);
|
129 | });
|
130 | });
|
131 | } else if (match(request)) {
|
132 | async.eachSeries(self._wsHooks.websocketConnect, function(handler, next) {
|
133 | return handler(request, socket, headers, next);
|
134 | }, function(err) {
|
135 | if (err) {
|
136 | return sendError(500);
|
137 | }
|
138 |
|
139 | self.wss.handleUpgrade(request, socket, headers, function(ws) {
|
140 | if (ws.upgradeReq.url === '/peer-management') {
|
141 | var query = [
|
142 | { name: self.zetta.id, topic: '_peer/connect' },
|
143 | { name: self.zetta.id, topic: '_peer/disconnect' }];
|
144 |
|
145 | var client = new EventSocket(ws, query);
|
146 | self.eventBroker.client(client);
|
147 | } else {
|
148 | self.setupEventSocket(ws);
|
149 | }
|
150 | });
|
151 | });
|
152 | } else {
|
153 |
|
154 | sendError(404);
|
155 | }
|
156 |
|
157 | });
|
158 |
|
159 | var titanOpts = {
|
160 | useXForwardedHostHeader: this.useXForwardedHostHeader,
|
161 | useXForwardedPathHeader: this.useXForwardedPathHeader
|
162 | };
|
163 | this.cloud = titan(titanOpts)
|
164 | .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
|
165 | .add(RootResource, zettaInstance)
|
166 | .add(PeersManagementResource, zettaInstance)
|
167 | .add(DevicesResource, zettaInstance)
|
168 | .add(ServersResource, zettaInstance)
|
169 | .allow({
|
170 | methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
|
171 | origins: ['*'],
|
172 | headers: ['accept', 'content-type'],
|
173 | maxAge: '432000'
|
174 | })
|
175 | .use(function(handle) {
|
176 | handle('request', function(env, next) {
|
177 | if (env.request.method === 'OPTIONS') {
|
178 | env.argo._routed = true;
|
179 | }
|
180 | next(env);
|
181 | });
|
182 | })
|
183 | .use(function(handle) {
|
184 | handle('request', function(env, next) {
|
185 |
|
186 | if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
|
187 | next(env);
|
188 | }
|
189 | });
|
190 | });
|
191 | };
|
192 |
|
193 | ZettaHttpServer.prototype.init = function(cb) {
|
194 | var self = this;
|
195 |
|
196 |
|
197 | this.cloud = this.cloud.use(this.httpRegistration.bind(this));
|
198 |
|
199 |
|
200 |
|
201 | this.cloud = this.cloud.build();
|
202 |
|
203 | this.server.on('request', this.cloud.run);
|
204 | this.spdyServer.on('request', this.cloud.run);
|
205 |
|
206 | if (cb) {
|
207 | cb();
|
208 | }
|
209 | };
|
210 |
|
211 | ZettaHttpServer.prototype.listen = function() {
|
212 | this.server.listen.apply(this.server,arguments);
|
213 | return this;
|
214 | };
|
215 |
|
216 |
|
217 | ZettaHttpServer.prototype.onPeerConnect = function(handler) {
|
218 | if (typeof handler !== 'function') {
|
219 | throw new Error('Must supply function as a hook');
|
220 | }
|
221 | this._wsHooks.peerConnect.push(handler);
|
222 | };
|
223 |
|
224 | ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
|
225 | if (typeof handler !== 'function') {
|
226 | throw new Error('Must supply function as a hook');
|
227 | }
|
228 | this._wsHooks.websocketConnect.push(handler);
|
229 | };
|
230 |
|
231 | ZettaHttpServer.prototype.collector = function(name, collector) {
|
232 | if(typeof name === 'function'){
|
233 | collector = name;
|
234 | name = '_logs';
|
235 | }
|
236 |
|
237 | if(!this._collectors[name]) {
|
238 | this._collectors[name] = [];
|
239 | }
|
240 |
|
241 | this._collectors[name].push(collector);
|
242 | return this;
|
243 | };
|
244 |
|
245 | function getCurrentProtocol(req) {
|
246 | var xfp = req.headers['x-forwarded-proto'];
|
247 | var protocol;
|
248 |
|
249 | if (xfp && xfp.length) {
|
250 | protocol = xfp.replace(/\s*/, '').split(',')[0];
|
251 | } else {
|
252 | protocol = req.connection.encrypted ? 'https' : 'http';
|
253 | }
|
254 |
|
255 | return protocol;
|
256 | }
|
257 |
|
258 | ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
|
259 | ws._env = { helpers: {}};
|
260 | ws._loader = { path: p };
|
261 |
|
262 | ws._env.uri = function() {
|
263 | var protocol = getCurrentProtocol(ws.upgradeReq);
|
264 |
|
265 | if (!host) {
|
266 | var address = ws.upgradeReq.connection.address();
|
267 | host = address.address;
|
268 | if (address.port) {
|
269 | if (!(protocol === 'https' && address.port === 443) &&
|
270 | !(protocol === 'http' && address.port === 80)) {
|
271 | host += ':' + address.port
|
272 | }
|
273 | }
|
274 | }
|
275 |
|
276 | return protocol + '://' + path.join(host, ws.upgradeReq.url);
|
277 | };
|
278 |
|
279 | ws._env.helpers.url = {};
|
280 | ws._env.helpers.url.path = function(pathname) {
|
281 | var parsed = url.parse(ws._env.uri());
|
282 | parsed.search = null;
|
283 | parsed.pathname = pathname;
|
284 | return url.format(parsed);
|
285 | };
|
286 | };
|
287 |
|
288 | ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
|
289 | var self = this;
|
290 | var name = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname)[1];
|
291 | name = decodeURI(name);
|
292 | self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');
|
293 |
|
294 |
|
295 | var host = ws.upgradeReq.headers['host']
|
296 | self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);
|
297 |
|
298 | if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
|
299 |
|
300 | ws.close(4000, 'peer already connected');
|
301 | } else if (self.peers[name]) {
|
302 |
|
303 | self.peers[name].init(ws);
|
304 | } else {
|
305 | var peer = new PeerSocket(ws, name, self.peerRegistry, self.peerOptions);
|
306 | self.peers[name] = peer;
|
307 |
|
308 |
|
309 | peer.on('zetta-events', function(topic, data) {
|
310 | self.zetta.pubsub.publish(name + '/' + topic, data, true);
|
311 | });
|
312 |
|
313 | peer.on('connected', function() {
|
314 | self.eventBroker.peer(peer);
|
315 | self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
|
316 | self.zetta.pubsub.publish('_peer/connect', { peer: peer });
|
317 | });
|
318 |
|
319 | peer.on('error', function(err) {
|
320 | self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '": ' + err.message + '.');
|
321 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, err: err });
|
322 | });
|
323 |
|
324 | peer.on('end', function() {
|
325 | self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
|
326 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
|
327 | });
|
328 | }
|
329 | };
|
330 |
|
331 | ZettaHttpServer.prototype.setupEventSocket = function(ws) {
|
332 | var self = this;
|
333 | var host = ws.upgradeReq.headers['host'];
|
334 |
|
335 | if (/^\/events/.exec(ws.upgradeReq.url)) {
|
336 | self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
|
337 | var parsed = url.parse(ws.upgradeReq.url, true);
|
338 | var query = parsed.query;
|
339 |
|
340 | if(!query.topic) {
|
341 | var client = new EventSocket(ws, null, { streamEnabled: true, filterMultiple: !!(query.filterMultiple) });
|
342 | self.eventBroker.client(client);
|
343 | return;
|
344 | }
|
345 |
|
346 | function copy(q) {
|
347 | var c = {};
|
348 | Object.keys(q).forEach(function(k) {
|
349 | c[k] = q[k];
|
350 | });
|
351 |
|
352 | return c;
|
353 | }
|
354 |
|
355 | [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
|
356 | var q = copy(query);
|
357 | q.name = serverId;
|
358 |
|
359 | if (q.topic) {
|
360 | var qt = querytopic.parse(query.topic);
|
361 | if (qt) {
|
362 | q.topic = querytopic.format(qt);
|
363 | }
|
364 | var client = new EventSocket(ws, q, { streamEnabled: false });
|
365 | self.eventBroker.client(client);
|
366 | }
|
367 | });
|
368 |
|
369 | function subscribeOnPeerConnect(e, obj) {
|
370 | var q = copy(query);
|
371 | q.name = obj.peer.name;
|
372 |
|
373 | if (q.topic) {
|
374 | var qt = querytopic.parse(query.topic);
|
375 | if (qt) {
|
376 | q.topic = querytopic.format(qt);
|
377 | }
|
378 |
|
379 | var client = new EventSocket(ws, q);
|
380 | self.eventBroker.client(client);
|
381 | }
|
382 | }
|
383 |
|
384 | ws.on('close', function() {
|
385 | self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
|
386 | });
|
387 |
|
388 | self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
|
389 | } else {
|
390 | var match = /^\/servers\/(.+)\/events/.exec(ws.upgradeReq.url);
|
391 | if(!match) {
|
392 | ws.close(1001);
|
393 | return;
|
394 | }
|
395 |
|
396 | var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
|
397 | query.serverId = match[1];
|
398 |
|
399 | self.wireUpWebSocketForEvent(ws, host, '/servers/' + query.serverId);
|
400 |
|
401 | var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
|
402 | query.name = decodeURI(match[1]);
|
403 |
|
404 | if (query.topic) {
|
405 | var qt = querytopic.parse(query.topic);
|
406 | if (qt) {
|
407 | query.topic = querytopic.format(qt);
|
408 | }
|
409 | var client = new EventSocket(ws, query);
|
410 | self.eventBroker.client(client);
|
411 | }
|
412 | }
|
413 | };
|
414 |
|
415 | ZettaHttpServer.prototype.httpRegistration = function(handle) {
|
416 | handle('request', function(env, next) {
|
417 | if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
|
418 | return next(env);
|
419 | }
|
420 |
|
421 | env.request.getBody(function(err, body) {
|
422 | body = JSON.parse(body.toString());
|
423 | var peer = self.peers[body.target];
|
424 |
|
425 | if (!peer.agent) {
|
426 | env.response.statusCode = 404;
|
427 | return next(env);
|
428 | }
|
429 |
|
430 | env.request.body = new Buffer(JSON.stringify(body.device));
|
431 | env.zettaAgent = peer.agent;
|
432 | next(env);
|
433 | });
|
434 | });
|
435 | };
|
436 |
|
437 | ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
|
438 | var self = this;
|
439 |
|
440 | var req = env.request;
|
441 | var res = env.response;
|
442 | var protocol = getCurrentProtocol(req);
|
443 |
|
444 | var messageId = ++self.idCounter;
|
445 | self.clients[messageId] = res;
|
446 |
|
447 | var tasks = peers.map(function(p) {
|
448 | var name = p.id;
|
449 | return function(callback) {
|
450 |
|
451 | var reqUrl = req.templateUrl.replace('{{peerName}}', encodeURIComponent(name));
|
452 | var headers = {};
|
453 |
|
454 | Object.keys(req.headers).forEach(function(key) {
|
455 | headers[key] = req.headers[key];
|
456 | });
|
457 |
|
458 | if (!req.isSpdy) {
|
459 | headers['x-forwarded-proto'] = protocol;
|
460 | }
|
461 |
|
462 | var peer = self.peers[name];
|
463 | if (!peer || peer.state !== PeerSocket.CONNECTED){
|
464 | callback(null, { err: new Error('Peer does not exist.') });
|
465 | return;
|
466 | }
|
467 |
|
468 | var agent = env.zettaAgent || peer.agent;
|
469 |
|
470 | var opts = { method: req.method, headers: headers, path: reqUrl, agent: agent };
|
471 | var request = http.request(opts, function(response) {
|
472 | response.getBody(function(err, body) {
|
473 | callback(null, { res: response, err: err, body: body });
|
474 | });
|
475 | }).on('error', function(err) {
|
476 | return callback(null, { err: err });
|
477 | });
|
478 |
|
479 | if (req.body) {
|
480 | request.end(req.body);
|
481 | } else {
|
482 | req.pipe(request);
|
483 | }
|
484 | };
|
485 | });
|
486 |
|
487 | async.parallelLimit(tasks, 5, function(err, results) {
|
488 | cb(err, results, messageId);
|
489 | });
|
490 | };
|
491 |
|
492 | ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
|
493 | var self = this;
|
494 | var req = env.request;
|
495 | var res = env.response;
|
496 |
|
497 | var parsed = url.parse(req.url);
|
498 | var name = decodeURIComponent(parsed.pathname.split('/')[2]);
|
499 |
|
500 | if (!req.isSpdy) {
|
501 | req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
|
502 | }
|
503 |
|
504 | var peer = self.peers[name];
|
505 | if (!peer || peer.state !== PeerSocket.CONNECTED){
|
506 | res.statusCode = 404;
|
507 | res.end();
|
508 | return;
|
509 | }
|
510 |
|
511 | var agent = env.zettaAgent || peer.agent;
|
512 |
|
513 | var opts = {
|
514 | method: req.method,
|
515 | headers: req.headers,
|
516 | path: req.url,
|
517 | agent: agent,
|
518 | pipe: true
|
519 | };
|
520 | if (typeof env.proxyOpts === 'object') {
|
521 | Object.keys(env.proxyOpts).forEach(function(k) {
|
522 | opts[k] = env.proxyOpts[k];
|
523 | });
|
524 | }
|
525 |
|
526 | var request = http.request(opts, function(response) {
|
527 |
|
528 | Object.keys(response.headers).forEach(function(header) {
|
529 | res.setHeader(header, response.headers[header]);
|
530 | });
|
531 |
|
532 | res.statusCode = response.statusCode;
|
533 |
|
534 | if (!opts.pipe) {
|
535 | var body = null;
|
536 | var buf = [];
|
537 | var len = 0;
|
538 |
|
539 | response.on('readable', function() {
|
540 | var chunk;
|
541 |
|
542 | while ((chunk = response.read()) != null) {
|
543 | buf.push(chunk);
|
544 | len += chunk.length;
|
545 | }
|
546 |
|
547 | if (!buf.length) {
|
548 | return;
|
549 | }
|
550 |
|
551 | body = new Buffer(len);
|
552 | var i = 0;
|
553 | buf.forEach(function(chunk) {
|
554 | chunk.copy(body, i, 0, chunk.length);
|
555 | i += chunk.length;
|
556 | });
|
557 | });
|
558 |
|
559 | response.on('end', function() {
|
560 | env.response.body = body;
|
561 | next(env);
|
562 | });
|
563 | } else {
|
564 | env.response.body = response;
|
565 | next(env);
|
566 | }
|
567 | }).on('error', function(err) {
|
568 | env.response.statusCode = 502;
|
569 | return next(env);
|
570 | });
|
571 |
|
572 | if (req.body) {
|
573 | request.end(req.body);
|
574 | } else {
|
575 | req.pipe(request);
|
576 | }
|
577 | };
|