1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var http = require('http');
|
4 | var url = require('url');
|
5 | var querystring = require('querystring');
|
6 | var spdy = require('spdy');
|
7 | var ws = require('ws');
|
8 | var SpdyAgent = require('./spdy_agent');
|
9 | var Logger = require('./logger');
|
10 |
|
11 | var STATES = {
|
12 | 'DISCONNECTED' : 0,
|
13 | 'CONNECTING': 1,
|
14 | 'CONNECTED': 2
|
15 | };
|
16 |
|
17 | var PeerSocket = module.exports = function(ws, name, peerRegistry, opts) {
|
18 | EventEmitter.call(this);
|
19 |
|
20 | if (!opts) {
|
21 | opts = {};
|
22 | }
|
23 |
|
24 | var self = this;
|
25 | this.state = STATES.DISCONNECTED;
|
26 | this.name = name;
|
27 | this.agent = null;
|
28 | this.subscriptions = {};
|
29 | this.connectionId = null;
|
30 | this._pingTimer = null;
|
31 | this._pingTimeout = Number(opts.pingTimeout) || (10 * 1000);
|
32 | this._confirmationTimeout = Number(opts.confirmationTimeout) || 10 * 1000;
|
33 | this.peerRegistry = peerRegistry;
|
34 | this.logger = new Logger();
|
35 |
|
36 | this.on('connecting', function() {
|
37 | self.state = STATES.CONNECTING;
|
38 | });
|
39 |
|
40 | this.on('end', function() {
|
41 | self.state = STATES.DISCONNECTED;
|
42 | self._setRegistryStatus('disconnected');
|
43 | this._cleanup();
|
44 | });
|
45 |
|
46 | this.on('error', function(err) {
|
47 | self.state = STATES.DISCONNECTED;
|
48 | self._setRegistryStatus('failed', err);
|
49 | this._cleanup();
|
50 | });
|
51 |
|
52 | this.on('connected', function() {
|
53 | self.state = STATES.CONNECTED;
|
54 | self._setRegistryStatus('connected');
|
55 | });
|
56 |
|
57 | this.init(ws);
|
58 | };
|
59 | util.inherits(PeerSocket, EventEmitter);
|
60 |
|
61 | Object.keys(STATES).forEach(function(k) {
|
62 | module.exports[k] = STATES[k];
|
63 | });
|
64 |
|
65 | PeerSocket.prototype.properties = function() {
|
66 | return {
|
67 | id: this.name,
|
68 | connectionId: this.connectionId
|
69 | };
|
70 | };
|
71 |
|
72 | PeerSocket.prototype.close = function() {
|
73 | clearInterval(this._pingTimer);
|
74 | this.ws.close();
|
75 | };
|
76 |
|
77 | PeerSocket.prototype._cleanup = function() {
|
78 | if (!this.agent) {
|
79 | return;
|
80 | }
|
81 |
|
82 | var streams = this.agent._spdyState.connection._spdyState.streams;
|
83 | Object.keys(streams).forEach(function(k) {
|
84 | streams[k].destroy();
|
85 | });
|
86 |
|
87 | this.agent.close();
|
88 | };
|
89 |
|
90 | PeerSocket.prototype.init = function(ws) {
|
91 | var self = this;
|
92 | self.emit('connecting');
|
93 |
|
94 | if (ws) {
|
95 | this._initWs(ws);
|
96 | }
|
97 |
|
98 |
|
99 | setImmediate(function() {
|
100 |
|
101 | self._setupConnection(function(err) {
|
102 | if (err) {
|
103 | self.close();
|
104 | self.emit('error', err);
|
105 | return;
|
106 | }
|
107 |
|
108 | if (self.ws.readyState !== ws.OPEN) {
|
109 |
|
110 | self.close();
|
111 | self.emit('error', new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
|
112 | return;
|
113 | }
|
114 |
|
115 | var subscriptions = self.subscriptions;
|
116 | self.subscriptions = {};
|
117 |
|
118 | Object.keys(subscriptions).forEach(function(event) {
|
119 | self.subscribe(event);
|
120 | });
|
121 |
|
122 | self._startPingTimer();
|
123 | self.emit('connected');
|
124 | });
|
125 | });
|
126 | };
|
127 |
|
128 | PeerSocket.prototype._setupConnection = function(cb, tries) {
|
129 | var self = this;
|
130 | var peerItem = {
|
131 | direction: 'acceptor',
|
132 | id: self.name,
|
133 | status: 'connecting'
|
134 | };
|
135 |
|
136 | self.peerRegistry.add(peerItem, function(err, newPeer) {
|
137 | if (err) {
|
138 | return cb(err);
|
139 | }
|
140 |
|
141 |
|
142 | self.confirmConnection(self.connectionId, cb);
|
143 | });
|
144 | };
|
145 |
|
146 | PeerSocket.prototype._initWs = function(ws) {
|
147 | var self = this;
|
148 | var u = url.parse(ws.upgradeReq.url, true);
|
149 | this.ws = ws;
|
150 | this.connectionId = u.query.connectionId;
|
151 | this.ws._socket.removeAllListeners('data');
|
152 |
|
153 | this.ws._socket.on('end', function() {
|
154 | clearInterval(self._pingTimer);
|
155 | self.emit('end');
|
156 | });
|
157 |
|
158 | this.ws.on('error', function(err) {
|
159 | clearInterval(self._pingTimer);
|
160 | self.emit('error', err);
|
161 | });
|
162 |
|
163 |
|
164 | this.agent = spdy.createAgent(SpdyAgent, {
|
165 | host: this.name,
|
166 | port: 80,
|
167 | socket: this.ws._socket,
|
168 | spdy: {
|
169 | plain: true,
|
170 | ssl: false
|
171 | }
|
172 | });
|
173 |
|
174 |
|
175 | this.agent.maxSockets = 150;
|
176 | this.agent.on('push', this.onPushData.bind(this));
|
177 | this.agent.on('error', function(err) {
|
178 | self.close();
|
179 | self.emit('error', err);
|
180 | });
|
181 | };
|
182 |
|
183 | PeerSocket.prototype._startPingTimer = function() {
|
184 | var self = this;
|
185 | clearInterval(this._pingTimer);
|
186 | this._pingTimer = setInterval(function() {
|
187 | var timeout = setTimeout(function() {
|
188 | self.close();
|
189 | self.emit('error', new Error('Peer socket timed out'));
|
190 | }, self._pingTimeout)
|
191 |
|
192 | self.agent.ping(function(err) {
|
193 | if (timeout) {
|
194 | clearTimeout(timeout);
|
195 | }
|
196 | });
|
197 | }, self._pingTimeout);
|
198 |
|
199 | };
|
200 |
|
201 | PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
|
202 | var self = this;
|
203 |
|
204 | if (typeof err === 'function') {
|
205 | cb = err;
|
206 | err = undefined;
|
207 | }
|
208 |
|
209 | if (!cb) {
|
210 | cb = function(){};
|
211 | }
|
212 |
|
213 | this.peerRegistry.get(this.name, function(err, peer) {
|
214 | if (err) {
|
215 | return cb(err);
|
216 | }
|
217 |
|
218 | peer.status = status;
|
219 | peer.connectionId = self.connectionId;
|
220 | if (err) {
|
221 | peer.error = err;
|
222 | }
|
223 | self.peerRegistry.save(peer, cb);
|
224 | });
|
225 | };
|
226 |
|
227 | PeerSocket.prototype.onPushData = function(stream) {
|
228 | var streamUrl = stream.url.slice(1);
|
229 | var self = this;
|
230 |
|
231 | if(!this.subscriptions[streamUrl]) {
|
232 | stream.connection.end();
|
233 | }
|
234 |
|
235 | var encoding = stream.headers['content-type'] || 'application/json';
|
236 |
|
237 | if (encoding.indexOf(';') !== -1) {
|
238 | encoding = encoding.split(';')[0].trim();
|
239 | }
|
240 | var length = Number(stream.headers['content-length']);
|
241 | var data = new Buffer(length);
|
242 | var idx = 0;
|
243 | var d = null;
|
244 | stream.on('readable', function() {
|
245 | while (d = stream.read()) {
|
246 | for (var i=0; i<d.length;i++) {
|
247 | data[idx++] = d[i];
|
248 | }
|
249 | };
|
250 | });
|
251 |
|
252 | stream.on('error', function(err) {
|
253 | console.error('error on push:', err);
|
254 | });
|
255 |
|
256 | stream.on('end', function() {
|
257 | var body = null;
|
258 | if (encoding === 'application/json') {
|
259 | try {
|
260 | body = JSON.parse(data.toString());
|
261 | } catch (err) {
|
262 | console.error('PeerSocket push data json parse error', err);
|
263 | }
|
264 | } else if(encoding === 'application/octet-stream') {
|
265 | body = data;
|
266 | }
|
267 |
|
268 | self.emit(streamUrl, body);
|
269 | self.emit('zetta-events', streamUrl, body)
|
270 | stream.connection.close();
|
271 | });
|
272 | };
|
273 |
|
274 | PeerSocket.prototype.subscribe = function(event, cb) {
|
275 | if(!cb) {
|
276 | cb = function() {};
|
277 | }
|
278 |
|
279 | var queryPrefix = 'query%2F';
|
280 | if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
|
281 | event = decodeURIComponent(event);
|
282 | }
|
283 |
|
284 |
|
285 | if (this.subscriptions[event] === undefined) {
|
286 | this.subscriptions[event] = 0;
|
287 | }
|
288 | this.subscriptions[event]++;
|
289 |
|
290 |
|
291 | if (this.subscriptions[event] > 1) {
|
292 | cb();
|
293 | return;
|
294 | }
|
295 |
|
296 | var host;
|
297 | if(this.ws && this.ws.upgradeReq) {
|
298 | host = this.ws.upgradeReq.headers.host
|
299 | } else {
|
300 | host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
|
301 | }
|
302 |
|
303 | var opts = {
|
304 | method: 'GET',
|
305 | headers: {
|
306 | 'Content-Type': 'application/x-www-form-urlencoded',
|
307 | 'Host': host
|
308 | },
|
309 | path: '/servers/' + encodeURIComponent(this.name)
|
310 | + '/events?topic=' + encodeURIComponent(event),
|
311 | agent: this.agent
|
312 | };
|
313 |
|
314 | var req = http.request(opts, function(res) {
|
315 | cb();
|
316 | }).on('error', cb);
|
317 | req.end();
|
318 | };
|
319 |
|
320 | PeerSocket.prototype.unsubscribe = function(event, cb) {
|
321 | if(!cb) {
|
322 | cb = function() {};
|
323 | }
|
324 |
|
325 | if (this.subscriptions[event] === undefined) {
|
326 | this.subscriptions[event] = 0;
|
327 | } else {
|
328 | this.subscriptions[event]--;
|
329 | if (this.subscriptions[event] < 0) {
|
330 | this.subscriptions[event] = 0;
|
331 | }
|
332 | }
|
333 |
|
334 |
|
335 | if (this.subscriptions[event] > 0) {
|
336 | return cb();
|
337 | }
|
338 |
|
339 | var host;
|
340 | if(this.ws && this.ws.upgradeReq) {
|
341 | host = this.ws.upgradeReq.headers.host
|
342 | } else {
|
343 | host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
|
344 | }
|
345 |
|
346 | var body = new Buffer('topic='+event);
|
347 | var opts = {
|
348 | method: 'POST',
|
349 | headers: {
|
350 | 'Content-Type': 'application/x-www-form-urlencoded',
|
351 | 'Host': host,
|
352 | 'Content-Length': body.length
|
353 | },
|
354 | path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
|
355 | agent: this.agent
|
356 | };
|
357 |
|
358 | var req = http.request(opts, function(res) {
|
359 | cb();
|
360 | }).on('error', cb);
|
361 | req.end(body);
|
362 | };
|
363 |
|
364 | PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
|
365 | var timeout = setTimeout(function() {
|
366 | req.abort();
|
367 | callback(new Error('Confirm connection timeout reached.'));
|
368 | }, this._confirmationTimeout);
|
369 |
|
370 | var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
|
371 | var req = http.get(opts, function(res) {
|
372 | clearTimeout(timeout);
|
373 | if (res.statusCode !== 200) {
|
374 | return callback(new Error('Unexpected status code'));
|
375 | }
|
376 | callback();
|
377 | }).on('error', function(err) {
|
378 | clearTimeout(timeout);
|
379 | callback(err);
|
380 | });
|
381 | };
|
382 |
|
383 | PeerSocket.prototype.transition = function(action, args, cb) {
|
384 | var u = url.parse(action.href);
|
385 | var path = u.pathname;
|
386 |
|
387 | var body = new Buffer(querystring.stringify(args));
|
388 |
|
389 | var host;
|
390 | if(this.ws && this.ws.upgradeReq) {
|
391 | host = this.ws.upgradeReq.headers.host
|
392 | } else {
|
393 | host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
|
394 | }
|
395 |
|
396 | var opts = {
|
397 | agent: this.agent,
|
398 | path: path,
|
399 | method: action.method,
|
400 | headers: {
|
401 | 'Content-Type': 'application/x-www-form-urlencoded',
|
402 | 'Host': host,
|
403 | 'Content-Length': body.length,
|
404 | }
|
405 | };
|
406 |
|
407 | var req = http.request(opts, function(res) {
|
408 | var buffer = [];
|
409 | var len = 0;
|
410 | res.on('readable', function() {
|
411 | var data;
|
412 | while (data = res.read()) {
|
413 | buffer.push(data);
|
414 | len += data.length;
|
415 | }
|
416 | });
|
417 |
|
418 | res.on('end', function() {
|
419 | var buf = Buffer.concat(buffer, len);
|
420 | if (res.statusCode !== 200) {
|
421 | return cb(new Error(buf.toString()));
|
422 | }
|
423 |
|
424 | var jsonBody = null;
|
425 | try {
|
426 | jsonBody = JSON.parse(buf.toString());
|
427 | } catch(err) {
|
428 | return cb(new Error('Failed to parse body'));
|
429 | }
|
430 | return cb(null, jsonBody);
|
431 | });
|
432 | }).on('error', cb);
|
433 | req.end(body);
|
434 | };
|
435 |
|