UNPKG

14 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var assert = require('assert');
4var http = require('http');
5var url = require('url');
6var querystring = require('querystring');
7var spdy = require('spdy');
8var ws = require('ws');
9var SpdyAgent = require('./spdy_agent');
10var Logger = require('./logger');
11
// Connection lifecycle states of a peer socket, stored in `state` by the
// constructor's event handlers and compared against in `subscribe`.
var STATES = {
  DISCONNECTED: 0,
  CONNECTING: 1,
  CONNECTED: 2
};
17
// PeerSocket wraps a WebSocket connection to a peer and tracks its lifecycle.
//
//   ws           - websocket for the peer connection (handed to `init`)
//   request      - the HTTP request that accompanied the websocket
//   name         - the peer's local id
//   peerRegistry - registry whose records are updated as the state changes
//   opts         - optional { pingTimeout, confirmationTimeout } in milliseconds;
//                  each falls back to 10 seconds when missing or not numeric
//
// Emits 'connecting', 'connected', 'end' and 'error'; the handlers installed
// here keep `state` and the registry status in step with those events.
var PeerSocket = module.exports = function(ws, request, name, peerRegistry, opts) {
  EventEmitter.call(this);

  var settings = opts || {};
  var defaultTimeoutMs = 10 * 1000;
  var socket = this;

  this.state = STATES.DISCONNECTED;
  this.name = name; // peer's local id
  this.agent = null;
  this.subscriptions = {}; // { <topic>: <subscribed_count> }
  this.connectionId = null;
  this._pingTimer = null;
  this._pingTimeout = Number(settings.pingTimeout) || defaultTimeoutMs;
  this._confirmationTimeout = Number(settings.confirmationTimeout) || defaultTimeoutMs;
  this.peerRegistry = peerRegistry;
  this.logger = new Logger();

  function onConnecting() {
    socket.state = STATES.CONNECTING;
  }

  function onEnd() {
    socket.state = STATES.DISCONNECTED;
    socket._setRegistryStatus('disconnected');
    socket._cleanup();
  }

  function onError(err) {
    socket.state = STATES.DISCONNECTED;
    socket._setRegistryStatus('failed', err);
    socket._cleanup();
  }

  function onConnected() {
    socket.state = STATES.CONNECTED;
    socket._setRegistryStatus('connected');
  }

  this.on('connecting', onConnecting);
  this.on('end', onEnd);
  this.on('error', onError);
  this.on('connected', onConnected);

  this.init(ws, request);
};
util.inherits(PeerSocket, EventEmitter);

