UNPKG

13.3 kBJavaScriptView Raw
1var http = require('http');
2var path = require('path');
3var url = require('url');
4var querystring = require('querystring');
5var async = require('async');
6var spdy = require('spdy');
7var argo = require('argo');
8var titan = require('titan');
9var WebSocketServer = require('ws').Server;
10var SpdyAgent = require('./spdy_agent');
11var EventBroker = require('./event_broker');
12var PeerSocket = require('./peer_socket');
13var EventSocket = require('./event_socket');
14var Siren = require('argo-formatter-siren');
15var DevicesResource = require('./api_resources/devices');
16var PeersManagementResource = require('./api_resources/peer_management');
17var RootResource = require('./api_resources/root');
18var ServersResource = require('./api_resources/servers');
19var deviceFormatter = require('./api_formats/siren/device.siren');
20var rels = require('zetta-rels');
21
22var querytopic = require('./query_topic');
23
/**
 * HTTP / WebSocket front end for a Zetta instance.
 *
 * The constructor only wires things together; nothing listens until
 * `listen()` is called and the argo pipeline is not built until `init()`.
 *
 * It sets up:
 *  - two plain (non-TLS) spdy servers, `this.server` and `this.spdyServer`;
 *  - a `ws` WebSocketServer in `noServer` mode, fed from `this.server`'s
 *    'upgrade' event for the peer, peer-management and event routes;
 *  - the (unbuilt) argo pipeline in `this.cloud`.
 *
 * @param {Object} zettaInstance - the owning Zetta instance. This file reads
 *   its `peerRegistry`, `id`, `_name`, `log` and `pubsub` members.
 */
var ZettaHttpServer = module.exports = function(zettaInstance) {
  var self = this;
  this.idCounter = 0;
  this.zetta = zettaInstance;
  this.peerRegistry = zettaInstance.peerRegistry;
  this.eventBroker = new EventBroker(zettaInstance);
  this.clients = {};
  this.peers = {}; // connected peers, keyed by decoded peer name

  this._deviceQueries = [];

  this._collectors = {};
  this.server = spdy.createServer({
    windowSize: 1024 * 1024,
    plain: true,
    ssl: false
  });
  this.spdyServer = spdy.createServer({
    windowSize: 1024 * 1024,
    plain: true,
    ssl: false
  });

  this.wss = new WebSocketServer({ noServer: true });
  this.server.on('upgrade', function(request, socket, headers) {
    // Only URLs owned by this server are upgraded; anything else is left
    // untouched (no response is written here for unmatched requests).
    function match(request) {
      if(/^\/peers\/(.+)$/.exec(request.url)) {
        return true;
      } else if(request.url === '/peer-management') {
        return true;
      } else if(/^\/servers\/(.+)\/events/.exec(request.url)) {
        return true;
      } else if(/^\/events/.exec(request.url)) {
        return true;
      } else {
        return false;
      }
    }

    if(match(request)) {
      self.wss.handleUpgrade(request, socket, headers, function(ws) {
        var peerRoute = /^\/peers\/(.+)$/;
        if (peerRoute.exec(ws.upgradeReq.url)) {
          // The raw URL can match while the parsed pathname does not
          // (e.g. "/peers/?x"), and decodeURI throws URIError on malformed
          // percent-encoding (e.g. "/peers/%"). Both used to throw inside
          // this callback; reject the socket instead.
          var pathMatch = peerRoute.exec(url.parse(ws.upgradeReq.url, true).pathname);
          var name = null;
          if (pathMatch) {
            try {
              name = decodeURI(pathMatch[1]);
            } catch (err) {
              name = null;
            }
          }
          if (name === null) {
            ws.close(1008, 'invalid peer name');
            return;
          }

          self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

          if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
            // peer already connected or connecting
            ws.close(4000, 'peer already connected');
          } else if (self.peers[name]) {
            // peer has been disconnected but has connected before.
            self.peers[name].init(ws);
          } else {
            var peer = new PeerSocket(ws, name, self.peerRegistry);
            self.peers[name] = peer;

            peer.on('connected', function() {
              self.eventBroker.peer(peer);
              self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".');
              self.zetta.pubsub.publish('_peer/connect', { peer: peer });
            });

            peer.on('error', function(err) {
              self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '".');
              self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
            });

            peer.on('end', function() {
              self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".');
              self.zetta.pubsub.publish('_peer/disconnect', { peer: peer });
            });
          }
        } else if (ws.upgradeReq.url === '/peer-management') {
          // Stream this server's peer connect / disconnect notifications.
          var query = [
            { name: self.zetta.id, topic: '_peer/connect' },
            { name: self.zetta.id, topic: '_peer/disconnect' }];

          var client = new EventSocket(ws, query);
          self.eventBroker.client(client);
        } else {
          self.setupEventSocket(ws);
        }
      });
    }
  });

  this.cloud = argo()
    .use(titan)
    .format({ engines: [Siren], override: { 'application/json': Siren }, directory: path.join(__dirname, './api_formats') })
    .add(RootResource, zettaInstance)
    .add(PeersManagementResource, zettaInstance)
    .add(DevicesResource, zettaInstance)
    .add(ServersResource, zettaInstance)
    // NOTE(review): presumably the cross-origin (CORS) settings - confirm against titan/argo.
    .allow({
      methods: ['DELETE', 'PUT', 'PATCH', 'POST'],
      origins: ['*'],
      headers: ['accept', 'content-type'],
      maxAge: '432000'
    })
    .use(function(handle) {
      // Echo the caller's zetta-message-id on the response when present.
      handle('request', function(env, next) {
        if (env.request.headers['zetta-message-id']) {
          env.response.setHeader('zetta-message-id', env.request.headers['zetta-message-id']);
        }
        next(env);
      });
    })
    .use(function(handle) {
      // Flag OPTIONS requests as already routed.
      handle('request', function(env, next) {
        if (env.request.method === 'OPTIONS') {
          env.argo._routed = true;
        }
        next(env);
      });
    })
    .use(function(handle) {
      handle('request', function(env, next) {
        // stop execution in argo for initiate peer requests, handled by peer_client
        if (!(/^\/_initiate_peer\/(.+)$/.exec(env.request.url)) ) {
          next(env);
        }
      });
    });
};
149
/**
 * Finish the argo pipeline and attach it to both servers.
 *
 * Adds the HTTP device-registration middleware, builds the pipeline, and
 * registers the built `run` handler for 'request' events on `this.server`
 * and `this.spdyServer`.
 *
 * @param {Function} [cb] - called with no arguments once the handlers are attached.
 */
