1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var http = require('http');
|
4 | var url = require('url');
|
5 | var querystring = require('querystring');
|
6 | var spdy = require('spdy');
|
7 | var ws = require('ws');
|
8 | var SpdyAgent = require('./spdy_agent');
|
9 | var Logger = require('./logger');
|
10 |
|
/**
 * Numeric codes for the connection lifecycle of a PeerSocket.
 * The same codes are copied onto the module's export further down the file.
 */
var STATES = {
  DISCONNECTED: 0,
  CONNECTING: 1,
  CONNECTED: 2
};
|
16 |
|
/**
 * Event-emitting wrapper around a peer's websocket connection.
 *
 * The constructor initialises bookkeeping fields, wires listeners that keep
 * `state` (and the peer registry entry) in step with the 'connecting',
 * 'connected', 'end' and 'error' events, and then hands `ws` to init().
 *
 * @param {Object} ws - websocket passed straight to init().
 * @param {String} name - peer name, stored as `this.name`.
 * @param {Object} peerRegistry - registry stored as `this.peerRegistry`.
 */
var PeerSocket = module.exports = function(ws, name, peerRegistry) {
  EventEmitter.call(this);

  var socket = this;

  socket.state = STATES.DISCONNECTED;
  socket.name = name;
  socket.agent = null;
  socket.subscriptions = {};
  socket.connectionId = null;
  socket._pingTimer = null;
  socket._pingTimeout = 10 * 1000;
  socket.peerRegistry = peerRegistry;
  socket.logger = new Logger();

  function onConnecting() {
    socket.state = STATES.CONNECTING;
  }

  // A clean end: record the disconnect, then release the agent and streams.
  function onEnd() {
    socket.state = STATES.DISCONNECTED;
    socket._setRegistryStatus('disconnected');
    socket._cleanup();
  }

  // A failure: same teardown as 'end', but the registry entry is marked failed.
  function onError(err) {
    socket.state = STATES.DISCONNECTED;
    socket._setRegistryStatus('failed', err);
    socket._cleanup();
  }

  function onConnected() {
    socket.state = STATES.CONNECTED;
    socket._setRegistryStatus('connected');
  }

  // Listeners must be in place before init(), which emits 'connecting' synchronously.
  socket.on('connecting', onConnecting);
  socket.on('end', onEnd);
  socket.on('error', onError);
  socket.on('connected', onConnected);

  socket.init(ws);
};
util.inherits(PeerSocket, EventEmitter);
|
55 |
|
// Copy every state code onto module.exports so callers can read them
// directly off the exported value (e.g. `.CONNECTED`).
Object.keys(STATES).forEach(function exportState(stateName) {
  module.exports[stateName] = STATES[stateName];
});
|
59 |
|
/**
 * Summarise this peer as a plain object.
 *
 * @returns {{id: *, connectionId: *}} the peer name (as `id`) and the current connection id.
 */
PeerSocket.prototype.properties = function() {
  var summary = {};
  summary.id = this.name;
  summary.connectionId = this.connectionId;
  return summary;
};
|
66 |
|
/**
 * Stop the ping timer and close the underlying websocket, if one is attached.
 *
 * init() only attaches a websocket when it is given one, so `this.ws` may be
 * unset here; in that case only the timer is cleared instead of throwing.
 */
PeerSocket.prototype.close = function() {
  clearInterval(this._pingTimer);
  if (this.ws) {
    this.ws.close();
  }
};
|
71 |
|
/**
 * Release the agent: destroy every stream listed in the agent's internal
 * connection state, then close the agent. Does nothing when no agent exists.
 */
PeerSocket.prototype._cleanup = function() {
  var agent = this.agent;
  if (agent) {
    // Reaches into the agent's private `_spdyState` bookkeeping to find open streams.
    var openStreams = agent._spdyState.connection._spdyState.streams;
    var streamIds = Object.keys(openStreams);
    for (var i = 0; i < streamIds.length; i++) {
      openStreams[streamIds[i]].destroy();
    }

    agent.close();
  }
};
|
84 |
|
/**
 * Start (or restart) the connection handshake.
 *
 * Emits 'connecting', wires up the websocket when one is supplied, and on the
 * next tick runs _setupConnection(). On success every topic recorded in
 * `this.subscriptions` is passed to subscribe() once, the ping timer is
 * started and 'connected' is emitted. Any failure is reported via 'error'.
 *
 * @param {Object} [ws] - websocket to attach; when omitted no websocket wiring
 *   is performed and, unless one was attached earlier, setup ends in 'error'.
 */
