1 | var http = require('http');
|
2 | var path = require('path');
|
3 | var url = require('url');
|
4 | var querystring = require('querystring');
|
5 | var async = require('async');
|
6 | var spdy = require('spdy');
|
7 | var argo = require('argo');
|
8 | var titan = require('titan');
|
9 | var WebSocketServer = require('ws').Server;
|
10 | var SpdyAgent = require('./spdy_agent');
|
11 | var EventBroker = require('./event_broker');
|
12 | var PeerSocket = require('./peer_socket');
|
13 | var EventSocket = require('./event_socket');
|
14 | var Siren = require('argo-formatter-siren');
|
15 | var DevicesResource = require('./api_resources/devices');
|
16 | var PeersManagementResource = require('./api_resources/peer_management');
|
17 | var RootResource = require('./api_resources/root');
|
18 | var ServersResource = require('./api_resources/servers');
|
19 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
20 | var rels = require('zetta-rels');
|
21 |
|
22 | var querytopic = require('./query_topic');
|
23 |
|
/**
 * HTTP / WebSocket front end for a Zetta instance.
 *
 * The constructor creates:
 *  - `this.server`: the public spdy server. TLS is enabled when any of
 *    `cert`, `key`, `pfx` or `ca` is present in `options`, otherwise it runs
 *    in plain mode.
 *  - `this.spdyServer`: a second, always-plain spdy server (it only receives
 *    a 'request' listener in `init`).
 *  - `this.wss`: a `noServer` WebSocketServer that is driven manually from the
 *    public server's 'upgrade' event.
 *  - `this.cloud`: the titan middleware stack (built later by `init`).
 *
 * @param {Object} zettaInstance Zetta runtime; `peerRegistry` and `id` are read here.
 * @param {Object} [options] Every key is copied onto the spdy server options.
 * @param {boolean} [options.useXForwardedHostHeader=true] Passed on to titan.
 */