ZettaHttpServer.prototype.init = function(cb) {
  // handle http registration of device
  var registration = this.httpRegistration.bind(this);
  this.cloud = this.cloud.use(registration);

  // NOTE: proxying every route to a peer (a '*' route bound to proxyToPeer)
  // is intentionally not installed here.

  // setup http servers request handler to argo routes
  var built = this.cloud.build();
  this.cloud = built;

  var servers = [this.server, this.spdyServer];
  servers.forEach(function(srv) {
    srv.on('request', built.run);
  });

  if (cb) {
    cb();
  }
};
167
/**
 * Start listening on the primary server.
 *
 * All arguments are forwarded unchanged to `this.server.listen`.
 *
 * @returns {ZettaHttpServer} this instance, for chaining.
 */
ZettaHttpServer.prototype.listen = function() {
  var primary = this.server;
  primary.listen.apply(primary, arguments);
  return this;
};
172
/**
 * Register a collector under a name.
 *
 * May be called as `collector(fn)`, in which case the collector is stored
 * under the name '_logs'.
 *
 * @param {string|Function} name - collector name, or the collector itself.
 * @param {Function} [collector] - the collector when a name is given.
 * @returns {ZettaHttpServer} this instance, for chaining.
 */
ZettaHttpServer.prototype.collector = function(name, collector) {
  var key = name;
  var entry = collector;

  if (typeof name === 'function') {
    entry = name;
    key = '_logs';
  }

  var registered = this._collectors[key];
  if (!registered) {
    registered = [];
    this._collectors[key] = registered;
  }

  registered.push(entry);
  return this;
};
186
/**
 * Work out the protocol ('http' or 'https') the client used for a request.
 *
 * The first entry of the `x-forwarded-proto` header wins when it is present
 * and non-blank; otherwise the protocol is derived from whether the
 * underlying connection is encrypted.
 *
 * @param {Object} req - request exposing `headers` and `connection`.
 * @returns {string} the forwarded protocol as sent, or 'https' / 'http'.
 */
