1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var assert = require('assert');
|
4 | var http = require('http');
|
5 | var url = require('url');
|
6 | var querystring = require('querystring');
|
7 | var spdy = require('spdy');
|
8 | var ws = require('ws');
|
9 | var SpdyAgent = require('./spdy_agent');
|
10 | var Logger = require('./logger');
|
11 |
|
// Connection lifecycle states of a PeerSocket. The same names are also
// copied onto module.exports further down in this file.
var STATES = {
  DISCONNECTED: 0,
  CONNECTING: 1,
  CONNECTED: 2
};
|
17 |
|
/**
 * Wraps an accepted peer websocket and tracks its connection state.
 *
 * Emits 'connecting', 'connected', 'end' and 'error'; the listeners installed
 * here keep `state` and the peer registry entry in step with those events.
 *
 * @param {Object} ws - websocket of the peer (its `_socket` is used by _initWs).
 * @param {Object} request - HTTP request that opened the websocket.
 * @param {String} name - peer name, used as the registry id.
 * @param {Object} peerRegistry - registry used to add/get/save the peer record.
 * @param {Object} [opts]
 * @param {Number} [opts.pingTimeout=10000] - ping period/deadline in ms.
 * @param {Number} [opts.confirmationTimeout=10000] - confirmation deadline in ms.
 */
var PeerSocket = module.exports = function(ws, request, name, peerRegistry, opts) {
  EventEmitter.call(this);

  var self = this;
  var options = opts || {};
  var TEN_SECONDS = 10 * 1000;

  this.state = STATES.DISCONNECTED;
  this.name = name;
  this.agent = null;
  this.subscriptions = {};
  this.connectionId = null;
  this._pingTimer = null;
  // A missing, zero or non-numeric option falls back to ten seconds.
  this._pingTimeout = Number(options.pingTimeout) || TEN_SECONDS;
  this._confirmationTimeout = Number(options.confirmationTimeout) || TEN_SECONDS;
  this.peerRegistry = peerRegistry;
  this.logger = new Logger();

  this.on('connecting', function onConnecting() {
    self.state = STATES.CONNECTING;
  });

  this.on('connected', function onConnected() {
    self.state = STATES.CONNECTED;
    self._setRegistryStatus('connected');
  });

  this.on('end', function onEnd() {
    self.state = STATES.DISCONNECTED;
    self._setRegistryStatus('disconnected');
    self._cleanup();
  });

  // Having an 'error' listener also keeps emit('error') from throwing.
  this.on('error', function onError(err) {
    self.state = STATES.DISCONNECTED;
    self._setRegistryStatus('failed', err);
    self._cleanup();
  });

  this.init(ws, request);
};
util.inherits(PeerSocket, EventEmitter);
|
61 |
|
// Publish every state constant on the exported constructor
// (e.g. module.exports.CONNECTED).
Object.keys(STATES).forEach(function exportState(stateName) {
  var stateValue = STATES[stateName];
  module.exports[stateName] = stateValue;
});
|
65 |
|
/**
 * Summarise this peer as a plain object.
 *
 * @returns {{id: String, connectionId: String}} peer name and current connection id.
 */
PeerSocket.prototype.properties = function() {
  var summary = {};
  summary.id = this.name;
  summary.connectionId = this.connectionId;
  return summary;
};
|
72 |
|
/**
 * Stop the ping timer and end the raw socket under the websocket, if any.
 */
PeerSocket.prototype.close = function() {
  clearInterval(this._pingTimer);

  var rawSocket = this.ws && this.ws._socket;
  if (!rawSocket) {
    return;
  }
  rawSocket.end();
};
|
95 |
|
/**
 * Close the agent created by _initWs. Does nothing when no agent is set.
 */
PeerSocket.prototype._cleanup = function() {
  if (this.agent) {
    this.agent.close();
  }
};
|
110 |
|
/**
 * (Re)initialise the peer on a websocket.
 *
 * Emits 'connecting' and wires the socket synchronously. On the next
 * setImmediate tick it runs _setupConnection; on success it re-issues a
 * subscribe for every previously known topic, starts the ping timer and
 * emits 'connected'. On failure it closes the socket and emits 'error'.
 *
 * @param {Object} ws - websocket of the peer (required).
 * @param {Object} request - HTTP request that opened the websocket (required).
 */