var ZettaHttpServer = module.exports = function(zettaInstance, options) {
  var self = this;

  // `typeof null` is 'object', so null has to be excluded explicitly;
  // otherwise the property reads and Object.keys() below would throw.
  options = (options !== null && typeof options === 'object') ? options : {};
  if (typeof options.useXForwardedHostHeader !== 'undefined') {
    this.useXForwardedHostHeader = options.useXForwardedHostHeader ? true : false;
  } else {
    this.useXForwardedHostHeader = true;
  }

  this.idCounter = 0;                       // source of message ids in proxyToPeers
  this.zetta = zettaInstance;
  this.peerRegistry = zettaInstance.peerRegistry;
  this.eventBroker = new EventBroker(zettaInstance);
  this.clients = {};                        // message id -> response object
  this.peers = {};                          // peer name -> PeerSocket

  this._deviceQueries = [];

  this._collectors = {};                    // filled by collector()

  // Hooks run in series before a websocket upgrade is accepted.
  this._wsHooks = {
    peerConnect: [],
    websocketConnect: []
  };

  var httpOptions = {
    windowSize: 1024 * 1024
  };
  var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
  var usingSSL = false;
  Object.keys(options).forEach(function(k) {
    httpOptions[k] = options[k];
    if (tlsCheckOptions.indexOf(k) > -1) {
      usingSSL = true;
    }
  });

  // Any TLS-related option switches the public server to SSL.
  httpOptions.plain = (usingSSL) ? false : true;
  httpOptions.ssl = (usingSSL) ? true : false;
  this.server = spdy.createServer(httpOptions);

  this.spdyServer = spdy.createServer({
    windowSize: 1024 * 1024,
    plain: true,
    ssl: false
  });

  // Upgrade requests whose URL matches none of these are answered with 404.
  var ValidWSUrls = [
    /^\/events$/,
    /^\/events\?.+$/,
    /^\/servers\/(.+)\/events/,
    /^\/peers\/(.+)$/,
    /^\/peer-management$/,
  ];

  function match(request) {
    return ValidWSUrls.some(function(re) {
      return re.test(request.url);
    });
  }

  this.wss = new WebSocketServer({ noServer: true });
  this.server.on('upgrade', function(request, socket, headers) {

    // Writes a bare HTTP error status line to the raw socket and closes it.
    var sendError = function(code) {

      var finish = function() {
        var responseLine = 'HTTP/1.1 ' + code + ' ' + http.STATUS_CODES[code] + '\r\n\r\n\r\n';
        socket.end(responseLine);
      };

      if (self.server.listeners('upgrade').length > 1) {
        // Other 'upgrade' listeners are registered: wait 5 seconds and only
        // send the error if nothing has been written to the socket by then.
        var timer = setTimeout(function() {
          if (socket.bytesWritten === 0) {
            finish();
          }
        }, 5000);
        socket.on('close', function() {
          clearTimeout(timer);
        });
      } else {
        finish();
      }
    };

    if (/^\/peers\/(.+)$/.exec(request.url)) {
      // Peer connection: run the peerConnect hooks, then hand over the socket.
      async.eachSeries(self._wsHooks.peerConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          self.setupPeerSocket(ws);
        });
      });
    } else if (match(request)) {
      // Event / peer-management websocket: run the websocketConnect hooks first.
      async.eachSeries(self._wsHooks.websocketConnect, function(handler, next) {
        return handler(request, socket, headers, next);
      }, function(err) {
        if (err) {
          return sendError(500);
        }

        self.wss.handleUpgrade(request, socket, headers, function(ws) {
          if (ws.upgradeReq.url === '/peer-management') {
            // Fixed subscription to this instance's peer connect/disconnect topics.
            var query = [
              { name: self.zetta.id, topic: '_peer/connect' },
              { name: self.zetta.id, topic: '_peer/disconnect' }];

            var client = new EventSocket(ws, query);
            self.eventBroker.client(client);
          } else {
            self.setupEventSocket(ws);
          }
        });
      });
    } else {
      sendError(404);
    }

  });

  this.cloud = titan({useXForwardedHostHeader: this.useXForwardedHostHeader})
    .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
    .add(RootResource, zettaInstance)
    .add(PeersManagementResource, zettaInstance)
    .add(DevicesResource, zettaInstance)
    .add(ServersResource, zettaInstance)
    .allow({
      methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
      origins: ['*'],
      headers: ['accept', 'content-type'],
      maxAge: '432000'
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // OPTIONS requests are flagged as already routed.
        if (env.request.method === 'OPTIONS') {
          env.argo._routed = true;
        }
        next(env);
      });
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // Requests for /_initiate_peer/<name> deliberately never call next(),
        // so the pipeline stops here for them.
        // NOTE(review): presumably another component answers these requests - confirm.
        if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
          next(env);
        }
      });
    });
};
|
182 |
|
/**
 * Appends the registration middleware, builds the middleware stack and
 * attaches its `run` handler to both servers' 'request' events.
 *
 * @param {Function} [cb] Invoked synchronously, with no arguments, once wiring is done.
 */
ZettaHttpServer.prototype.init = function(cb) {
  var registration = this.httpRegistration.bind(this);

  this.cloud = this.cloud.use(registration);

  var built = this.cloud.build();
  this.cloud = built;

  [this.server, this.spdyServer].forEach(function(target) {
    target.on('request', built.run);
  });

  if (!cb) {
    return;
  }
  cb();
};
|
200 |
|
/**
 * Starts the public server. All arguments are forwarded unchanged to the
 * underlying server's `listen`.
 *
 * @returns {ZettaHttpServer} this, for chaining.
 */
ZettaHttpServer.prototype.listen = function() {
  var publicServer = this.server;
  publicServer.listen.apply(publicServer, arguments);
  return this;
};
|
205 |
|
206 |
|
/**
 * Registers a hook that runs before a peer websocket upgrade is accepted.
 * Hooks are invoked as `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler
 * @throws {Error} when `handler` is not a function.
 */