// Expose the state constants on the exported constructor,
// e.g. `PeerSocket.CONNECTED`.
Object.keys(STATES).forEach(function(stateName) {
  PeerSocket[stateName] = STATES[stateName];
});
65
// Returns a plain object describing this peer: its local id (`name`) and the
// id of the current connection.
PeerSocket.prototype.properties = function() {
  var summary = {};
  summary.id = this.name;
  summary.connectionId = this.connectionId;
  return summary;
};
72
// Stops the ping timer and ends the underlying TCP socket, if there is one.
PeerSocket.prototype.close = function() {
  clearInterval(this._pingTimer);

  // TODO(adammagaluk): `this.ws.close()` is deliberately NOT called here.
  // It does not propagate the connection closing and `_cleanup` never runs:
  // the ws library only ends the socket once a close frame has been received
  // (ws/Websocket.js: `if (this._closeFrameReceived) this._socket.end();`).
  // After the connection is established it no longer speaks the websocket
  // protocol, so no close frame arrives and ws just sets up a timeout waiting
  // for it before eventually closing. `agent.close()` does close the
  // connection, but the actual connection should be closed too; ideally a
  // SPDY/H2 close frame would be sent here, or at least the TCP close.

  // End the TCP connection from the peer.
  // TODO(adammagaluk): Why is test 'peer connects should be the same peer
  // object on the cloud with reconnect with timing issue' causing ws._socket
  // to be null sometimes.
  var rawSocket = this.ws ? this.ws._socket : null;
  if (rawSocket) {
    rawSocket.end();
  }
};
95
// Closes the agent, when one has been created. Called from the 'end' and
// 'error' handlers installed by the constructor.
PeerSocket.prototype._cleanup = function() {
  // TODO: validate memory leaks in new spdy library. Streams used to be
  // destroyed one by one through spdy internals
  // (`agent._spdyState.connection._spdyState.streams[k].destroy()`); that use
  // of internals was removed.
  if (this.agent) {
    this.agent.close();
  }
};
110
// (Re)initialises the socket for a websocket/request pair.
//
// Emits 'connecting' and wires up the websocket synchronously, then, on the
// next turn of the event loop, runs the connection setup. On success every
// previously tracked topic is subscribed again, the ping timer is started and
// 'connected' is emitted. On failure the socket is closed and 'error' emitted.
PeerSocket.prototype.init = function(ws, request) {
  assert(ws);
  assert(request);

  var socket = this;

  // Re-issue a subscription for each topic that was tracked before.
  function resubscribeAll() {
    var previous = socket.subscriptions;
    socket.subscriptions = {}; // reset so subscribe() starts counting afresh
    var topics = Object.keys(previous);
    for (var i = 0; i < topics.length; i++) {
      socket.subscribe(topics[i]);
    }
  }

  function onSetupFinished(err) {
    if (err) {
      socket.close();
      socket.emit('error', err);
      return;
    }

    if (socket.ws.readyState !== ws.OPEN) {
      // already disconnected, reset
      socket.close();
      socket.emit('error', new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + socket.name + '".'));
      return;
    }

    resubscribeAll();
    socket._startPingTimer();
    socket.emit('connected');
  }

  socket.emit('connecting');
  socket._initWs(ws, request);

  // Deferred because ws/spdy may not be fully established yet.
  setImmediate(function() {
    socket._setupConnection(onSetupFinished);
  });
};
149
// Registers this peer in the registry with status 'connecting' and then
// confirms the connection with the peer.
//
//   cb    - called with the registry error, or with whatever
//           `confirmConnection` reports
//   tries - accepted but not used by this implementation
PeerSocket.prototype._setupConnection = function(cb, tries) {
  var socket = this;
  var registration = {
    direction: 'acceptor',
    id: socket.name,
    status: 'connecting'
  };

  function onRegistered(registryErr) {
    if (registryErr) {
      return cb(registryErr);
    }

    // confirm connection with peer
    socket.confirmConnection(socket.connectionId, cb);
  }

  socket.peerRegistry.add(registration, onRegistered);
};
167
// Stores the websocket and its originating request, extracts the connection
// id from the request URL and creates a SPDY agent over the websocket's raw
// socket.
PeerSocket.prototype._initWs = function(ws, request) {
  var socket = this;

  function stopPinging() {
    clearInterval(socket._pingTimer);
  }

  function closeAndReport(err) {
    socket.close();
    socket.emit('error', err);
  }

  // Keep a reference to the original request and websocket.
  this.request = request;
  this.ws = ws;

  // The connection id arrives as a query parameter on the request URL.
  var parsedUrl = url.parse(request.url, true);
  this.connectionId = parsedUrl.query.connectionId;

  var rawSocket = this.ws._socket;
  rawSocket.removeAllListeners('data'); // drop the WebSocket data handler

  rawSocket.on('end', function() {
    stopPinging();
    socket.emit('end');
  });

  this.ws.on('error', function(err) {
    stopPinging();
    socket.emit('error', err);
  });

  // `host` is intentionally left out of the options: setting it overrides
  // every Host header set on requests, which causes issues.
  // TODO: Remove this note after verifying spdy docs/source.
  var agentOptions = {
    port: 80,
    socket: rawSocket,
    spdy: {
      plain: true,
      ssl: false,
      protocol: 'spdy/3.1'
    }
  };
  this.agent = spdy.createAgent(SpdyAgent, agentOptions);

  // Some errors are emitted only on the SPDY connection object and are not
  // forwarded to the agent's `error` event, so listen there as well.
  // `_spdyState.connection` is set before `_connect` is emitted:
  // https://github.com/spdy-http2/node-spdy/blob/657c20d35906a058199c8b8d721c902ef6cdc4d6/lib/spdy/agent.js#L65-L71
  this.agent.once('_connect', function() {
    socket.agent._spdyState.connection.once('error', closeAndReport);
  });

  // TODO: Remove this when bug in agent socket removal is fixed.
  this.agent.maxSockets = 150;

  this.agent.on('error', closeAndReport);
};
220
// (Re)starts the keep-alive loop: every `_pingTimeout` ms a ping is sent over
// the SPDY connection. If the ping callback has not fired within another
// `_pingTimeout` ms, the socket is closed and an 'error' is emitted.
PeerSocket.prototype._startPingTimer = function() {
  var socket = this;

  function pingOnce() {
    var deadline = setTimeout(function() {
      socket.close();
      socket.emit('error', new Error('Peer socket timed out'));
    }, socket._pingTimeout);

    // Any ping callback, regardless of its error argument, cancels the deadline.
    socket.agent._spdyState.connection.ping(function() {
      clearTimeout(deadline);
    });
  }

  clearInterval(this._pingTimer);
  this._pingTimer = setInterval(pingOnce, this._pingTimeout);
};
238
// Updates this peer's registry record with a new status.
//
//   status - status string to store on the record (e.g. 'connected')
//   err    - optional error to store on the record as `error`; may be omitted,
//            in which case the second argument is treated as `cb`
//   cb     - optional callback, called with the registry lookup error or with
//            the result of `peerRegistry.save`
PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
  var self = this;

  if (typeof err === 'function') {
    cb = err;
    err = undefined;
  }

  if (!cb) {
    cb = function(){};
  }

  // The lookup error gets its own name so it does not shadow the `err`
  // argument; with the same name, the caller's error could never be recorded
  // on the peer.
  this.peerRegistry.get(this.name, function(lookupErr, peer) {
    if (lookupErr) {
      return cb(lookupErr);
    }

    peer.status = status;
    peer.connectionId = self.connectionId;
    if (err) {
      peer.error = err;
    }
    self.peerRegistry.save(peer, cb);
  });
};
264
// Handles a push stream from the peer: buffers the whole body, decodes it
// according to its content type and re-emits it on this socket.
//
//   stream - push stream; reads `stream.path`, `stream.headers` and the
//            readable-stream interface (`read()`, 'readable', 'end', 'error')
//
// Emits `<topic>` with (body) and 'zetta-events' with (topic, body), where the
// topic is the stream path without its leading slash. `body` is the parsed
// JSON for 'application/json' (also the default when no content type is
// given), the raw Buffer for 'application/octet-stream', and null otherwise
// or when the JSON cannot be parsed.
PeerSocket.prototype.onPushData = function(stream) {
  var self = this;

  // url -> path for whatever reason...
  // TODO: validate docs/source in spdy repo
  var streamUrl = stream.path.slice(1);

  // TODO: streamUrl may be empty or not match any entry in
  // `this.subscriptions`. In some cases the connection should be ended
  // (`stream.connection.end()`) when the topic is unknown, in others it must
  // stay open, so nothing is done about it here for now.

  var encoding = stream.headers['content-type'] || 'application/json';
  // remove additional parameters such as in `application/json; charset=utf-8`
  if (encoding.indexOf(';') !== -1) {
    encoding = encoding.split(';')[0].trim();
  }

  // Collect the chunks instead of copying into a buffer sized from the
  // `content-length` header: the header may be missing or disagree with the
  // bytes actually received, which would truncate or pad the body.
  var chunks = [];
  var received = 0;
  stream.on('readable', function() {
    var chunk;
    while ((chunk = stream.read())) {
      if (!Buffer.isBuffer(chunk)) {
        chunk = Buffer.from(chunk);
      }
      chunks.push(chunk);
      received += chunk.length;
    }
  });

  stream.on('error', function(err) {
    console.error('error on push:', err);
  });

  stream.on('end', function() {
    var data = Buffer.concat(chunks, received);
    var body = null;
    if (encoding === 'application/json') {
      try {
        body = JSON.parse(data.toString());
      } catch (err) {
        console.error('PeerSocket push data json parse error', err);
      }
    } else if (encoding === 'application/octet-stream') {
      body = data;
    }

    self.emit(streamUrl, body);
    self.emit('zetta-events', streamUrl, body);

    //TODO(adammagaluk): verify any memory leaks without closing
    //stream.connection.close();
  });
};
321
// Subscribes to a topic on the peer and counts local subscribers per topic.
//
//   event - topic name; a value starting with 'query%2F' is URI-decoded
//           before it is counted and sent
//   cb    - optional callback, invoked at most once: with no arguments when
//           the topic was already subscribed or the peer responded, or with
//           the request error when the request fails before any response
//
// Only the first subscriber for a topic triggers a request. Pushed data for
// the subscription arrives through the request's 'push' event and is handled
// by `onPushData`.
PeerSocket.prototype.subscribe = function(event, cb) {
  var self = this;
  if (!cb) {
    cb = function() {};
  }

  // TODO(adammagaluk): Is there a better way to handle
  // the case. Ensure we only ever call the cb() once
  // since on network failures the request will emit `error`
  // after the response has been received.
  var callbackHasBeenCalled = false;
  var wrappedCallback = function(err) {
    if (!callbackHasBeenCalled) {
      callbackHasBeenCalled = true;
      // Hand a request failure to the caller instead of dropping it, as
      // `unsubscribe` and `transition` already do.
      if (err) {
        cb(err);
      } else {
        cb();
      }
    } else if (err && self.state === STATES.CONNECTED) { // Ignore the error if the peer is disconnected.
      console.error('Subscription request returned an error after callback was called: ', err);
    }
  };

  var queryPrefix = 'query%2F';
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    event = decodeURIComponent(event);
  }

  // keep track of number of subscriptions
  if (this.subscriptions[event] === undefined) {
    this.subscriptions[event] = 0;
  }
  this.subscriptions[event]++;

  // if already subscribed ignore
  if (this.subscriptions[event] > 1) {
    wrappedCallback();
    return;
  }

  var host;
  if (this.ws && this.request) {
    host = this.request.headers.host;
  } else {
    host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
  }

  var opts = {
    method: 'GET',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host
    },
    path: '/servers/' + encodeURIComponent(this.name)
      + '/events?topic=' + encodeURIComponent(event),
    agent: this.agent
  };

  // TODO(adammagaluk):
  // The request is long lived for the duration
  // of the subscription. Once cb() is fired for the
  // subscription, we need to ensure that it is not
  // fired again. It could fire again on network
  // failures etc...
  var req = http.request(opts, function(res) {
    // TODO(adammagaluk): We aren't handling status codes.
    wrappedCallback();
  }).on('error', wrappedCallback);

  // Push event now happens on the request object.
  req.on('push', this.onPushData.bind(this));

  req.end();
};
393
// Drops one local subscriber for a topic and, once no subscribers remain,
// asks the peer to stop sending events for it.
//
//   event - topic name, in the same form that was passed to `subscribe`
//   cb    - optional callback, invoked at most once: with no arguments when
//           other subscribers remain or the peer responded, or with the
//           request error when the request fails before any response
PeerSocket.prototype.unsubscribe = function(event, cb) {
  if (!cb) {
    cb = function() {};
  }

  // The request can emit `error` after the response was received (see
  // `subscribe`), so make sure the caller's callback only fires once.
  var callbackHasBeenCalled = false;
  var wrappedCallback = function(err) {
    if (callbackHasBeenCalled) {
      return;
    }
    callbackHasBeenCalled = true;
    if (err) {
      cb(err);
    } else {
      cb();
    }
  };

  // `subscribe` counts 'query%2F...' topics under their URI-decoded name, so
  // use the same key here; otherwise the counter for such topics would never
  // be decremented. The topic is still sent to the peer exactly as the caller
  // supplied it.
  // NOTE(review): the topic is not form-encoded in the request body; topics
  // containing '&', '+' or '%' may not survive parsing on the peer. Confirm
  // the peer's parser before changing the wire format.
  var topicOnWire = event;
  var queryPrefix = 'query%2F';
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    event = decodeURIComponent(event);
  }

  if (this.subscriptions[event] === undefined) {
    this.subscriptions[event] = 0;
  } else {
    this.subscriptions[event]--;
    if (this.subscriptions[event] < 0) {
      this.subscriptions[event] = 0;
    }
  }

  // only unsubscribe once all subscriptions count reaches 0
  if (this.subscriptions[event] > 0) {
    return wrappedCallback();
  }

  var host;
  if (this.ws && this.request) {
    host = this.request.headers.host;
  } else {
    host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
  }

  var body = Buffer.from('topic=' + topicOnWire);
  var opts = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    },
    path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    // TODO(adammagaluk): We aren't handling status codes or the body.
    wrappedCallback();
  }).on('error', wrappedCallback);
  req.end(body);
};
438
// Confirms the connection with the peer by requesting
// `/_initiate_peer/<connectionId>` through the agent.
//
//   connectionId - id appended to the confirmation path
//   callback     - invoked exactly once: with no arguments on a 200 response,
//                  or with an Error on any other status code, a request
//                  error, or when no response arrives within
//                  `_confirmationTimeout` ms
PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
  // Aborting the request on timeout, or a network failure after the response,
  // can make the request emit `error` after the callback has already been
  // called; `settled` guarantees a single callback invocation.
  var settled = false;
  var req;

  var timeout = setTimeout(function() {
    if (settled) {
      return;
    }
    settled = true;
    req.abort();
    callback(new Error('Confirm connection timeout reached.'));
  }, this._confirmationTimeout);

  var finish = function(err) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    if (err) {
      callback(err);
    } else {
      callback();
    }
  };

  var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
  req = http.get(opts, function(res) {
    if (res.statusCode !== 200) {
      return finish(new Error('Unexpected status code'));
    }
    finish();
  }).on('error', finish);
};
457
// Executes an action on the peer by sending its arguments as a
// form-urlencoded request through the agent.
//
//   action - object with `href` (only its pathname is used as the request
//            path) and `method` (HTTP method of the request)
//   args   - key/value pairs serialised with `querystring.stringify`
//   cb     - invoked at most once: (null, parsedJsonBody) on a 200 response
//            with a JSON body; otherwise an Error carrying the response body
//            (non-200), 'Failed to parse body' (invalid JSON), or the
//            request error
PeerSocket.prototype.transition = function(action, args, cb) {
  // The request can emit `error` after the response has ended (see
  // `subscribe`), so make sure the caller's callback only fires once.
  var callbackHasBeenCalled = false;
  var done = function() {
    if (callbackHasBeenCalled) {
      return;
    }
    callbackHasBeenCalled = true;
    return cb.apply(null, arguments);
  };

  var u = url.parse(action.href);
  var path = u.pathname;

  var body = Buffer.from(querystring.stringify(args));

  var host;
  if (this.ws && this.request) {
    host = this.request.headers.host;
  } else {
    host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
  }

  var opts = {
    agent: this.agent,
    path: path,
    method: action.method,
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    }
  };

  var req = http.request(opts, function(res) {
    var buffer = [];
    var len = 0;
    res.on('readable', function() {
      var data;
      while (data = res.read()) {
        buffer.push(data);
        len += data.length;
      }
    });

    res.on('end', function() {
      var buf = Buffer.concat(buffer, len);
      if (res.statusCode !== 200) {
        return done(new Error(buf.toString()));
      }

      var jsonBody = null;
      try {
        jsonBody = JSON.parse(buf.toString());
      } catch(err) {
        return done(new Error('Failed to parse body'));
      }
      return done(null, jsonBody);
    });
  }).on('error', done);
  req.end(body);
};
510