PeerSocket.prototype.init = function(ws, request) {
  assert(ws);
  assert(request);

  var self = this;

  // Shared failure path: tear the socket down, then report the error.
  function fail(err) {
    self.close();
    self.emit('error', err);
  }

  this.emit('connecting');
  this._initWs(ws, request);

  setImmediate(function() {
    self._setupConnection(function(err) {
      if (err) {
        fail(err);
        return;
      }

      // The websocket may have gone away while the connection was confirmed.
      if (self.ws.readyState !== ws.OPEN) {
        fail(new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
        return;
      }

      // Start from an empty table and subscribe once per known topic.
      var topics = Object.keys(self.subscriptions);
      self.subscriptions = {};
      for (var i = 0; i < topics.length; i++) {
        self.subscribe(topics[i]);
      }

      self._startPingTimer();
      self.emit('connected');
    });
  });
};
|
149 |
|
/**
 * Register this peer in the registry as a connecting acceptor, then confirm
 * the connection with the peer.
 *
 * @param {Function} cb - called with an error if registration or confirmation fails.
 * @param {Number} [tries] - accepted but not used by this implementation.
 */
PeerSocket.prototype._setupConnection = function(cb, tries) {
  var self = this;

  var registration = {
    direction: 'acceptor',
    id: this.name,
    status: 'connecting'
  };

  function onRegistered(err, newPeer) {
    if (err) {
      return cb(err);
    }
    self.confirmConnection(self.connectionId, cb);
  }

  this.peerRegistry.add(registration, onRegistered);
};
|
167 |
|
/**
 * Attach this peer to a websocket and create the SPDY agent that issues
 * requests over the websocket's raw socket.
 *
 * Reads `connectionId` from the request URL query string, removes the raw
 * socket's 'data' listeners before handing the socket to the agent, and
 * forwards socket end / websocket error / agent error as 'end' or 'error'
 * events on this object.
 *
 * @param {Object} ws - websocket of the peer.
 * @param {Object} request - HTTP request that opened the websocket.
 */
PeerSocket.prototype._initWs = function(ws, request) {
  var self = this;

  // Failure path shared by agent and SPDY connection errors.
  function failConnection(err) {
    self.close();
    self.emit('error', err);
  }

  this.request = request;
  this.ws = ws;

  var parsedUrl = url.parse(request.url, true);
  this.connectionId = parsedUrl.query.connectionId;

  var rawSocket = this.ws._socket;
  rawSocket.removeAllListeners('data');

  rawSocket.on('end', function onSocketEnd() {
    clearInterval(self._pingTimer);
    self.emit('end');
  });

  this.ws.on('error', function onWsError(err) {
    clearInterval(self._pingTimer);
    self.emit('error', err);
  });

  var agentOptions = {
    port: 80,
    socket: rawSocket,
    spdy: {
      plain: true,
      ssl: false,
      protocol: 'spdy/3.1'
    }
  };
  this.agent = spdy.createAgent(SpdyAgent, agentOptions);

  // The connection object is only read once the agent reports '_connect'.
  this.agent.once('_connect', function onAgentConnect(err) {
    self.agent._spdyState.connection.once('error', failConnection);
  });

  this.agent.maxSockets = 150;

  this.agent.on('error', failConnection);
};
|
220 |
|
/**
 * (Re)start the periodic SPDY ping.
 *
 * Every `_pingTimeout` ms a ping is sent on the agent's connection. If the
 * ping callback has not run within another `_pingTimeout` ms, the socket is
 * closed and an 'error' ("Peer socket timed out") is emitted.
 */
PeerSocket.prototype._startPingTimer = function() {
  var self = this;

  function sendPing() {
    var deadline = setTimeout(function onDeadline() {
      self.close();
      self.emit('error', new Error('Peer socket timed out'));
    }, self._pingTimeout);

    self.agent._spdyState.connection.ping(function onPong(err) {
      if (deadline) {
        clearTimeout(deadline);
      }
    });
  }

  // Replace any timer left over from an earlier start.
  clearInterval(this._pingTimer);
  this._pingTimer = setInterval(sendPing, self._pingTimeout);
};
|
238 |
|
/**
 * Update this peer's record in the registry.
 *
 * Loads the record by peer name, sets `status` and `connectionId`, attaches
 * `err` as `peer.error` when one is given, and saves the record.
 *
 * @param {String} status - new status value (e.g. 'connected', 'failed').
 * @param {Error} [err] - error to store on the record; may be omitted, in
 *   which case the second argument is treated as `cb`.
 * @param {Function} [cb] - called with an error if the lookup fails or yields
 *   no record; otherwise handed to `peerRegistry.save`.
 */
PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
  var self = this;

  if (typeof err === 'function') {
    cb = err;
    err = undefined;
  }

  if (!cb) {
    cb = function(){};
  }

  // The lookup error gets its own name: reusing `err` here would shadow the
  // status error above, and it would then never be written to the record.
  this.peerRegistry.get(this.name, function(lookupErr, peer) {
    if (lookupErr) {
      return cb(lookupErr);
    }

    // Report a missing record through cb instead of throwing a TypeError
    // from inside the registry callback.
    if (!peer) {
      return cb(new Error('Peer "' + self.name + '" not found in registry.'));
    }

    peer.status = status;
    peer.connectionId = self.connectionId;
    if (err) {
      peer.error = err;
    }
    self.peerRegistry.save(peer, cb);
  });
};
|
264 |
|
/**
 * Handle a pushed stream from the peer and re-emit its payload locally.
 *
 * The topic is the stream path without its leading character. When the stream
 * ends, the collected body is emitted twice: once as an event named after the
 * topic, and once as 'zetta-events' with the topic as first argument.
 *
 * Body decoding by content type (parameters after ';' are ignored, default
 * 'application/json'):
 *   - application/json: parsed JSON, or null if parsing fails (logged).
 *   - application/octet-stream: the raw Buffer.
 *   - anything else: null.
 *
 * @param {Object} stream - pushed stream exposing `path`, `headers`, `read()`
 *   and 'readable' / 'error' / 'end' events.
 */