ZettaHttpServer.prototype.onPeerConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var peerHooks = this._wsHooks.peerConnect;
  peerHooks.push(handler);
};
|
213 |
|
/**
 * Registers a hook that runs before an event (non-peer) websocket upgrade is
 * accepted. Hooks are invoked as `handler(request, socket, headers, next)`.
 *
 * @param {Function} handler
 * @throws {Error} when `handler` is not a function.
 */
ZettaHttpServer.prototype.onEventWebsocketConnect = function(handler) {
  var isCallable = (typeof handler === 'function');
  if (!isCallable) {
    throw new Error('Must supply function as a hook');
  }

  var socketHooks = this._wsHooks.websocketConnect;
  socketHooks.push(handler);
};
|
220 |
|
/**
 * Adds a collector under the given name. When only a function is passed it
 * is stored under the name '_logs'.
 *
 * @param {string|Function} name Collector name, or the collector itself.
 * @param {*} [collector] Collector to store when `name` is given.
 * @returns {ZettaHttpServer} this, for chaining.
 */
ZettaHttpServer.prototype.collector = function(name, collector) {
  var key = name;
  var entry = collector;

  // Single-argument form: collector(fn)
  if (typeof name === 'function') {
    entry = name;
    key = '_logs';
  }

  var bucket = this._collectors[key];
  if (!bucket) {
    bucket = this._collectors[key] = [];
  }
  bucket.push(entry);

  return this;
};
|
234 |
|
/**
 * Determines the protocol the client used to reach this server.
 *
 * The first entry of the `x-forwarded-proto` header wins. Surrounding
 * whitespace is trimmed, so "https , http" yields "https" rather than
 * "https ". When the header is absent, or its first entry is blank, the
 * result is derived from whether the connection is encrypted.
 *
 * @param {Object} req Request-like object with `headers` and `connection`.
 * @returns {string} Forwarded protocol, or 'https' / 'http'.
 */
