UNPKG

10.4 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var http = require('http');
4var url = require('url');
5var querystring = require('querystring');
6var spdy = require('spdy');
7var ws = require('ws');
8var SpdyAgent = require('./spdy_agent');
9var Logger = require('./logger');
10
11var STATES = {
12 'DISCONNECTED' : 0,
13 'CONNECTING': 1,
14 'CONNECTED': 2
15};
16
17var PeerSocket = module.exports = function(ws, name, peerRegistry, opts) {
18 EventEmitter.call(this);
19
20 if (!opts) {
21 opts = {};
22 }
23
24 var self = this;
25 this.state = STATES.DISCONNECTED;
26 this.name = name; // peers local id
27 this.agent = null;
28 this.subscriptions = {}; // { <topic>: <subscribed_count> }
29 this.connectionId = null;
30 this._pingTimer = null;
31 this._pingTimeout = Number(opts.pingTimeout) || (10 * 1000);
32 this._confirmationTimeout = Number(opts.confirmationTimeout) || 10 * 1000;
33 this.peerRegistry = peerRegistry;
34 this.logger = new Logger();
35
36 this.on('connecting', function() {
37 self.state = STATES.CONNECTING;
38 });
39
40 this.on('end', function() {
41 self.state = STATES.DISCONNECTED;
42 self._setRegistryStatus('disconnected');
43 this._cleanup();
44 });
45
46 this.on('error', function(err) {
47 self.state = STATES.DISCONNECTED;
48 self._setRegistryStatus('failed', err);
49 this._cleanup();
50 });
51
52 this.on('connected', function() {
53 self.state = STATES.CONNECTED;
54 self._setRegistryStatus('connected');
55 });
56
57 this.init(ws);
58};
59util.inherits(PeerSocket, EventEmitter);
60
61Object.keys(STATES).forEach(function(k) {
62 module.exports[k] = STATES[k];
63});
64
65PeerSocket.prototype.properties = function() {
66 return {
67 id: this.name,
68 connectionId: this.connectionId
69 };
70};
71
72PeerSocket.prototype.close = function() {
73 clearInterval(this._pingTimer);
74 this.ws.close();
75};
76
77PeerSocket.prototype._cleanup = function() {
78 if (!this.agent) {
79 return;
80 }
81
82 var streams = this.agent._spdyState.connection._spdyState.streams;
83 Object.keys(streams).forEach(function(k) {
84 streams[k].destroy();
85 });
86
87 this.agent.close();
88};
89
90PeerSocket.prototype.init = function(ws) {
91 var self = this;
92 self.emit('connecting');
93
94 if (ws) {
95 this._initWs(ws);
96 }
97
98 // delay because ws/spdy may not be fully established
99 setImmediate(function() {
100 // setup connection
101 self._setupConnection(function(err) {
102 if (err) {
103 self.close();
104 self.emit('error', err);
105 return;
106 }
107
108 if (self.ws.readyState !== ws.OPEN) {
109 // dissconnected already, reset
110 self.close();
111 self.emit('error', new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
112 return;
113 }
114
115 var subscriptions = self.subscriptions;
116 self.subscriptions = {}; // clear it before resubscribing
117 // subscribe to all prev subscriptions
118 Object.keys(subscriptions).forEach(function(event) {
119 self.subscribe(event);
120 });
121
122 self._startPingTimer();
123 self.emit('connected');
124 });
125 });
126};
127
128PeerSocket.prototype._setupConnection = function(cb, tries) {
129 var self = this;
130 var peerItem = {
131 direction: 'acceptor',
132 id: self.name,
133 status: 'connecting'
134 };
135
136 self.peerRegistry.add(peerItem, function(err, newPeer) {
137 if (err) {
138 return cb(err);
139 }
140
141 // confirm connection with peer
142 self.confirmConnection(self.connectionId, cb);
143 });
144};
145
146PeerSocket.prototype._initWs = function(ws) {
147 var self = this;
148 var u = url.parse(ws.upgradeReq.url, true); // parse out connectionId
149 this.ws = ws;
150 this.connectionId = u.query.connectionId;
151 this.ws._socket.removeAllListeners('data'); // Remove WebSocket data handler.
152
153 this.ws._socket.on('end', function() {
154 clearInterval(self._pingTimer);
155 self.emit('end');
156 });
157
158 this.ws.on('error', function(err) {
159 clearInterval(self._pingTimer);
160 self.emit('error', err);
161 });
162
163
164 this.agent = spdy.createAgent(SpdyAgent, {
165 host: this.name,
166 port: 80,
167 socket: this.ws._socket,
168 spdy: {
169 plain: true,
170 ssl: false
171 }
172 });
173
174 // TODO: Remove this when bug in agent socket removal is fixed.
175 this.agent.maxSockets = 150;
176 this.agent.on('push', this.onPushData.bind(this));
177 this.agent.on('error', function(err) {
178 self.close();
179 self.emit('error', err);
180 });
181};
182
183PeerSocket.prototype._startPingTimer = function() {
184 var self = this;
185 clearInterval(this._pingTimer);
186 this._pingTimer = setInterval(function() {
187 var timeout = setTimeout(function() {
188 self.close();
189 self.emit('error', new Error('Peer socket timed out'));
190 }, self._pingTimeout)
191
192 self.agent.ping(function(err) {
193 if (timeout) {
194 clearTimeout(timeout);
195 }
196 });
197 }, self._pingTimeout);
198
199};
200
201PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
202 var self = this;
203
204 if (typeof err === 'function') {
205 cb = err;
206 err = undefined;
207 }
208
209 if (!cb) {
210 cb = function(){};
211 }
212
213 this.peerRegistry.get(this.name, function(err, peer) {
214 if (err) {
215 return cb(err);
216 }
217
218 peer.status = status;
219 peer.connectionId = self.connectionId;
220 if (err) {
221 peer.error = err;
222 }
223 self.peerRegistry.save(peer, cb);
224 });
225};
226
227PeerSocket.prototype.onPushData = function(stream) {
228 var streamUrl = stream.url.slice(1);
229 var self = this;
230
231 if(!this.subscriptions[streamUrl]) {
232 stream.connection.end();
233 }
234
235 var encoding = stream.headers['content-type'] || 'application/json';
236 // remove additional parameters such as in `application/json; charset=utf-8`
237 if (encoding.indexOf(';') !== -1) {
238 encoding = encoding.split(';')[0].trim();
239 }
240 var length = Number(stream.headers['content-length']);
241 var data = new Buffer(length);
242 var idx = 0;
243 var d = null;
244 stream.on('readable', function() {
245 while (d = stream.read()) {
246 for (var i=0; i<d.length;i++) {
247 data[idx++] = d[i];
248 }
249 };
250 });
251
252 stream.on('error', function(err) {
253 console.error('error on push:', err);
254 });
255
256 stream.on('end', function() {
257 var body = null;
258 if (encoding === 'application/json') {
259 try {
260 body = JSON.parse(data.toString());
261 } catch (err) {
262 console.error('PeerSocket push data json parse error', err);
263 }
264 } else if(encoding === 'application/octet-stream') {
265 body = data;
266 }
267
268 self.emit(streamUrl, body);
269 self.emit('zetta-events', streamUrl, body)
270 stream.connection.close();
271 });
272};
273
274PeerSocket.prototype.subscribe = function(event, cb) {
275 if(!cb) {
276 cb = function() {};
277 }
278
279 var queryPrefix = 'query%2F';
280 if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
281 event = decodeURIComponent(event);
282 }
283
284 // keep track of number of subscriptions
285 if (this.subscriptions[event] === undefined) {
286 this.subscriptions[event] = 0;
287 }
288 this.subscriptions[event]++;
289
290 // if already subscribed ignore
291 if (this.subscriptions[event] > 1) {
292 cb();
293 return;
294 }
295
296 var host;
297 if(this.ws && this.ws.upgradeReq) {
298 host = this.ws.upgradeReq.headers.host
299 } else {
300 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
301 }
302
303 var opts = {
304 method: 'GET',
305 headers: {
306 'Content-Type': 'application/x-www-form-urlencoded',
307 'Host': host
308 },
309 path: '/servers/' + encodeURIComponent(this.name)
310 + '/events?topic=' + encodeURIComponent(event),
311 agent: this.agent
312 };
313
314 var req = http.request(opts, function(res) {
315 cb();
316 }).on('error', cb);
317 req.end();
318};
319
320PeerSocket.prototype.unsubscribe = function(event, cb) {
321 if(!cb) {
322 cb = function() {};
323 }
324
325 if (this.subscriptions[event] === undefined) {
326 this.subscriptions[event] = 0;
327 } else {
328 this.subscriptions[event]--;
329 if (this.subscriptions[event] < 0) {
330 this.subscriptions[event] = 0;
331 }
332 }
333
334 // only unsubscribe once all subscriptions count reaches 0
335 if (this.subscriptions[event] > 0) {
336 return cb();
337 }
338
339 var host;
340 if(this.ws && this.ws.upgradeReq) {
341 host = this.ws.upgradeReq.headers.host
342 } else {
343 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
344 }
345
346 var body = new Buffer('topic='+event);
347 var opts = {
348 method: 'POST',
349 headers: {
350 'Content-Type': 'application/x-www-form-urlencoded',
351 'Host': host,
352 'Content-Length': body.length
353 },
354 path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
355 agent: this.agent
356 };
357
358 var req = http.request(opts, function(res) {
359 cb();
360 }).on('error', cb);
361 req.end(body);
362};
363
364PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
365 var timeout = setTimeout(function() {
366 req.abort();
367 callback(new Error('Confirm connection timeout reached.'));
368 }, this._confirmationTimeout);
369
370 var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
371 var req = http.get(opts, function(res) {
372 clearTimeout(timeout);
373 if (res.statusCode !== 200) {
374 return callback(new Error('Unexpected status code'));
375 }
376 callback();
377 }).on('error', function(err) {
378 clearTimeout(timeout);
379 callback(err);
380 });
381};
382
383PeerSocket.prototype.transition = function(action, args, cb) {
384 var u = url.parse(action.href);
385 var path = u.pathname;
386
387 var body = new Buffer(querystring.stringify(args));
388
389 var host;
390 if(this.ws && this.ws.upgradeReq) {
391 host = this.ws.upgradeReq.headers.host
392 } else {
393 host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
394 }
395
396 var opts = {
397 agent: this.agent,
398 path: path,
399 method: action.method,
400 headers: {
401 'Content-Type': 'application/x-www-form-urlencoded',
402 'Host': host,
403 'Content-Length': body.length,
404 }
405 };
406
407 var req = http.request(opts, function(res) {
408 var buffer = [];
409 var len = 0;
410 res.on('readable', function() {
411 var data;
412 while (data = res.read()) {
413 buffer.push(data);
414 len += data.length;
415 }
416 });
417
418 res.on('end', function() {
419 var buf = Buffer.concat(buffer, len);
420 if (res.statusCode !== 200) {
421 return cb(new Error(buf.toString()));
422 }
423
424 var jsonBody = null;
425 try {
426 jsonBody = JSON.parse(buf.toString());
427 } catch(err) {
428 return cb(new Error('Failed to parse body'));
429 }
430 return cb(null, jsonBody);
431 });
432 }).on('error', cb);
433 req.end(body);
434};
435