function getCurrentProtocol(req) {
  var xfp = req.headers['x-forwarded-proto'];

  if (typeof xfp === 'string') {
    // The header may be a comma separated list with arbitrary whitespace
    // ("https , http"); only the first entry is relevant. Trim both sides so
    // no stray whitespace ends up in generated URLs.
    var forwarded = xfp.split(',')[0].trim();
    if (forwarded) {
      return forwarded;
    }
  }

  return req.connection.encrypted ? 'https' : 'http';
}
199
/**
 * Attach the `_env` / `_loader` members to an upgraded WebSocket.
 *
 * Adds:
 *  - `ws._loader.path` set to `p`;
 *  - `ws._env.uri()` returning the absolute URL of the upgrade request;
 *  - `ws._env.helpers.url.path(pathname)` returning that URL with the path
 *    replaced by `pathname` and the query string removed.
 *
 * @param {Object} ws - upgraded socket; `ws.upgradeReq` must be the original request.
 * @param {string} [host] - value of the Host header. When falsy, the local
 *   socket address is used (computed on the first `uri()` call and reused).
 * @param {string} p - value stored in `ws._loader.path`.
 */
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
  // URLs always use forward slashes. The platform default `path.join` emits
  // backslashes on Windows, so prefer the POSIX flavour when the runtime
  // provides it (older runtimes without `path.posix` keep the old behaviour).
  var urlPath = path.posix || path;

  ws._env = { helpers: {} };
  ws._loader = { path: p };

  ws._env.uri = function() {
    var protocol = getCurrentProtocol(ws.upgradeReq);

    if (!host) {
      var address = ws.upgradeReq.connection.address();
      host = address.address;
      if (address.port) {
        // Leave the port off when it is the default one for the protocol.
        var isDefaultPort = (protocol === 'https' && address.port === 443) ||
                            (protocol === 'http' && address.port === 80);
        if (!isDefaultPort) {
          host += ':' + address.port;
        }
      }
    }

    return protocol + '://' + urlPath.join(host, ws.upgradeReq.url);
  };

  ws._env.helpers.url = {};
  ws._env.helpers.url.path = function(pathname) {
    var parsed = url.parse(ws._env.uri());
    parsed.search = null; // drop the query string so format() does not re-emit it
    parsed.pathname = pathname;
    return url.format(parsed);
  };
};
229
/**
 * Subscribe an upgraded WebSocket to event topics.
 *
 * Two URL shapes are handled:
 *  - `/events?topic=...`: subscribes against this server and every peer
 *    currently in `this.peers`, and against each peer announced on
 *    `_peer/connect` until the socket closes;
 *  - `/servers/<name>/events?topic=...`: subscribes against that one server.
 *
 * Any other URL closes the socket with code 1001. A server name with
 * malformed percent-encoding closes it with code 1008. Without a `topic`
 * query parameter no EventSocket is created.
 *
 * @param {Object} ws - upgraded socket; `ws.upgradeReq` must be the original request.
 */
ZettaHttpServer.prototype.setupEventSocket = function(ws) {
  var self = this;
  var reqUrl = ws.upgradeReq.url;
  var host = ws.upgradeReq.headers['host'];
  var query = querystring.parse(url.parse(reqUrl).query);

  // Register one EventSocket for `serverName`, built from a copy of the
  // request's query so that subscriptions never share a query object.
  function subscribe(serverName) {
    if (!query.topic) {
      return;
    }

    var q = {};
    Object.keys(query).forEach(function(k) {
      q[k] = query[k];
    });
    q.name = serverName;

    // Replace the topic with its formatted form when it parses.
    var qt = querytopic.parse(query.topic);
    if (qt) {
      q.topic = querytopic.format(qt);
    }

    self.eventBroker.client(new EventSocket(ws, q));
  }

  if (/^\/events/.exec(reqUrl)) {
    self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);

    [self.zetta._name].concat(Object.keys(self.peers)).forEach(function(serverId) {
      subscribe(serverId);
    });

    // Peers that connect later are subscribed as they are announced.
    var subscribeOnPeerConnect = function(e, obj) {
      subscribe(obj.peer.name);
    };

    ws.on('close', function() {
      self.zetta.pubsub.unsubscribe('_peer/connect', subscribeOnPeerConnect);
    });

    self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
    return;
  }

  var match = /^\/servers\/(.+)\/events/.exec(reqUrl);
  if (!match) {
    ws.close(1001); // go away status code
    return;
  }

  // decodeURI throws URIError on malformed percent-encoding; the name comes
  // straight from the request URL, so reject the socket rather than throw.
  var serverName;
  try {
    serverName = decodeURI(match[1]);
  } catch (err) {
    ws.close(1008, 'invalid server name');
    return;
  }

  // The loader path keeps the name exactly as it appeared in the URL.
  self.wireUpWebSocketForEvent(ws, host, '/servers/' + match[1]);
  subscribe(serverName);
};
307
/**
 * Argo middleware for `POST /registration`.
 *
 * The request body is expected to be JSON of the form
 * `{ target: <peer name>, device: <any JSON value> }`. On success the request
 * body is replaced by the serialized `device` and `env.zettaAgent` is set to
 * the target peer's agent before the pipeline continues.
 *
 * Responds (by setting the status code and continuing the pipeline) with:
 *  - 400 when the body cannot be read or parsed, or has no `device`;
 *  - 404 when the target peer is unknown or has no agent.
 *
 * Must be invoked with `this` bound to the server instance.
 *
 * @param {Function} handle - argo's handler registration function.
 */