function getCurrentProtocol(req) {
  var xfp = req.headers['x-forwarded-proto'];

  if (xfp && xfp.length) {
    var forwarded = String(xfp).split(',')[0].trim();
    if (forwarded) {
      return forwarded;
    }
  }

  return req.connection.encrypted ? 'https' : 'http';
}
|
247 |
|
/**
 * Attaches `_env` and `_loader` objects to a websocket, giving it a `uri()`
 * function and a `helpers.url.path()` helper.
 *
 * @param {Object} ws Websocket; `ws.upgradeReq` is read for url and connection.
 * @param {string} [host] Host (and optional port). When falsy it is derived
 *   from the socket's local address on the first `uri()` call and then reused.
 * @param {string} p Stored as `ws._loader.path`.
 */
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
  // URLs always use forward slashes. path.join would emit backslashes on
  // Windows, so use the POSIX flavour when the runtime provides it.
  var urlPath = path.posix || path;

  ws._env = { helpers: {} };
  ws._loader = { path: p };

  ws._env.uri = function() {
    var protocol = getCurrentProtocol(ws.upgradeReq);

    if (!host) {
      var address = ws.upgradeReq.connection.address();
      host = address.address;
      if (address.port) {
        // Leave out the port when it is the default one for the protocol.
        if (!(protocol === 'https' && address.port === 443) &&
            !(protocol === 'http' && address.port === 80)) {
          host += ':' + address.port;
        }
      }
    }

    return protocol + '://' + urlPath.join(host, ws.upgradeReq.url);
  };

  ws._env.helpers.url = {};
  // Returns the socket's URI with its path replaced by `pathname` and the
  // query string removed.
  ws._env.helpers.url.path = function(pathname) {
    var parsed = url.parse(ws._env.uri());
    parsed.search = null;
    parsed.pathname = pathname;
    return url.format(parsed);
  };
};
|
277 |
|
/**
 * Handles a freshly upgraded websocket from a peer (`/peers/<name>`).
 *
 * - Unparseable peer name: the socket is closed with code 1008.
 * - Peer known and not DISCONNECTED: the socket is closed with code 4000.
 * - Peer known and DISCONNECTED: the existing PeerSocket is re-initialised.
 * - Otherwise: a new PeerSocket is created, stored in `this.peers`, and its
 *   events are forwarded to pubsub and the event broker.
 *
 * @param {Object} ws Upgraded websocket (`ws.upgradeReq` is the HTTP request).
 */
ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
  var self = this;

  // The upgrade handler matches against the full URL (query included), while
  // the name is taken from the pathname only. A URL such as "/peers/?x=1"
  // therefore gets this far without a usable name; decodeURI also throws on
  // malformed escapes. Reject both instead of letting the exception escape.
  var pathname = url.parse(ws.upgradeReq.url, true).pathname;
  var matched = /^\/peers\/(.+)$/.exec(pathname);
  var name = null;
  if (matched) {
    try {
      name = decodeURI(matched[1]);
    } catch (err) {
      name = null;
    }
  }

  if (name === null) {
    self.zetta.log.emit('log', 'http_server', 'Websocket connection rejected, invalid peer name in "' + ws.upgradeReq.url + '".');
    ws.close(1008, 'invalid peer name');
    return;
  }

  self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

  var host = ws.upgradeReq.headers['host'];
  self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);

  if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
    // A live connection for this name already exists: refuse the new one.
    ws.close(4000, 'peer already connected');
  } else if (self.peers[name]) {
    // Known peer that is currently disconnected: reuse its PeerSocket.
    self.peers[name].init(ws);
  } else {
    var peer = new PeerSocket(ws, name, self.peerRegistry);
    self.peers[name] = peer;

    // Re-publish the peer's events under "<peer name>/<topic>".
    peer.on('zetta-events', function(topic, data) {
      self.zetta.pubsub.publish(name + '/' + topic, data, true);
    });

    peer.on('connected', function() {
      self.eventBroker.peer(peer);
      self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
      self.zetta.pubsub.publish('_peer/connect', { peer: peer });
    });

    peer.on('error', function(err) {
      self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '".');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
    });

    peer.on('end', function() {
      self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
      self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
    });
  }
};
|
320 |
|
/**
 * Handles a freshly upgraded event websocket.
 *
 * `/events` (no query): one EventSocket with a null query and `true` as the
 * third argument is registered with the event broker.
 *
 * `/events?...`: when the query has a `topic`, one EventSocket per known
 * server (this instance plus every key of `this.peers`) is registered, and
 * further ones are added as peers connect until the socket closes.
 *
 * `/servers/<name>/events?...`: one EventSocket for that server is registered
 * when the query has a `topic`. The socket is closed with 1001 when the path
 * does not match and with 1008 when the server name cannot be decoded.
 *
 * @param {Object} ws Upgraded websocket (`ws.upgradeReq` is the HTTP request).
 */
