UNPKG

10.2 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var http = require('http');
4var url = require('url');
5var querystring = require('querystring');
6var spdy = require('spdy');
7var ws = require('ws');
8var SpdyAgent = require('./spdy_agent');
9var Logger = require('./logger');
10
11var STATES = {
12 'DISCONNECTED' : 0,
13 'CONNECTING': 1,
14 'CONNECTED': 2
15};
16
17var PeerSocket = module.exports = function(ws, name, peerRegistry) {
18 EventEmitter.call(this);
19
20 var self = this;
21 this.state = STATES.DISCONNECTED;
22 this.name = name; // peers local id
23 this.agent = null;
24 this.subscriptions = {}; // { <topic>: <subscribed_count> }
25 this.connectionId = null;
26 this._pingTimer = null;
27 this._pingTimeout = 10 * 1000;
28 this.peerRegistry = peerRegistry;
29 this.logger = new Logger();
30
31 this.on('connecting', function() {
32 self.state = STATES.CONNECTING;
33 });
34
35 this.on('end', function() {
36 self.state = STATES.DISCONNECTED;
37 self._setRegistryStatus('disconnected');
38 this._cleanup();
39 });
40
41 this.on('error', function(err) {
42 self.state = STATES.DISCONNECTED;
43 self._setRegistryStatus('failed', err);
44 this._cleanup();
45 });
46
47 this.on('connected', function() {
48 self.state = STATES.CONNECTED;
49 self._setRegistryStatus('connected');
50 });
51
52 this.init(ws);
53};
54util.inherits(PeerSocket, EventEmitter);
55
56Object.keys(STATES).forEach(function(k) {
57 module.exports[k] = STATES[k];
58});
59
60PeerSocket.prototype.properties = function() {
61 return {
62 id: this.name,
63 connectionId: this.connectionId
64 };
65};
66
67PeerSocket.prototype.close = function() {
68 clearInterval(this._pingTimer);
69 this.ws.close();
70};
71
72PeerSocket.prototype._cleanup = function() {
73 if (!this.agent) {
74 return;
75 }
76
77 var streams = this.agent._spdyState.connection._spdyState.streams;
78 Object.keys(streams).forEach(function(k) {
79 streams[k].destroy();
80 });
81
82 this.agent.close();
83};
84
85PeerSocket.prototype.init = function(ws) {
86 var self = this;
87 self.emit('connecting');
88
89 if (ws) {
90 this._initWs(ws);
91 }
92
93 // delay because ws/spdy may not be fully established
94 setImmediate(function() {
95 // setup connection
96 self._setupConnection(function(err) {
97 if (err) {
98 self.close();
99 self.emit('error', err);
100 return;
101 }
102
103 if (self.ws.readyState !== ws.OPEN) {
104 // dissconnected already, reset
105 self.close();
106 self.emit('error', new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
107 return;
108 }
109
110 var subscriptions = self.subscriptions;
111 self.subscriptions = {}; // clear it before resubscribing
112 // subscribe to all prev subscriptions
113 Object.keys(subscriptions).forEach(function(event) {
114 self.subscribe(event);
115 });
116
117 self._startPingTimer();
118 self.emit('connected');
119 });
120 });
121};
122
123PeerSocket.prototype._setupConnection = function(cb, tries) {
124 var self = this;
125 var peerItem = {
126 direction: 'acceptor',
127 id: self.name,
128 status: 'connecting'
129 };
130
131 self.peerRegistry.add(peerItem, function(err, newPeer) {
132 if (err) {
133 return cb(err);
134 }
135
136 // confirm connection with peer
137 self.confirmConnection(self.connectionId, cb);
138 });
139};
140
141PeerSocket.prototype._initWs = function(ws) {
142 var self = this;
143 var u = url.parse(ws.upgradeReq.url, true); // parse out connectionId
144 this.ws = ws;
145 this.connectionId = u.query.connectionId;
146 this.ws._socket.removeAllListeners('data'); // Remove WebSocket data handler.
147
148 this.ws._socket.on('end', function() {
149 clearInterval(self._pingTimer);
150 self.emit('end');
151 });
152
153 this.ws.on('error', function(err) {
154 clearInterval(self._pingTimer);
155 self.emit('error', err);
156 });
157
158
159 this.agent = spdy.createAgent(SpdyAgent, {
160 host: this.name,
161 port: 80,
162 socket: this.ws._socket,
163 spdy: {
164 plain: true,
165 ssl: false
166 }
167 });
168
169 // TODO: Remove this when bug in agent socket removal is fixed.
170 this.agent.maxSockets = 150;
171 this.agent.on('push', this.onPushData.bind(this));
172 this.agent.on('error', function(err) {
173 self.close();
174 self.emit('error', err);
175 });
176};
177
178PeerSocket.prototype._startPingTimer = function() {
179 var self = this;
180 clearInterval(this._pingTimer);
181 this._pingTimer = setInterval(function() {
182 var timeout = setTimeout(function() {
183 self.close();
184 self.emit('error', new Error('Peer socket timed out'));
185 }, self._pingTimeout)
186
187 self.agent.ping(function(err) {
188 if (timeout) {
189 clearTimeout(timeout);
190 }
191 });
192 }, self._pingTimeout);
193
194};
195
196PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
197 var self = this;
198
199 if (typeof err === 'function') {
200 cb = err;
201 err = undefined;
202 }
203
204 if (!cb) {
205 cb = function(){};
206 }
207
208 this.peerRegistry.get(this.name, function(err, peer) {
209 if (err) {
210 return cb(err);
211 }
212
213 peer.status = status;
214 peer.connectionId = self.connectionId;
215 if (err) {
216 peer.error = err;
217 }
218 self.peerRegistry.save(peer, cb);
219 });
220};
221
222PeerSocket.prototype.onPushData = function(stream) {
223 var streamUrl = stream.url.slice(1);
224 var self = this;
225
226 if(!this.subscriptions[streamUrl]) {
227 stream.connection.end();
228 }
229
230 var encoding = stream.headers['content-type'] || 'application/json';
231 // remove additional parameters such as in `application/json; charset=utf-8`
232 if (encoding.indexOf(';') !== -1) {
233 encoding = encoding.split(';')[0].trim();
234 }
235 var length = Number(stream.headers['content-length']);
236 var data = new Buffer(length);
237 var idx = 0;
238 var d = null;
239 stream.on('readable', function() {
240 while (d = stream.read()) {
241 for (var i=0; i<d.length;i++) {
242 data[idx++] = d[i];
243 }
244 };
245 });
246
247 stream.on('error', function(err) {
248 console.error('error on push:', err);
249 });
250
251 stream.on('end', function() {
252 var body = null;
253 if (encoding === 'application/json') {
254 try {
255 body = JSON.parse(data.toString());
256 } catch (err) {
257 console.error('PeerSocket push data json parse error', err);
258 }
259 } else if(encoding === 'application/octet-stream') {
260 body = data;
261 }
262
263 self.emit(streamUrl, body);
264 self.emit('zetta-events', streamUrl, body)
265 stream.connection.close();
266 });
267};
268
269PeerSocket.prototype.subscribe = function(event, cb) {
270 if(!cb) {
271 cb = function() {};
272 }
273
274 var queryPrefix = 'query%2F';
275 if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
276 event = decodeURIComponent(event);
277 }
278
279 // keep track of number of subscriptions
280 if (this.subscriptions[event] === undefined) {
281 this.subscriptions[event] = 0;
282 }
283 this.subscriptions[event]++;
284
285 // if already subscribed ignore
286 if (this.subscriptions[event] > 1) {
287 cb();
288 return;
289 }
290
291 var host;
292 if(this.ws && this.ws.upgradeReq) {
293 host = this.ws.upgradeReq.headers.host
294 } else {
295 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
296 }
297
298 var opts = {
299 method: 'GET',
300 headers: {
301 'Content-Type': 'application/x-www-form-urlencoded',
302 'Host': host
303 },
304 path: '/servers/' + encodeURIComponent(this.name)
305 + '/events?topic=' + encodeURIComponent(event),
306 agent: this.agent
307 };
308
309 var req = http.request(opts, function(res) {
310 cb();
311 }).on('error', cb);
312 req.end();
313};
314
315PeerSocket.prototype.unsubscribe = function(event, cb) {
316 if(!cb) {
317 cb = function() {};
318 }
319
320 if (this.subscriptions[event] === undefined) {
321 this.subscriptions[event] = 0;
322 } else {
323 this.subscriptions[event]--;
324 if (this.subscriptions[event] < 0) {
325 this.subscriptions[event] = 0;
326 }
327 }
328
329 // only unsubscribe once all subscriptions count reaches 0
330 if (this.subscriptions[event] > 0) {
331 return cb();
332 }
333
334 var host;
335 if(this.ws && this.ws.upgradeReq) {
336 host = this.ws.upgradeReq.headers.host
337 } else {
338 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
339 }
340
341 var body = new Buffer('topic='+event);
342 var opts = {
343 method: 'POST',
344 headers: {
345 'Content-Type': 'application/x-www-form-urlencoded',
346 'Host': host,
347 'Content-Length': body.length
348 },
349 path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
350 agent: this.agent
351 };
352
353 var req = http.request(opts, function(res) {
354 cb();
355 }).on('error', cb);
356 req.end(body);
357};
358
359PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
360 var timeout = setTimeout(function() {
361 req.abort();
362 callback(new Error('Confirm connection timeout reached.'));
363 }, this._pingTimeout);
364
365 var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
366 var req = http.get(opts, function(res) {
367 clearTimeout(timeout);
368 if (res.statusCode !== 200) {
369 return callback(new Error('Unexpected status code'));
370 }
371 callback();
372 }).on('error', function(err) {
373 clearTimeout(timeout);
374 callback(err);
375 });
376};
377
378PeerSocket.prototype.transition = function(action, args, cb) {
379 var u = url.parse(action.href);
380 var path = u.pathname;
381
382 var body = new Buffer(querystring.stringify(args));
383
384 var host;
385 if(this.ws && this.ws.upgradeReq) {
386 host = this.ws.upgradeReq.headers.host
387 } else {
388 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
389 }
390
391 var opts = {
392 agent: this.agent,
393 path: path,
394 method: action.method,
395 headers: {
396 'Content-Type': 'application/x-www-form-urlencoded',
397 'Host': host,
398 'Content-Length': body.length,
399 }
400 };
401
402 var req = http.request(opts, function(res) {
403 var buffer = [];
404 var len = 0;
405 res.on('readable', function() {
406 var data;
407 while (data = res.read()) {
408 buffer.push(data);
409 len += data.length;
410 }
411 });
412
413 res.on('end', function() {
414 var buf = Buffer.concat(buffer, len);
415 if (res.statusCode !== 200) {
416 return cb(new Error(buf.toString()));
417 }
418
419 var jsonBody = null;
420 try {
421 jsonBody = JSON.parse(buf.toString());
422 } catch(err) {
423 return cb(new Error('Failed to parse body'));
424 }
425 return cb(null, jsonBody);
426 });
427 }).on('error', cb);
428 req.end(body);
429};
430