PeerSocket.prototype.onPushData = function(stream) {
  var self = this;
  var streamUrl = stream.path.slice(1);

  var encoding = stream.headers['content-type'] || 'application/json';
  if (encoding.indexOf(';') !== -1) {
    encoding = encoding.split(';')[0].trim();
  }

  // Collect the chunks actually received instead of pre-allocating a buffer
  // from the content-length header: the header can be missing or wrong, which
  // previously meant an invalid buffer size or a truncated/padded body.
  var chunks = [];
  var received = 0;
  stream.on('readable', function() {
    var chunk;
    while ((chunk = stream.read())) {
      chunks.push(chunk);
      received += chunk.length;
    }
  });

  stream.on('error', function(err) {
    console.error('error on push:', err);
  });

  stream.on('end', function() {
    var data = Buffer.concat(chunks, received);
    var body = null;
    if (encoding === 'application/json') {
      try {
        body = JSON.parse(data.toString());
      } catch (err) {
        console.error('PeerSocket push data json parse error', err);
      }
    } else if (encoding === 'application/octet-stream') {
      body = data;
    }

    self.emit(streamUrl, body);
    self.emit('zetta-events', streamUrl, body);
  });
};
|
321 |
|
/**
 * Subscribe to a topic on the peer.
 *
 * Subscriptions are reference counted per topic. Only the first subscriber
 * triggers a GET to `/servers/<name>/events?topic=<topic>` through the agent;
 * later subscribers just bump the count. Pushed streams on that request are
 * handled by onPushData.
 *
 * @param {String} event - topic; a value starting with 'query%2F' is
 *   URI-decoded before it is counted and sent.
 * @param {Function} [cb] - invoked once, without arguments, when the response
 *   arrives, the request errors, or the topic was already subscribed.
 */
PeerSocket.prototype.subscribe = function(event, cb) {
  var self = this;
  var done = cb || function() {};
  var notified = false;

  // First invocation notifies the caller; later errors are only logged.
  function notifyOnce(err) {
    if (notified) {
      if (err && self.state == STATES.CONNECTED) {
        console.error('Subscription request returned an error after callback was called: ', err);
      }
      return;
    }
    notified = true;
    done();
  }

  var queryPrefix = 'query%2F';
  var isEncodedQuery = event && event.slice(0, queryPrefix.length) === queryPrefix;
  if (isEncodedQuery) {
    event = decodeURIComponent(event);
  }

  var counts = this.subscriptions;
  if (counts[event] === undefined) {
    counts[event] = 0;
  }
  counts[event]++;

  // Already subscribed on the peer: nothing to send.
  if (counts[event] > 1) {
    notifyOnce();
    return;
  }

  var host = (this.ws && this.request)
    ? this.request.headers.host
    : encodeURIComponent(this.name) + '.unreachable.zettajs.io';

  var peerSegment = encodeURIComponent(this.name);
  var opts = {
    method: 'GET',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host
    },
    path: '/servers/' + peerSegment + '/events?topic=' + encodeURIComponent(event),
    agent: this.agent
  };

  var req = http.request(opts, function onResponse(res) {
    notifyOnce();
  });
  req.on('error', notifyOnce);
  req.on('push', this.onPushData.bind(this));
  req.end();
};
|
393 |
|
/**
 * Drop one subscription to a topic on the peer.
 *
 * Subscriptions are reference counted per topic. While other subscribers
 * remain, only the count is decremented. When the count reaches zero, a POST
 * to `/servers/<name>/events/unsubscribe` with body `topic=<event>` is sent
 * through the agent.
 *
 * @param {String} event - topic as originally passed to subscribe.
 * @param {Function} [cb] - called without arguments when other subscribers
 *   remain or when the response arrives; called with the error if the request
 *   fails.
 */