ZettaHttpServer.prototype.setupEventSocket = function(ws) {
  var self = this;
  var host = ws.upgradeReq.headers['host'];
  var query;

  if (/^\/events/.exec(ws.upgradeReq.url)) {
    self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
    var queryString = url.parse(ws.upgradeReq.url).query;

    if (!queryString) {
      self.eventBroker.client(new EventSocket(ws, null, true));
      return;
    }

    query = querystring.parse(queryString);

    // Shallow copy of the parsed query, bound to one server name and with the
    // topic rewritten through querytopic when it parses.
    var queryFor = function(serverName) {
      var q = {};
      Object.keys(query).forEach(function(k) {
        q[k] = query[k];
      });
      q.name = serverName;

      if (query.topic) {
        var qt = querytopic.parse(query.topic);
        if (qt) {
          q.topic = querytopic.format(qt);
        }
      }
      return q;
    };

    [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
      // Without a topic there is nothing to subscribe to.
      if (!query.topic) {
        return;
      }
      var client = new EventSocket(ws, queryFor(serverId), false);
      self.eventBroker.client(client);
    });

    // Peers that connect later get the same subscription.
    var subscribeOnPeerConnect = function(e, obj) {
      if (!query.topic) {
        return;
      }
      var client = new EventSocket(ws, queryFor(obj.peer.name));
      self.eventBroker.client(client);
    };

    ws.on('close', function() {
      self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
    });

    self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
  } else {
    // Match on the pathname only: against the full URL the greedy (.+) would
    // swallow the query string whenever it contains "/events".
    var match = /^\/servers\/(.+)\/events/.exec(url.parse(ws.upgradeReq.url).pathname);
    if (!match) {
      ws.close(1001);
      return;
    }

    // decodeURI throws URIError on malformed escapes; reject the socket
    // instead of letting the exception escape the upgrade callback.
    var serverName;
    try {
      serverName = decodeURI(match[1]);
    } catch (err) {
      ws.close(1008, 'invalid server name');
      return;
    }

    self.wireUpWebSocketForEvent(ws, host, '/servers/' + match[1]);

    query = querystring.parse(url.parse(ws.upgradeReq.url).query);
    query.name = serverName;

    if (query.topic) {
      var qt = querytopic.parse(query.topic);
      if (qt) {
        query.topic = querytopic.format(qt);
      }
      var client = new EventSocket(ws, query);
      self.eventBroker.client(client);
    }
  }
};
|
405 |
|
/**
 * Middleware for `POST /registration`. All other requests pass straight
 * through.
 *
 * The JSON body is expected to carry `target` (a key of `this.peers`) and
 * `device`. On success the request body is replaced by the serialised
 * `device` and the target peer's agent is stored on `env.zettaAgent`.
 *
 * Responds through `next(env)` with status 400 when the body cannot be read,
 * is not a JSON object, or has no `device`, and with 404 when the target
 * peer is unknown or has no agent.
 *
 * Must be called with `this` set to the server (`init` binds it).
 *
 * @param {Function} handle Registration function, called as `handle('request', fn)`.
 */
