UNPKG

9.85 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var http = require('http');
4var url = require('url');
5var querystring = require('querystring');
6var spdy = require('spdy');
7var ws = require('ws');
8var SpdyAgent = require('./spdy_agent');
9var Logger = require('./logger');
10
// Connection lifecycle states of a peer socket. The numeric values are what
// `PeerSocket#state` holds at runtime.
var STATES = {
  DISCONNECTED: 0,
  CONNECTING: 1,
  CONNECTED: 2
};
16
/**
 * Event-emitting handle for one peer connection.
 *
 * Tracks the connection state, the topics the peer is subscribed to and the
 * SPDY agent used to talk to the peer, and mirrors state changes into the
 * peer registry.
 *
 * Emits: 'connecting', 'connected', 'end', 'error'.
 *
 * @param {Object} ws           WebSocket for the peer; passed straight to init().
 * @param {String} name         The peer's local id.
 * @param {Object} peerRegistry Registry used to persist the peer's status.
 */
var PeerSocket = module.exports = function(ws, name, peerRegistry) {
  EventEmitter.call(this);

  var peer = this;

  peer.state = STATES.DISCONNECTED;
  peer.name = name; // peer's local id
  peer.agent = null;
  peer.subscriptions = {}; // { <topic>: <subscribed_count> }
  peer.connectionId = null;
  peer._pingTimer = null;
  peer._pingTimeout = 10 * 1000;
  peer.peerRegistry = peerRegistry;
  peer.logger = new Logger();

  // Keep `state` and the registry entry in step with the lifecycle events.
  function onConnecting() {
    peer.state = STATES.CONNECTING;
  }

  function onEnd() {
    peer.state = STATES.DISCONNECTED;
    peer._setRegistryStatus('disconnected');
    peer._cleanup();
  }

  function onError(err) {
    peer.state = STATES.DISCONNECTED;
    peer._setRegistryStatus('failed', err);
    peer._cleanup();
  }

  function onConnected() {
    peer.state = STATES.CONNECTED;
    peer._setRegistryStatus('connected');
  }

  peer.on('connecting', onConnecting);
  peer.on('end', onEnd);
  peer.on('error', onError);
  peer.on('connected', onConnected);

  // Listeners are registered first so init()'s 'connecting' event is observed.
  peer.init(ws);
};
util.inherits(PeerSocket, EventEmitter);
55
// Expose every state constant on the exported constructor as well
// (DISCONNECTED, CONNECTING, CONNECTED).
Object.keys(STATES).forEach(function exposeState(stateName) {
  module.exports[stateName] = STATES[stateName];
});
59
/**
 * Builds a plain-object summary of this peer.
 *
 * @returns {{id: *, connectionId: *}} The peer's name (as `id`) and its
 *   current connection id.
 */
PeerSocket.prototype.properties = function() {
  var summary = {};
  summary.id = this.name;
  summary.connectionId = this.connectionId;
  return summary;
};
66
/**
 * Stops the keep-alive ping timer and closes the underlying WebSocket.
 *
 * Safe to call before a WebSocket has been attached (in which case only the
 * timer is cleared) and safe to call more than once.
 */
PeerSocket.prototype.close = function() {
  clearInterval(this._pingTimer);
  this._pingTimer = null; // drop the stale handle so it is not cleared again

  // `ws` is only assigned by _initWs(); guard so an early close() cannot throw.
  if (this.ws) {
    this.ws.close();
  }
};
71
/**
 * Tears down the SPDY agent: destroys every stream still tracked by the
 * agent's connection, then closes the agent. Does nothing when no agent has
 * been created.
 *
 * The stream table is read from the agent's private `_spdyState`; each level
 * is checked before it is dereferenced so that cleanup running from an
 * 'end'/'error' listener cannot itself throw when the agent has no
 * connection state.
 */
PeerSocket.prototype._cleanup = function() {
  var agent = this.agent;
  if (!agent) {
    return;
  }

  var agentState = agent._spdyState;
  var connection = agentState && agentState.connection;
  var connectionState = connection && connection._spdyState;
  var streams = (connectionState && connectionState.streams) || {};

  Object.keys(streams).forEach(function(id) {
    streams[id].destroy();
  });

  agent.close();
};
84
/**
 * Starts (or restarts) the connection handshake for this peer.
 *
 * Emits 'connecting' immediately, wires up `ws` when one is given, and then,
 * on a later setImmediate turn, registers and confirms the connection. On
 * success the previous topic subscriptions are re-established, the ping timer
 * is started and 'connected' is emitted. On any failure the socket is closed
 * and 'error' is emitted.
 *
 * @param {Object} [ws] WebSocket to attach. When omitted, the WebSocket
 *   already stored on `this.ws` is used.
 */