PeerSocket.prototype.init = function(ws) {
  var self = this;
  self.emit('connecting');

  if (ws) {
    this._initWs(ws);
  }

  // Tear down what exists, then report. Without a websocket there is nothing
  // to close, so only the ping timer is cleared.
  function fail(err) {
    if (self.ws) {
      self.close();
    } else {
      clearInterval(self._pingTimer);
    }
    self.emit('error', err);
  }

  setImmediate(function() {
    self._setupConnection(function(err) {
      if (err) {
        fail(err);
        return;
      }

      // Compare against the attached socket's own OPEN constant; a missing
      // websocket counts as "not opened" rather than raising a TypeError.
      if (!self.ws || self.ws.readyState !== self.ws.OPEN) {
        fail(new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
        return;
      }

      // Subscription tracking starts from scratch: each previously recorded
      // topic is re-issued through subscribe() exactly once.
      var subscriptions = self.subscriptions;
      self.subscriptions = {};

      Object.keys(subscriptions).forEach(function(event) {
        self.subscribe(event);
      });

      self._startPingTimer();
      self.emit('connected');
    });
  });
};
|
122 |
|
/**
 * Register this peer in the registry as a connecting acceptor, then confirm
 * the connection with the peer.
 *
 * @param {Function} cb - called with an error if registration fails; otherwise
 *   handed to confirmConnection() as its completion callback.
 * @param {*} [tries] - accepted but not used by this implementation.
 */
PeerSocket.prototype._setupConnection = function(cb, tries) {
  var socket = this;

  var registration = {
    direction: 'acceptor',
    id: socket.name,
    status: 'connecting'
  };

  function onRegistered(registryErr, registeredPeer) {
    if (registryErr) {
      return cb(registryErr);
    }

    socket.confirmConnection(socket.connectionId, cb);
  }

  socket.peerRegistry.add(registration, onRegistered);
};
|
140 |
|
/**
 * Attach a websocket to this peer and build the SPDY agent on top of its raw
 * socket.
 *
 * Reads `connectionId` from the query string of the upgrade request URL,
 * removes the raw socket's 'data' listeners before giving the socket to the
 * agent, and forwards socket end / websocket error / agent error as 'end' and
 * 'error' events on this object.
 *
 * @param {Object} ws - websocket exposing `upgradeReq.url` and `_socket`.
 */
PeerSocket.prototype._initWs = function(ws) {
  var socket = this;
  var upgradeUrl = url.parse(ws.upgradeReq.url, true);

  socket.ws = ws;
  socket.connectionId = upgradeUrl.query.connectionId;

  // Existing 'data' listeners are dropped before the agent takes over the raw socket.
  var rawSocket = socket.ws._socket;
  rawSocket.removeAllListeners('data');

  rawSocket.on('end', function onSocketEnd() {
    clearInterval(socket._pingTimer);
    socket.emit('end');
  });

  socket.ws.on('error', function onWsError(err) {
    clearInterval(socket._pingTimer);
    socket.emit('error', err);
  });

  var agentOptions = {
    host: socket.name,
    port: 80,
    socket: socket.ws._socket,
    spdy: {
      plain: true,
      ssl: false
    }
  };
  socket.agent = spdy.createAgent(SpdyAgent, agentOptions);

  socket.agent.maxSockets = 150;
  socket.agent.on('push', socket.onPushData.bind(socket));
  socket.agent.on('error', function onAgentError(err) {
    socket.close();
    socket.emit('error', err);
  });
};
|
177 |
|
/**
 * (Re)start the periodic ping. Every `_pingTimeout` milliseconds a ping is
 * sent through the agent; if its callback has not fired within another
 * `_pingTimeout` milliseconds the socket is closed and 'error' is emitted.
 */
PeerSocket.prototype._startPingTimer = function() {
  var socket = this;

  function sendPing() {
    // Deadline for this particular ping; cancelled when the ping calls back.
    var deadline = setTimeout(function onPingExpired() {
      socket.close();
      socket.emit('error', new Error('Peer socket timed out'));
    }, socket._pingTimeout);

    socket.agent.ping(function onPingReply(err) {
      if (deadline) {
        clearTimeout(deadline);
      }
    });
  }

  // Drop any previous interval so only one ping loop runs at a time.
  clearInterval(socket._pingTimer);
  socket._pingTimer = setInterval(sendPing, socket._pingTimeout);
};
|
195 |
|
/**
 * Update this peer's record in the registry with a new status.
 *
 * Looks the peer up by name, sets `status` and `connectionId`, attaches the
 * failure reason as `error` when one is given, and saves the record.
 *
 * @param {String} status - new status value to store on the record.
 * @param {Error|Function} [err] - failure reason to store, or the callback
 *   when no reason is supplied.
 * @param {Function} [cb] - called with the lookup error, a not-found error,
 *   or whatever `peerRegistry.save` reports.
 */
PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
  var self = this;

  if (typeof err === 'function') {
    cb = err;
    err = undefined;
  }

  if (!cb) {
    cb = function(){};
  }

  // The lookup error gets its own name so it cannot shadow the caller's `err`,
  // which must still be visible below when recording the failure reason.
  this.peerRegistry.get(this.name, function(lookupErr, peer) {
    if (lookupErr) {
      return cb(lookupErr);
    }

    if (!peer) {
      return cb(new Error('Peer "' + self.name + '" not found in registry.'));
    }

    peer.status = status;
    peer.connectionId = self.connectionId;
    if (err) {
      peer.error = err;
    }
    self.peerRegistry.save(peer, cb);
  });
};
|
221 |
|
/**
 * Handle a pushed stream from the agent: collect its payload, decode it and
 * re-emit it as an event named after the stream URL (minus the leading '/').
 *
 * The payload is decoded according to the `x-event-encoding` header:
 * 'json' (the default) is parsed, 'buffer' is passed through as a Buffer.
 * On a JSON parse failure or an unrecognised encoding the event is emitted
 * with `null`.
 *
 * @param {Object} stream - pushed stream exposing `url`, `headers`,
 *   `connection`, `read()` and the 'readable' / 'end' / 'error' events.
 */