ZettaHttpServer.prototype.httpRegistration = function(handle) {
  // The original body referenced an undeclared `self`.
  var self = this;

  handle('request', function(env, next) {
    if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
      return next(env);
    }

    env.request.getBody(function(err, body) {
      var payload = null;

      if (!err) {
        try {
          payload = JSON.parse(body.toString());
        } catch (parseErr) {
          // Empty or malformed body: handled as a bad request below.
          payload = null;
        }
      }

      if (payload === null || typeof payload !== 'object' ||
          typeof payload.device === 'undefined') {
        env.response.statusCode = 400;
        return next(env);
      }

      var peer = self.peers[payload.target];

      if (!peer || !peer.agent) {
        env.response.statusCode = 404;
        return next(env);
      }

      env.request.body = new Buffer(JSON.stringify(payload.device));
      env.zettaAgent = peer.agent;
      next(env);
    });
  });
};
|
427 |
|
/**
 * Sends the incoming request to several peers (at most five at a time) and
 * collects one result object per peer.
 *
 * Each result is `{ res, err, body }` for a completed request or `{ err }`
 * when the peer is not connected or the request failed; individual failures
 * are reported in the results, not through the error argument.
 *
 * @param {Array} peers Objects whose `id` is the peer name.
 * @param {Object} env Request environment (`request`, `response`, optional `zettaAgent`).
 * @param {Function} cb Called as `cb(err, results, messageId)`.
 */
ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
  var self = this;

  var req = env.request;
  var res = env.response;
  var protocol = getCurrentProtocol(req);

  // Remember the response under a fresh message id.
  var messageId = ++self.idCounter;
  self.clients[messageId] = res;

  // Builds the async task that proxies the request to one peer.
  function taskFor(entry) {
    var peerName = entry.id;

    return function(done) {
      var targetPath = req.templateUrl.replace('{{peerName}}', encodeURIComponent(peerName));

      // Work on a copy so the incoming request's headers stay untouched.
      var outgoing = Object.keys(req.headers).reduce(function(acc, key) {
        acc[key] = req.headers[key];
        return acc;
      }, {});

      if (!req.isSpdy) {
        outgoing['x-forwarded-proto'] = protocol;
      }

      var peer = self.peers[peerName];
      var isConnected = peer && peer.state === PeerSocket.CONNECTED;
      if (!isConnected) {
        done(null, { err: new Error('Peer does not exist.') });
        return;
      }

      var opts = {
        method: req.method,
        headers: outgoing,
        path: targetPath,
        agent: env.zettaAgent || peer.agent
      };

      var request = http.request(opts, function(response) {
        response.getBody(function(err, body) {
          done(null, { res: response, err: err, body: body });
        });
      });

      request.on('error', function(err) {
        return done(null, { err: err });
      });

      // Use the buffered body when there is one, otherwise stream it.
      if (req.body) {
        request.end(req.body);
      } else {
        req.pipe(request);
      }
    };
  }

  async.parallelLimit(peers.map(taskFor), 5, function(err, results) {
    cb(err, results, messageId);
  });
};
|
482 |
|
/**
 * Proxies the incoming request to the peer named by the third path segment
 * of the URL.
 *
 * Ends the response directly with 400 when the peer name cannot be decoded
 * and with 404 when the peer is unknown or not connected. A failed upstream
 * request sets status 502 and continues with `next(env)`.
 *
 * By default the upstream response stream is handed on as
 * `env.response.body`. When `env.proxyOpts.pipe` is falsy the upstream body
 * is buffered first (null when it is empty).
 *
 * @param {Object} env Request environment (`request`, `response`, optional
 *   `zettaAgent` and `proxyOpts`; the latter is merged over the request options).
 * @param {Function} next Continuation, called as `next(env)`.
 */
ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
  var self = this;
  var req = env.request;
  var res = env.response;

  var parsed = url.parse(req.url);
  var name;
  try {
    name = decodeURIComponent(parsed.pathname.split('/')[2]);
  } catch (err) {
    // Malformed percent-encoding in the peer name: decodeURIComponent throws
    // URIError, which would otherwise escape the request handler.
    res.statusCode = 400;
    res.end();
    return;
  }

  if (!req.isSpdy) {
    req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
  }

  var peer = self.peers[name];
  if (!peer || peer.state !== PeerSocket.CONNECTED){
    res.statusCode = 404;
    res.end();
    return;
  }

  var agent = env.zettaAgent || peer.agent;

  var opts = {
    method: req.method,
    headers: req.headers,
    path: req.url,
    agent: agent,
    pipe: true
  };
  if (typeof env.proxyOpts === 'object') {
    Object.keys(env.proxyOpts).forEach(function(k) {
      opts[k] = env.proxyOpts[k];
    });
  }

  var request = http.request(opts, function(response) {

    // Mirror the upstream headers and status onto our response.
    Object.keys(response.headers).forEach(function(header) {
      res.setHeader(header, response.headers[header]);
    });

    res.statusCode = response.statusCode;

    if (!opts.pipe) {
      var buf = [];
      var len = 0;

      response.on('readable', function() {
        var chunk;

        while ((chunk = response.read()) != null) {
          buf.push(chunk);
          len += chunk.length;
        }
      });

      response.on('end', function() {
        // Assemble once at the end rather than re-copying on every 'readable'.
        env.response.body = buf.length ? Buffer.concat(buf, len) : null;
        next(env);
      });
    } else {
      env.response.body = response;
      next(env);
    }
  }).on('error', function(err) {
    env.response.statusCode = 502;
    return next(env);
  });

  if (req.body) {
    request.end(req.body);
  } else {
    req.pipe(request);
  }
};
|