PeerSocket.prototype.init = function(ws) {
  var self = this;
  self.emit('connecting');

  if (ws) {
    this._initWs(ws);
  }

  // delay because ws/spdy may not be fully established
  setImmediate(function() {
    self._setupConnection(function(err) {
      if (err) {
        self.close();
        self.emit('error', err);
        return;
      }

      // Compare against the attached socket's own OPEN constant. The `ws`
      // parameter may be undefined when init() is called without an argument.
      if (self.ws.readyState !== self.ws.OPEN) {
        // disconnected already, reset
        self.close();
        self.emit('error', new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
        return;
      }

      var previous = self.subscriptions;
      self.subscriptions = {}; // clear it before resubscribing

      Object.keys(previous).forEach(function(topic) {
        var count = previous[topic];

        // A count of 0 means every subscriber already unsubscribed; do not
        // subscribe the peer to that topic again.
        if (!(count > 0)) {
          return;
        }

        self.subscribe(topic);
        // subscribe() starts counting from 1; restore the real number of
        // subscribers so a single later unsubscribe() does not tear the
        // topic down for the remaining ones.
        self.subscriptions[topic] = count;
      });

      self._startPingTimer();
      self.emit('connected');
    });
  });
};
122
/**
 * Adds this peer to the registry with status 'connecting' and then asks the
 * peer to confirm the connection.
 *
 * @param {Function} cb      Called with an error if the registry add fails;
 *                           otherwise handed to confirmConnection().
 * @param {*}        [tries] Accepted but not used by this implementation.
 */
PeerSocket.prototype._setupConnection = function(cb, tries) {
  var peer = this;

  var registryEntry = {
    direction: 'acceptor',
    id: peer.name,
    status: 'connecting'
  };

  // The created registry record (second argument) is not needed here.
  function onRegistered(addErr, newPeer) {
    if (addErr) {
      return cb(addErr);
    }

    // confirm connection with peer
    peer.confirmConnection(peer.connectionId, cb);
  }

  peer.peerRegistry.add(registryEntry, onRegistered);
};
140
/**
 * Attaches a WebSocket to this peer and builds the SPDY agent on top of the
 * WebSocket's raw socket.
 *
 * Reads `connectionId` from the query string of the upgrade request, removes
 * the WebSocket's own 'data' listeners from the raw socket, forwards socket
 * 'end' and WebSocket 'error' as peer events, and creates the agent.
 *
 * @param {Object} ws WebSocket exposing `upgradeReq` and `_socket`.
 */
PeerSocket.prototype._initWs = function(ws) {
  var peer = this;

  // parse out connectionId
  var upgradeUrl = url.parse(ws.upgradeReq.url, true);
  peer.ws = ws;
  peer.connectionId = upgradeUrl.query.connectionId;

  var rawSocket = peer.ws._socket;
  rawSocket.removeAllListeners('data'); // Remove WebSocket data handler.

  function stopPinging() {
    clearInterval(peer._pingTimer);
  }

  rawSocket.on('end', function() {
    stopPinging();
    peer.emit('end');
  });

  peer.ws.on('error', function(err) {
    stopPinging();
    peer.emit('error', err);
  });

  var agentOptions = {
    host: peer.name,
    port: 80,
    socket: rawSocket,
    spdy: {
      plain: true,
      ssl: false
    }
  };
  peer.agent = spdy.createAgent(SpdyAgent, agentOptions);

  // TODO: Remove this when bug in agent socket removal is fixed.
  peer.agent.maxSockets = 150;
  peer.agent.on('push', peer.onPushData.bind(peer));
  peer.agent.on('error', function(err) {
    peer.close();
    peer.emit('error', err);
  });
};
177
/**
 * (Re)starts the keep-alive loop.
 *
 * Every `_pingTimeout` milliseconds a ping is sent through the agent. If the
 * ping callback has not run within another `_pingTimeout` milliseconds, the
 * socket is closed and 'error' is emitted with a timeout error.
 */
PeerSocket.prototype._startPingTimer = function() {
  var peer = this;

  function pingOnce() {
    // Armed before the ping goes out; disarmed as soon as the ping calls back.
    var deadline = setTimeout(function() {
      peer.close();
      peer.emit('error', new Error('Peer socket timed out'));
    }, peer._pingTimeout);

    peer.agent.ping(function(err) {
      clearTimeout(deadline);
    });
  }

  clearInterval(peer._pingTimer);
  peer._pingTimer = setInterval(pingOnce, peer._pingTimeout);
};
195
/**
 * Loads this peer's registry record, updates its status and connection id
 * (and, when given, the error that caused the status), and saves it back.
 *
 * @param {String}   status Status to store on the record.
 * @param {*}        [err]  Failure reason to store as `peer.error`. May be
 *                          omitted, in which case the second argument can be
 *                          the callback.
 * @param {Function} [cb]   Called with the registry lookup error, or with
 *                          whatever the registry's save() reports.
 */
PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
  var self = this;

  if (typeof err === 'function') {
    cb = err;
    err = undefined;
  }

  if (!cb) {
    cb = function(){};
  }

  // The lookup error gets its own name so it does not shadow the `err`
  // argument, which must still be visible below when the record is updated.
  this.peerRegistry.get(this.name, function(lookupErr, peer) {
    if (lookupErr) {
      return cb(lookupErr);
    }

    peer.status = status;
    peer.connectionId = self.connectionId;
    if (err) {
      peer.error = err;
    }
    self.peerRegistry.save(peer, cb);
  });
};
221
/**
 * Handles a stream pushed by the peer and re-emits its payload as an event.
 *
 * The event name is the stream URL without its leading character. The body
 * is decoded according to the `x-event-encoding` header: 'json' (the default)
 * is parsed, 'buffer' is passed through as a Buffer, and anything else, or a
 * JSON parse failure, yields `null`.
 *
 * @param {Object} stream Pushed stream exposing `url`, `headers`,
 *   `connection` and the readable-stream events.
 */