PeerSocket.prototype.unsubscribe = function(event, cb) {
  if(!cb) {
    cb = function() {};
  }

  // subscribe() counts topics that start with 'query%2F' under their decoded
  // form, so the same normalisation is needed here to find that counter.
  // The request body below still carries the topic exactly as it was passed.
  var queryPrefix = 'query%2F';
  var topicKey = event;
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    topicKey = decodeURIComponent(event);
  }

  if (this.subscriptions[topicKey] === undefined) {
    this.subscriptions[topicKey] = 0;
  } else {
    this.subscriptions[topicKey]--;
    // Never let unbalanced unsubscribe calls drive the count negative.
    if (this.subscriptions[topicKey] < 0) {
      this.subscriptions[topicKey] = 0;
    }
  }

  // Other subscribers remain: keep the peer subscription alive.
  if (this.subscriptions[topicKey] > 0) {
    return cb();
  }

  var host;
  if(this.ws && this.request) {
    host = this.request.headers.host;
  } else {
    host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
  }

  var body = new Buffer('topic='+event);
  var opts = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    },
    path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    cb();
  }).on('error', cb);
  req.end(body);
};
|
438 |
|
/**
 * Confirm the connection with the peer by requesting
 * `/_initiate_peer/<connectionId>` through the agent.
 *
 * `callback` is invoked exactly once:
 *   - without arguments on a 200 response,
 *   - with Error('Unexpected status code') on any other status,
 *   - with the request error if the request fails,
 *   - with Error('Confirm connection timeout reached.') if nothing happened
 *     within `_confirmationTimeout` ms (the request is then aborted).
 *
 * @param {String} connectionId - id appended to the confirmation path.
 * @param {Function} callback - completion callback, see above.
 */
PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
  var settled = false;
  var timeout = null;
  var req = null;

  // Aborting the request on timeout can make it emit 'error' as well, so
  // every outcome goes through this guard to report only the first one.
  function settle(err) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    if (err) {
      callback(err);
    } else {
      callback();
    }
  }

  timeout = setTimeout(function() {
    // Settle before aborting so the timeout error is the one reported.
    settle(new Error('Confirm connection timeout reached.'));
    req.abort();
  }, this._confirmationTimeout);

  var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
  req = http.get(opts, function(res) {
    if (res.statusCode !== 200) {
      return settle(new Error('Unexpected status code'));
    }
    settle();
  }).on('error', function(err) {
    settle(err);
  });
};
|
457 |
|
/**
 * Run an action on the peer by sending its arguments as a form-encoded
 * request through the agent.
 *
 * Only the pathname of `action.href` is used as request path. The response
 * body is collected in full; a non-200 status yields an Error whose message
 * is the body text, a 200 status yields the parsed JSON body.
 *
 * @param {Object} action - object with `href` and `method`.
 * @param {Object} args - values serialised with querystring.stringify.
 * @param {Function} cb - called as cb(err) or cb(null, jsonBody).
 */
PeerSocket.prototype.transition = function(action, args, cb) {
  var requestPath = url.parse(action.href).pathname;
  var body = new Buffer(querystring.stringify(args));

  var host = (this.ws && this.request)
    ? this.request.headers.host
    : encodeURIComponent(this.name) + '.unreachable.zettajs.io';

  var opts = {
    agent: this.agent,
    path: requestPath,
    method: action.method,
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    }
  };

  function onResponse(res) {
    var chunks = [];
    var total = 0;

    res.on('readable', function() {
      for (var chunk = res.read(); chunk; chunk = res.read()) {
        chunks.push(chunk);
        total += chunk.length;
      }
    });

    res.on('end', function() {
      var text = Buffer.concat(chunks, total).toString();

      // The peer reports failures as a non-200 status with the reason as body.
      if (res.statusCode !== 200) {
        return cb(new Error(text));
      }

      var parsed = null;
      try {
        parsed = JSON.parse(text);
      } catch (parseErr) {
        return cb(new Error('Failed to parse body'));
      }
      return cb(null, parsed);
    });
  }

  var req = http.request(opts, onResponse);
  req.on('error', cb);
  req.end(body);
};
|
510 |
|