ZettaHttpServer.prototype.httpRegistration = function(handle) {
  var self = this;

  handle('request', function(env, next) {
    if (!(env.request.method === 'POST' && env.request.url === '/registration')) {
      return next(env);
    }

    env.request.getBody(function(err, body) {
      var payload = null;

      if (!err) {
        try {
          payload = JSON.parse(body.toString());
        } catch (parseErr) {
          payload = null;
        }
      }

      // `device` must be present: JSON.stringify(undefined) is undefined,
      // which cannot be turned into a Buffer.
      if (!payload || typeof payload !== 'object' || payload.device === undefined) {
        env.response.statusCode = 400;
        return next(env);
      }

      var peer = self.peers[payload.target];

      if (!peer || !peer.agent) {
        env.response.statusCode = 404;
        return next(env);
      }

      // Kept as `new Buffer(string)` so runtimes that predate a usable
      // `Buffer.from` keep working; a string argument is safe here.
      env.request.body = new Buffer(JSON.stringify(payload.device));
      env.zettaAgent = peer.agent;
      next(env);
    });
  });
};
329
/**
 * Forward one incoming request to several peers and collect every outcome.
 *
 * Each peer gets a copy of the incoming headers plus `zetta-message-id` and
 * `zetta-forwarded-server`, and the request's `templateUrl` with
 * `{{peerName}}` replaced by the URI-encoded peer id. At most five requests
 * run at a time.
 *
 * Per-peer failures never fail the whole operation; every result is one of
 * `{ res, err, body }` (a response was received) or `{ err }`.
 *
 * The response object is stored in `this.clients` under the new message id
 * and is not removed here.
 *
 * @param {Array} peers - objects whose `id` is the peer name.
 * @param {Object} env - argo environment for the incoming request.
 * @param {Function} cb - called as `cb(err, results, messageId)`.
 */
