1 | var http = require('http');
|
2 | var path = require('path');
|
3 | var url = require('url');
|
4 | var querystring = require('querystring');
|
5 | var async = require('async');
|
6 | var spdy = require('spdy');
|
7 | var argo = require('argo');
|
8 | var titan = require('titan');
|
9 | var WebSocketServer = require('ws').Server;
|
10 | var SpdyAgent = require('./spdy_agent');
|
11 | var EventBroker = require('./event_broker');
|
12 | var PeerSocket = require('./peer_socket');
|
13 | var EventSocket = require('./event_socket');
|
14 | var Siren = require('argo-formatter-siren');
|
15 | var DevicesResource = require('./api_resources/devices');
|
16 | var PeersManagementResource = require('./api_resources/peer_management');
|
17 | var RootResource = require('./api_resources/root');
|
18 | var ServersResource = require('./api_resources/servers');
|
19 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
20 | var rels = require('zetta-rels');
|
21 |
|
22 | var querytopic = require('./query_topic');
|
23 |
|
24 | var ZettaHttpServer = module.exports = function(zettaInstance) {
|
25 | var self = this;
|
26 | this.idCounter = 0;
|
27 | this.zetta = zettaInstance;
|
28 | this.peerRegistry = zettaInstance.peerRegistry;
|
29 | this.eventBroker = new EventBroker(zettaInstance);
|
30 | this.clients = {};
|
31 | this.peers = {};
|
32 |
|
33 | this._deviceQueries = [];
|
34 |
|
35 | this._collectors = {};
|
36 | this.server = spdy.createServer({
|
37 | windowSize: 1024 * 1024,
|
38 | plain: true,
|
39 | ssl: false
|
40 | });
|
41 | this.spdyServer = spdy.createServer({
|
42 | windowSize: 1024 * 1024,
|
43 | plain: true,
|
44 | ssl: false
|
45 | });
|
46 |
|
47 | this.wss = new WebSocketServer({ noServer: true });
|
48 | this.server.on('upgrade', function(request, socket, headers) {
|
49 | function match(request) {
|
50 | if(/^\/peers\/(.+)$/.exec(request.url)) {
|
51 | return true;
|
52 | } else if(request.url === '/peer-management') {
|
53 | return true;
|
54 | } else if(/^\/servers\/(.+)\/events/.exec(request.url)) {
|
55 | return true;
|
56 | } else if(/^\/events/.exec(request.url)) {
|
57 | return true;
|
58 | } else {
|
59 | return false;
|
60 | }
|
61 | }
|
62 |
|
63 | if(match(request)) {
|
64 | self.wss.handleUpgrade(request, socket, headers, function(ws) {
|
65 | var match = /^\/peers\/(.+)$/.exec(ws.upgradeReq.url);
|
66 | if (match) {
|
67 | var name = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname)[1];
|
68 | name = decodeURI(name);
|
69 | self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');
|
70 |
|
71 | if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
|
72 |
|
73 | ws.close(4000, 'peer already connected');
|
74 | } else if (self.peers[name]) {
|
75 |
|
76 | self.peers[name].init(ws);
|
77 | } else {
|
78 | var peer = new PeerSocket(ws, name, self.peerRegistry);
|
79 | self.peers[name] = peer;
|
80 |
|
81 | peer.on('connected', function() {
|
82 | self.eventBroker.peer(peer);
|
83 | self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
|
84 | self.zetta.pubsub.publish('_peer/connect', { peer: peer });
|
85 | });
|
86 |
|
87 | peer.on('error', function(err) {
|
88 | self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '".');
|
89 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
|
90 | });
|
91 |
|
92 | peer.on('end', function() {
|
93 | self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
|
94 | self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
|
95 | });
|
96 | }
|
97 | } else if (ws.upgradeReq.url === '/peer-management') {
|
98 | var query = [
|
99 | { name: self.zetta.id, topic: '_peer/connect' },
|
100 | { name: self.zetta.id, topic: '_peer/disconnect' }];
|
101 |
|
102 | var client = new EventSocket(ws, query);
|
103 | self.eventBroker.client(client);
|
104 | } else {
|
105 | self.setupEventSocket(ws);
|
106 | }
|
107 | });
|
108 | }
|
109 | });
|
110 |
|
111 | this.cloud = argo()
|
112 | .use(titan)
|
113 | .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
|
114 | .add(RootResource, zettaInstance)
|
115 | .add(PeersManagementResource, zettaInstance)
|
116 | .add(DevicesResource, zettaInstance)
|
117 | .add(ServersResource, zettaInstance)
|
118 | .allow({
|
119 | methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
|
120 | origins: ['*'],
|
121 | headers: ['accept', 'content-type'],
|
122 | maxAge: '432000'
|
123 | })
|
124 | .use(function(handle) {
|
125 | handle('request', function(env, next) {
|
126 | if (env.request.headers['zetta-message-id']) {
|
127 | env.response.setHeader('zetta-message-id', env.request.headers['zetta-message-id']);
|
128 | }
|
129 | next(env);
|
130 | });
|
131 | })
|
132 | .use(function(handle) {
|
133 | handle('request', function(env, next) {
|
134 | if (env.request.method === 'OPTIONS') {
|
135 | env.argo._routed = true;
|
136 | }
|
137 | next(env);
|
138 | });
|
139 | })
|
140 | .use(function(handle) {
|
141 | handle('request', function(env, next) {
|
142 |
|
143 | if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
|
144 | next(env);
|
145 | }
|
146 | });
|
147 | });
|
148 | };
|
149 |
|
150 | ZettaHttpServer.prototype.init = function(cb) {
|
151 | var self = this;
|
152 |
|
153 |
|
154 | this.cloud = this.cloud.use(this.httpRegistration.bind(this));
|
155 |
|
156 |
|
157 |
|
158 | this.cloud = this.cloud.build();
|
159 |
|
160 | this.server.on('request', this.cloud.run);
|
161 | this.spdyServer.on('request', this.cloud.run);
|
162 |
|
163 | if (cb) {
|
164 | cb();
|
165 | }
|
166 | };
|
167 |
|
168 | ZettaHttpServer.prototype.listen = function() {
|
169 | this.server.listen.apply(this.server,arguments);
|
170 | return this;
|
171 | };
|
172 |
|
173 | ZettaHttpServer.prototype.collector = function(name, collector) {
|
174 | if(typeof name === 'function'){
|
175 | collector = name;
|
176 | name = '_logs';
|
177 | }
|
178 |
|
179 | if(!this._collectors[name]) {
|
180 | this._collectors[name] = [];
|
181 | }
|
182 |
|
183 | this._collectors[name].push(collector);
|
184 | return this;
|
185 | };
|
186 |
|
187 | function getCurrentProtocol(req) {
|
188 | var xfp = req.headers['x-forwarded-proto'];
|
189 | var protocol;
|
190 |
|
191 | if (xfp && xfp.length) {
|
192 | protocol = xfp.replace(/\s*/, '').split(',')[0];
|
193 | } else {
|
194 | protocol = req.connection.encrypted ? 'https' : 'http';
|
195 | }
|
196 |
|
197 | return protocol;
|
198 | }
|
199 |
|
200 | ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
|
201 | ws._env = { helpers: {}};
|
202 | ws._loader = { path: p };
|
203 |
|
204 | ws._env.uri = function() {
|
205 | var protocol = getCurrentProtocol(ws.upgradeReq);
|
206 |
|
207 | if (!host) {
|
208 | var address = ws.upgradeReq.connection.address();
|
209 | host = address.address;
|
210 | if (address.port) {
|
211 | if (!(protocol === 'https' && address.port === 443) &&
|
212 | !(protocol === 'http' && address.port === 80)) {
|
213 | host += ':' + address.port
|
214 | }
|
215 | }
|
216 | }
|
217 |
|
218 | return protocol + '://' + path.join(host, ws.upgradeReq.url);
|
219 | };
|
220 |
|
221 | ws._env.helpers.url = {};
|
222 | ws._env.helpers.url.path = function(pathname) {
|
223 | var parsed = url.parse(ws._env.uri());
|
224 | parsed.search = null;
|
225 | parsed.pathname = pathname;
|
226 | return url.format(parsed);
|
227 | };
|
228 | };
|
229 |
|
230 | ZettaHttpServer.prototype.setupEventSocket = function(ws) {
|
231 | var self = this;
|
232 | var host = ws.upgradeReq.headers['host'];
|
233 |
|
234 | if (/^\/events/.exec(ws.upgradeReq.url)) {
|
235 | self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
|
236 |
|
237 | var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
|
238 |
|
239 | function copy(q) {
|
240 | var c = {};
|
241 | Object.keys(q).forEach(function(k) {
|
242 | c[k] = q[k];
|
243 | });
|
244 |
|
245 | return c;
|
246 | }
|
247 |
|
248 | [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
|
249 | var q = copy(query);
|
250 | q.name = serverId;
|
251 |
|
252 | if (q.topic) {
|
253 | var qt = querytopic.parse(query.topic);
|
254 | if (qt) {
|
255 | q.topic = querytopic.format(qt);
|
256 | }
|
257 | var client = new EventSocket(ws, q);
|
258 | self.eventBroker.client(client);
|
259 | }
|
260 | });
|
261 |
|
262 | function subscribeOnPeerConnect(e, obj) {
|
263 | var q = copy(query);
|
264 | q.name = obj.peer.name;
|
265 |
|
266 | if (q.topic) {
|
267 | var qt = querytopic.parse(query.topic);
|
268 | if (qt) {
|
269 | q.topic = querytopic.format(qt);
|
270 | }
|
271 |
|
272 | var client = new EventSocket(ws, q);
|
273 | self.eventBroker.client(client);
|
274 | }
|
275 | }
|
276 |
|
277 | ws.on('close', function() {
|
278 | self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
|
279 | });
|
280 |
|
281 | self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
|
282 | } else {
|
283 | var match = /^\/servers\/(.+)\/events/.exec(ws.upgradeReq.url);
|
284 | if(!match) {
|
285 | ws.close(1001);
|
286 | return;
|
287 | }
|
288 |
|
289 | var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
|
290 | query.serverId = match[1];
|
291 |
|
292 | self.wireUpWebSocketForEvent(ws, host, '/servers/' + query.serverId);
|
293 |
|
294 | var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
|
295 | query.name = decodeURI(match[1]);
|
296 |
|
297 | if (query.topic) {
|
298 | var qt = querytopic.parse(query.topic);
|
299 | if (qt) {
|
300 | query.topic = querytopic.format(qt);
|
301 | }
|
302 | var client = new EventSocket(ws, query);
|
303 | self.eventBroker.client(client);
|
304 | }
|
305 | }
|
306 | };
|
307 |
|
308 | ZettaHttpServer.prototype.httpRegistration = function(handle) {
|
309 | handle('request', function(env, next) {
|
310 | if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
|
311 | return next(env);
|
312 | }
|
313 |
|
314 | env.request.getBody(function(err, body) {
|
315 | body = JSON.parse(body.toString());
|
316 | var peer = self.peers[body.target];
|
317 |
|
318 | if (!peer.agent) {
|
319 | env.response.statusCode = 404;
|
320 | return next(env);
|
321 | }
|
322 |
|
323 | env.request.body = new Buffer(JSON.stringify(body.device));
|
324 | env.zettaAgent = peer.agent;
|
325 | next(env);
|
326 | });
|
327 | });
|
328 | };
|
329 |
|
330 | ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
|
331 | var self = this;
|
332 |
|
333 | var req = env.request;
|
334 | var res = env.response;
|
335 | var protocol = getCurrentProtocol(req);
|
336 |
|
337 | var messageId = ++self.idCounter;
|
338 | self.clients[messageId] = res;
|
339 |
|
340 | var tasks = peers.map(function(p) {
|
341 | var name = p.id;
|
342 | return function(callback) {
|
343 |
|
344 | var reqUrl = req.templateUrl.replace('{{peerName}}', encodeURIComponent(name));
|
345 | var headers = {};
|
346 |
|
347 | Object.keys(req.headers).forEach(function(key) {
|
348 | headers[key] = req.headers[key];
|
349 | });
|
350 |
|
351 | headers['zetta-message-id'] = messageId;
|
352 | headers['zetta-forwarded-server'] = name;
|
353 |
|
354 | if (!req.isSpdy) {
|
355 | headers['x-forwarded-proto'] = protocol;
|
356 | }
|
357 |
|
358 | var peer = self.peers[name];
|
359 | if (!peer || peer.state !== PeerSocket.CONNECTED){
|
360 | callback(null, { err: new Error('Peer does not exist.') });
|
361 | return;
|
362 | }
|
363 |
|
364 | var agent = env.zettaAgent || peer.agent;
|
365 |
|
366 | var opts = { method: req.method, headers: headers, path: reqUrl, agent: agent };
|
367 | var request = http.request(opts, function(response) {
|
368 | response.getBody(function(err, body) {
|
369 | callback(null, { res: response, err: err, body: body });
|
370 | });
|
371 | }).on('error', function(err) {
|
372 | return callback(null, { err: err });
|
373 | });
|
374 |
|
375 | if (req.body) {
|
376 | request.end(req.body);
|
377 | } else {
|
378 | req.pipe(request);
|
379 | }
|
380 | };
|
381 | });
|
382 |
|
383 | async.parallelLimit(tasks, 5, function(err, results) {
|
384 | cb(err, results, messageId);
|
385 | });
|
386 | };
|
387 |
|
388 | ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
|
389 | var self = this;
|
390 | var req = env.request;
|
391 | var res = env.response;
|
392 | var opts = env.proxyOpts || {};
|
393 |
|
394 | var messageId = ++self.idCounter;
|
395 |
|
396 |
|
397 | self.clients[messageId] = res;
|
398 |
|
399 | var parsed = url.parse(req.url);
|
400 | var name = decodeURIComponent(parsed.pathname.split('/')[2]);
|
401 |
|
402 | req.headers['zetta-message-id'] = messageId;
|
403 | req.headers['zetta-forwarded-server'] = name;
|
404 | if (!req.isSpdy) {
|
405 | req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
|
406 | }
|
407 |
|
408 | var peer = self.peers[name];
|
409 | if (!peer){
|
410 | res.statusCode = 404;
|
411 | res.end();
|
412 | return;
|
413 | }
|
414 |
|
415 | var agent = env.zettaAgent || peer.agent;
|
416 |
|
417 | var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent };
|
418 | var request = http.request(opts, function(response) {
|
419 | var id = response.headers['zetta-message-id'];
|
420 | var res = self.clients[id];
|
421 |
|
422 | if (!res) {
|
423 | response.statusCode = 404;
|
424 | return;
|
425 | }
|
426 |
|
427 | Object.keys(response.headers).forEach(function(header) {
|
428 | if (header !== 'zetta-message-id') {
|
429 | res.setHeader(header, response.headers[header]);
|
430 | }
|
431 | });
|
432 |
|
433 | res.statusCode = response.statusCode;
|
434 | var buf = [];
|
435 | if(!opts.pipe) {
|
436 | response.on('data', function(d) {
|
437 | buf += d;
|
438 | });
|
439 | } else {
|
440 | response.pipe(res);
|
441 | }
|
442 |
|
443 | response.on('end', function() {
|
444 | if(!opts.pipe) {
|
445 | env.response.body = buf.toString();
|
446 | }
|
447 | delete self.clients[id];
|
448 | next(env);
|
449 | });
|
450 |
|
451 | }).on('error', function(err) {
|
452 | env.response.statusCode = 502;
|
453 | return next(env);
|
454 | });
|
455 |
|
456 | if (req.body) {
|
457 | request.end(req.body);
|
458 | } else {
|
459 | req.pipe(request);
|
460 | }
|
461 | };
|