PeerSocket.prototype.onPushData = function(stream) {
  var streamUrl = stream.url.slice(1);
  var self = this;

  // NOTE(review): processing continues after ending the connection for a
  // topic with no subscribers — confirm whether an early return is intended.
  if(!this.subscriptions[streamUrl]) {
    stream.connection.end();
  }

  var encoding = stream.headers['x-event-encoding'] || 'json';

  // Collect chunks as they arrive instead of pre-sizing a buffer from the
  // `content-length` header, which may be absent or not match the payload.
  var chunks = [];
  var total = 0;
  stream.on('readable', function() {
    var chunk;
    while ((chunk = stream.read())) {
      chunks.push(chunk);
      total += chunk.length;
    }
  });

  stream.on('error', function(err) {
    console.error('error on push:', err);
  });

  stream.on('end', function() {
    var data = Buffer.concat(chunks, total);
    var body = null;
    if (encoding === 'json') {
      try {
        body = JSON.parse(data.toString());
      } catch (err) {
        console.error('PeerSocket push data json parse error', err);
      }
    } else if(encoding === 'buffer') {
      body = data;
    }
    self.emit(streamUrl, body);
    stream.connection.close();
  });
};
|
262 |
|
/**
 * Register interest in a topic on the peer.
 *
 * A per-topic counter is kept in `this.subscriptions`; only the first
 * subscriber for a topic triggers an HTTP request through the agent, later
 * ones just bump the counter and call back immediately.
 *
 * @param {String} event - topic name; a value starting with 'query%2F' is
 *   URI-decoded before use.
 * @param {Function} [cb] - called with no arguments once the response
 *   arrives (or immediately for repeat subscribers), or with the request error.
 */
PeerSocket.prototype.subscribe = function(event, cb) {
  var done = cb || function() {};

  var QUERY_PREFIX = 'query%2F';
  var topic = event;
  if (topic && topic.slice(0, QUERY_PREFIX.length) === QUERY_PREFIX) {
    topic = decodeURIComponent(topic);
  }

  // Reference-count the topic.
  var counts = this.subscriptions;
  if (counts[topic] === undefined) {
    counts[topic] = 0;
  }
  counts[topic]++;

  // Someone already subscribed: nothing to send.
  if (counts[topic] > 1) {
    done();
    return;
  }

  // Prefer the Host header of the original upgrade request when available.
  var upgradeReq = this.ws && this.ws.upgradeReq;
  var host = upgradeReq ? upgradeReq.headers.host : 'fog.argo.cx';

  var requestPath = '/servers/' + encodeURIComponent(this.name) +
    '/events?topic=' + encodeURIComponent(topic);

  var req = http.request({
    method: 'GET',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host
    },
    path: requestPath,
    agent: this.agent
  }, function onResponse(res) {
    done();
  });
  req.on('error', done);
  req.end();
};
|
308 |
|
/**
 * Drop one subscriber for a topic; when the last one is gone, tell the peer
 * to stop sending that topic.
 *
 * The topic is normalised exactly as subscribe() does (a 'query%2F' prefix is
 * URI-decoded) so both methods share the same counter key, and the form body
 * is built with `querystring.stringify` so topics containing '&', '=' or '%'
 * are transmitted intact.
 *
 * @param {String} event - topic name, optionally in its 'query%2F…' encoded form.
 * @param {Function} [cb] - called with no arguments when other subscribers
 *   remain or once the response arrives, or with the request error.
 */