ZettaHttpServer.prototype.proxyToPeers = function(peers, env, cb) {
  var server = this;
  var incoming = env.request;
  var outgoingResponse = env.response;
  var protocol = getCurrentProtocol(incoming);

  var messageId = ++server.idCounter;
  server.clients[messageId] = outgoingResponse;

  // Build the async task that proxies the request to a single peer.
  function taskFor(peerInfo) {
    var peerName = peerInfo.id;

    return function(done) {
      var peerUrl = incoming.templateUrl.replace('{{peerName}}', encodeURIComponent(peerName));

      // Start from a copy so the incoming request's headers stay untouched.
      var forwarded = {};
      var headerNames = Object.keys(incoming.headers);
      for (var i = 0; i < headerNames.length; i++) {
        forwarded[headerNames[i]] = incoming.headers[headerNames[i]];
      }

      forwarded['zetta-message-id'] = messageId;
      forwarded['zetta-forwarded-server'] = peerName;

      if (!incoming.isSpdy) {
        forwarded['x-forwarded-proto'] = protocol;
      }

      var target = server.peers[peerName];
      var isConnected = Boolean(target) && target.state === PeerSocket.CONNECTED;
      if (!isConnected) {
        done(null, { err: new Error('Peer does not exist.') });
        return;
      }

      var requestOptions = {
        method: incoming.method,
        headers: forwarded,
        path: peerUrl,
        agent: env.zettaAgent || target.agent
      };

      var outgoing = http.request(requestOptions, function(response) {
        response.getBody(function(err, body) {
          done(null, { res: response, err: err, body: body });
        });
      });

      outgoing.on('error', function(err) {
        return done(null, { err: err });
      });

      if (incoming.body) {
        outgoing.end(incoming.body);
      } else {
        incoming.pipe(outgoing);
      }
    };
  }

  var tasks = peers.map(function(p) {
    return taskFor(p);
  });

  async.parallelLimit(tasks, 5, function(err, results) {
    cb(err, results, messageId);
  });
};
387
/**
 * Forward an incoming request to the peer named by the third path segment
 * (`/<x>/<peer name>/...`) and relay the peer's response.
 *
 * The incoming request's headers are extended in place with
 * `zetta-message-id`, `zetta-forwarded-server` and, for non-SPDY requests,
 * `x-forwarded-proto`.
 *
 * Outcomes:
 *  - malformed percent-encoding in the peer name: 400, response ended here;
 *  - unknown peer: 404, response ended here;
 *  - peer answered: status and headers (except `zetta-message-id`) are
 *    copied; the body is piped when `env.proxyOpts.pipe` is truthy, otherwise
 *    buffered into `env.response.body`; then `next(env)`;
 *  - peer answered with an unrecognised message id: 404, then `next(env)`;
 *  - request error: 502, then `next(env)`.
 *
 * @param {Object} env - argo environment; `env.proxyOpts.pipe` and
 *   `env.zettaAgent` are optional.
 * @param {Function} next - argo continuation, called as `next(env)`.
 */
ZettaHttpServer.prototype.proxyToPeer = function(env, next) {
  var self = this;
  var req = env.request;
  var res = env.response;
  var opts = env.proxyOpts || {};

  var parsed = url.parse(req.url);
  var name;
  try {
    name = decodeURIComponent(parsed.pathname.split('/')[2]);
  } catch (err) {
    // decodeURIComponent throws URIError on malformed percent-encoding.
    res.statusCode = 400;
    res.end();
    return;
  }

  // Resolve the peer before registering the client so that a 404 does not
  // leave an entry behind in self.clients.
  var peer = self.peers[name];
  if (!peer){
    res.statusCode = 404;
    res.end();
    return;
  }

  var messageId = ++self.idCounter;

  // change this to handle multiple fogs
  self.clients[messageId] = res;//req.socket; Will need socket for event broadcast.

  req.headers['zetta-message-id'] = messageId;
  req.headers['zetta-forwarded-server'] = name;
  if (!req.isSpdy) {
    req.headers['x-forwarded-proto'] = getCurrentProtocol(req);
  }

  var agent = env.zettaAgent || peer.agent;

  // Deliberately NOT named `opts`: reusing that name replaced the proxy
  // options with the request options, so `opts.pipe` was never honoured.
  var requestOpts = { method: req.method, headers: req.headers, path: req.url, agent: agent };
  var request = http.request(requestOpts, function(response) {
    var id = response.headers['zetta-message-id'];
    var client = self.clients[id];

    if (!client) {
      // The peer did not echo a known message id. Discard its body, drop
      // our own registration and answer the caller instead of hanging.
      response.resume();
      delete self.clients[messageId];
      env.response.statusCode = 404;
      return next(env);
    }

    Object.keys(response.headers).forEach(function(header) {
      if (header !== 'zetta-message-id') {
        client.setHeader(header, response.headers[header]);
      }
    });

    client.statusCode = response.statusCode;

    // Collect raw chunks and decode once at the end, so multi-byte
    // characters split across chunk boundaries are not corrupted.
    var chunks = [];
    if(!opts.pipe) {
      response.on('data', function(d) {
        chunks.push(d);
      });
    } else {
      response.pipe(client);
    }

    response.on('end', function() {
      if(!opts.pipe) {
        env.response.body = Buffer.concat(chunks).toString();
      }
      delete self.clients[id];
      next(env);
    });

  }).on('error', function(err) {
    delete self.clients[messageId];
    env.response.statusCode = 502;
    return next(env);
  });

  if (req.body) {
    request.end(req.body);
  } else {
    req.pipe(request);
  }
};