PeerSocket.prototype.onPushData = function(stream) {
  var streamUrl = stream.url.slice(1);
  var self = this;

  if (!this.subscriptions[streamUrl]) {
    // NOTE(review): the connection is ended for an unsubscribed topic, but
    // processing still continues below, as before. Confirm whether an early
    // return is intended here.
    stream.connection.end();
  }

  var encoding = stream.headers['x-event-encoding'] || 'json';

  // Collect the chunks that actually arrive instead of pre-allocating
  // `content-length` bytes. The header may be missing or wrong, and a
  // pre-sized buffer would then be invalid, truncated, or padded with
  // uninitialised memory.
  var chunks = [];
  var received = 0;

  stream.on('readable', function() {
    var chunk;
    while ((chunk = stream.read())) {
      chunks.push(chunk);
      received += chunk.length;
    }
  });

  stream.on('error', function(err) {
    console.error('error on push:', err);
  });

  stream.on('end', function() {
    var data = Buffer.concat(chunks, received);
    var body = null;

    if (encoding === 'json') {
      try {
        body = JSON.parse(data.toString());
      } catch (err) {
        console.error('PeerSocket push data json parse error', err);
      }
    } else if (encoding === 'buffer') {
      body = data;
    }

    self.emit(streamUrl, body);
    stream.connection.close();
  });
};
262
/**
 * Registers interest in a topic on the peer.
 *
 * Subscriptions are reference counted: only the first subscriber for a topic
 * sends the subscribe request through the agent; later ones just bump the
 * count and call back immediately.
 *
 * @param {String}   event Topic name. A topic starting with the URL-encoded
 *                         prefix 'query%2F' is decoded before use.
 * @param {Function} [cb]  Called with no arguments once the peer responds (or
 *                         immediately if already subscribed), or with the
 *                         request error.
 */
PeerSocket.prototype.subscribe = function(event, cb) {
  cb = cb || function() {};

  var encodedQueryPrefix = 'query%2F';
  if (event && event.slice(0, encodedQueryPrefix.length) === encodedQueryPrefix) {
    event = decodeURIComponent(event);
  }

  // keep track of number of subscriptions
  var counts = this.subscriptions;
  if (counts[event] === undefined) {
    counts[event] = 0;
  }
  counts[event]++;

  // if already subscribed ignore
  if (counts[event] > 1) {
    cb();
    return;
  }

  // Reuse the Host header of the upgrade request when there is one.
  var hasUpgradeReq = this.ws && this.ws.upgradeReq;
  var host = hasUpgradeReq ? this.ws.upgradeReq.headers.host : 'fog.argo.cx';

  var requestPath = '/servers/' + encodeURIComponent(this.name)
    + '/events?topic=' + encodeURIComponent(event);

  var opts = {
    method: 'GET',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host
    },
    path: requestPath,
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    cb();
  });
  req.on('error', cb);
  req.end();
};
308
/**
 * Drops one subscriber from a topic on the peer.
 *
 * Subscriptions are reference counted: the unsubscribe request is only sent
 * through the agent once the count for the topic reaches zero.
 *
 * @param {String}   event Topic name. A topic starting with the URL-encoded
 *                         prefix 'query%2F' is decoded first, matching the
 *                         key under which subscribe() counted it.
 * @param {Function} [cb]  Called with no arguments once done, or with the
 *                         request error.
 */