PeerSocket.prototype.unsubscribe = function(event, cb) {
  if(!cb) {
    cb = function() {};
  }

  var queryPrefix = 'query%2F';
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    event = decodeURIComponent(event);
  }

  // Decrement the per-topic counter, never letting it go below zero.
  if (this.subscriptions[event] === undefined) {
    this.subscriptions[event] = 0;
  } else {
    this.subscriptions[event]--;
    if (this.subscriptions[event] < 0) {
      this.subscriptions[event] = 0;
    }
  }

  // Other subscribers remain: keep the remote subscription alive.
  if (this.subscriptions[event] > 0) {
    return cb();
  }

  var host;
  if(this.ws && this.ws.upgradeReq) {
    host = this.ws.upgradeReq.headers.host;
  } else {
    host = 'fog.argo.cx';
  }

  var body = new Buffer(querystring.stringify({ topic: event }));
  var opts = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    },
    path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    cb();
  }).on('error', cb);
  req.end(body);
};
|
352 |
|
/**
 * Confirm the connection with the peer by requesting
 * `/_initiate_peer/<connectionId>` through the agent.
 *
 * `callback` is invoked exactly once: with no arguments on a 200 response,
 * or with an Error on any other status, on a request error, or when no
 * response arrives within `_pingTimeout` milliseconds.
 *
 * @param {String} connectionId - id appended to the request path.
 * @param {Function} callback - completion callback `(err)`.
 */
PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
  // Aborting the request on timeout makes it emit 'error', and an error can
  // also follow a response; this flag guarantees a single callback.
  var settled = false;

  var timeout = setTimeout(function() {
    if (settled) {
      return;
    }
    settled = true;
    req.abort();
    callback(new Error('Confirm connection timeout reached.'));
  }, this._pingTimeout);

  var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
  var req = http.get(opts, function(res) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    if (res.statusCode !== 200) {
      return callback(new Error('Unexpected status code'));
    }
    callback();
  }).on('error', function(err) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    callback(err);
  });
};
|
371 |
|
/**
 * Execute an action on the peer by sending its arguments, form-encoded, to
 * the path of `action.href` through the agent.
 *
 * @param {{href: String, method: String}} action - target URL (only its
 *   pathname is used) and HTTP method.
 * @param {Object} args - arguments serialised with `querystring.stringify`.
 * @param {Function} cb - called with `(null, parsedJson)` on a 200 response,
 *   with an Error carrying the response text on any other status, with
 *   'Failed to parse body' when the body is not valid JSON, or with the
 *   request error.
 */
PeerSocket.prototype.transition = function(action, args, cb) {
  var target = url.parse(action.href);
  var payload = new Buffer(querystring.stringify(args));

  // Prefer the Host header of the original upgrade request when available.
  var upgradeReq = this.ws && this.ws.upgradeReq;
  var host = upgradeReq ? upgradeReq.headers.host : 'fog.argo.cx';

  var requestOptions = {
    agent: this.agent,
    path: target.pathname,
    method: action.method,
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': payload.length
    }
  };

  function onResponse(res) {
    var chunks = [];
    var total = 0;

    res.on('readable', function drain() {
      var chunk = res.read();
      while (chunk) {
        chunks.push(chunk);
        total += chunk.length;
        chunk = res.read();
      }
    });

    res.on('end', function finish() {
      var text = Buffer.concat(chunks, total).toString();

      // Any non-200 reply is surfaced with the response text as the message.
      if (res.statusCode !== 200) {
        cb(new Error(text));
        return;
      }

      var parsed = null;
      try {
        parsed = JSON.parse(text);
      } catch (parseErr) {
        cb(new Error('Failed to parse body'));
        return;
      }
      cb(null, parsed);
    });
  }

  var req = http.request(requestOptions, onResponse);
  req.on('error', cb);
  req.end(payload);
};
|
424 |
|