PeerSocket.prototype.unsubscribe = function(event, cb) {
  if (!cb) {
    cb = function() {};
  }

  // subscribe() stores encoded query topics under their decoded name, so the
  // same normalisation is needed here to hit the same counter.
  var queryPrefix = 'query%2F';
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    event = decodeURIComponent(event);
  }

  if (this.subscriptions[event] === undefined) {
    this.subscriptions[event] = 0;
  } else {
    this.subscriptions[event]--;
    if (this.subscriptions[event] < 0) {
      this.subscriptions[event] = 0;
    }
  }

  // only unsubscribe once all subscriptions count reaches 0
  if (this.subscriptions[event] > 0) {
    return cb();
  }

  var host;
  if (this.ws && this.ws.upgradeReq) {
    host = this.ws.upgradeReq.headers.host;
  } else {
    host = 'fog.argo.cx';
  }

  // The body is declared as form-urlencoded, so the topic must be escaped;
  // otherwise characters such as '&', '=' or '+' in a topic corrupt it.
  var body = new Buffer(querystring.stringify({ topic: event }));
  var opts = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    },
    path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    cb();
  }).on('error', cb);
  req.end(body);
};
352
/**
 * Asks the peer to confirm a connection by requesting
 * `/_initiate_peer/<connectionId>` through the agent.
 *
 * `callback` is invoked exactly once: with no arguments on a 200 response,
 * or with an Error on a non-200 status, a request error, or when no response
 * arrives within `_pingTimeout` milliseconds.
 *
 * @param {String}   connectionId Id appended to the confirmation path.
 * @param {Function} callback     Completion callback, `function(err)`.
 */
PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
  // Aborting the request on timeout makes it emit 'error' as well; this flag
  // ensures only the first outcome is reported to the caller.
  var settled = false;

  function finish(err) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    if (err) {
      callback(err);
    } else {
      callback();
    }
  }

  var timeout = setTimeout(function() {
    if (settled) {
      return;
    }
    settled = true; // set before abort() so the resulting 'error' is ignored
    req.abort();
    callback(new Error('Confirm connection timeout reached.'));
  }, this._pingTimeout);

  var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
  var req = http.get(opts, function(res) {
    // Only the status code matters; discard the body so the response does
    // not stay pending.
    res.resume();

    if (res.statusCode !== 200) {
      return finish(new Error('Unexpected status code'));
    }
    finish();
  }).on('error', function(err) {
    finish(err);
  });
};
371
/**
 * Invokes an action on the peer by sending its arguments as a
 * form-urlencoded body to the path of the action's href.
 *
 * @param {Object}   action Action descriptor; `href` supplies the request
 *                          path (pathname only) and `method` the HTTP method.
 * @param {Object}   args   Arguments serialised with querystring.stringify.
 * @param {Function} cb     `function(err, json)`. Receives an Error carrying
 *                          the response text for a non-200 status, an Error
 *                          if the body is not valid JSON, or the parsed body.
 */
PeerSocket.prototype.transition = function(action, args, cb) {
  var path = url.parse(action.href).pathname;
  var body = new Buffer(querystring.stringify(args));

  // Reuse the Host header of the upgrade request when there is one.
  var hasUpgradeReq = this.ws && this.ws.upgradeReq;
  var host = hasUpgradeReq ? this.ws.upgradeReq.headers.host : 'fog.argo.cx';

  var opts = {
    agent: this.agent,
    path: path,
    method: action.method,
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    }
  };

  function onResponse(res) {
    var chunks = [];
    var total = 0;

    res.on('readable', function() {
      var chunk;
      while ((chunk = res.read())) {
        chunks.push(chunk);
        total += chunk.length;
      }
    });

    res.on('end', function() {
      var text = Buffer.concat(chunks, total).toString();

      if (res.statusCode !== 200) {
        return cb(new Error(text));
      }

      var parsed = null;
      try {
        parsed = JSON.parse(text);
      } catch (err) {
        return cb(new Error('Failed to parse body'));
      }
      // Called outside the try block so an exception thrown by cb itself is
      // not mistaken for a parse failure.
      return cb(null, parsed);
    });
  }

  var req = http.request(opts, onResponse);
  req.on('error', cb);
  req.end(body);
};
424