1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var http = require('http');
|
4 | var url = require('url');
|
5 | var querystring = require('querystring');
|
6 | var spdy = require('spdy');
|
7 | var ws = require('ws');
|
8 | var SpdyAgent = require('./spdy_agent');
|
9 | var Logger = require('./logger');
|
10 |
|
/**
 * Numeric connection states a PeerSocket moves through. The constructor's
 * lifecycle listeners store one of these values in `state`.
 */
var STATES = {
  DISCONNECTED: 0,
  CONNECTING: 1,
  CONNECTED: 2
};
|
16 |
|
/**
 * Handle for one peer connection. Emits 'connecting', 'connected', 'end' and
 * 'error'; the listeners registered here mirror those events into `state`
 * and into the peer's registry entry.
 *
 * @param {Object} ws - WebSocket for the peer; handed unchanged to `init`.
 * @param {*} name - Peer name, used as the registry id and in request paths.
 * @param {Object} peerRegistry - Registry object; `add`, `get` and `save` are
 *   called on it by other methods of this class.
 */
var PeerSocket = module.exports = function(ws, name, peerRegistry) {
  EventEmitter.call(this);

  var peer = this;

  peer.state = STATES.DISCONNECTED;
  peer.name = name;
  peer.agent = null;
  // Topic name -> subscriber count, maintained by subscribe()/unsubscribe().
  peer.subscriptions = {};
  peer.connectionId = null;
  peer._pingTimer = null;
  // Milliseconds; used both as the ping interval and as the ping deadline.
  peer._pingTimeout = 10 * 1000;
  peer.peerRegistry = peerRegistry;
  peer.logger = new Logger();

  peer.on('connecting', function onConnecting() {
    peer.state = STATES.CONNECTING;
  });

  peer.on('end', function onEnd() {
    peer.state = STATES.DISCONNECTED;
    peer._setRegistryStatus('disconnected');
    peer._cleanup();
  });

  peer.on('error', function onError(err) {
    peer.state = STATES.DISCONNECTED;
    peer._setRegistryStatus('failed', err);
    peer._cleanup();
  });

  peer.on('connected', function onConnected() {
    peer.state = STATES.CONNECTED;
    peer._setRegistryStatus('connected');
  });

  peer.init(ws);
};
util.inherits(PeerSocket, EventEmitter);
|
55 |
|
// Copy every state constant onto the exported constructor so callers can
// compare against e.g. `PeerSocket.CONNECTED`.
Object.keys(STATES).forEach(function(stateName) {
  module.exports[stateName] = STATES[stateName];
});
|
59 |
|
/**
 * Build a plain summary object for this peer.
 *
 * @returns {{id: *, connectionId: *}} The peer name (as `id`) and the current
 *   connection id.
 */
PeerSocket.prototype.properties = function() {
  var summary = {};
  summary.id = this.name;
  summary.connectionId = this.connectionId;
  return summary;
};
|
66 |
|
/**
 * Stop the ping interval and close the underlying WebSocket.
 *
 * `this.ws` is only assigned by `_initWs`, which `init` skips when it is
 * given no socket, so closing a peer that never had a socket attached must
 * not dereference it.
 */
PeerSocket.prototype.close = function() {
  clearInterval(this._pingTimer);

  if (this.ws) {
    this.ws.close();
  }
};
|
71 |
|
/**
 * Destroy every stream tracked by the current agent, then close the agent.
 * Does nothing when no agent has been created yet.
 *
 * NOTE(review): this reads the agent's `_spdyState` internals directly;
 * presumably tied to the spdy version in use - confirm before upgrading.
 */
PeerSocket.prototype._cleanup = function() {
  var agent = this.agent;
  if (!agent) {
    return;
  }

  var openStreams = agent._spdyState.connection._spdyState.streams;
  var ids = Object.keys(openStreams);
  for (var i = 0; i < ids.length; i++) {
    openStreams[ids[i]].destroy();
  }

  this.agent.close();
};
|
84 |
|
/**
 * (Re)initialise the peer: emit 'connecting', attach `ws` if one is given,
 * then - on the next tick - register the peer, confirm the connection,
 * re-issue subscriptions, start pinging and emit 'connected'.
 *
 * Any failure is reported through the 'error' event rather than thrown. That
 * includes the case where no socket was ever attached, which previously
 * raised a TypeError inside the deferred callback.
 *
 * @param {Object} [ws] - WebSocket to attach; when omitted, a previously
 *   attached socket (if any) is reused.
 */
PeerSocket.prototype.init = function(ws) {
  var self = this;
  self.emit('connecting');

  if (ws) {
    this._initWs(ws);
  }

  // Tear down and report a failed setup. `close()` dereferences `this.ws`,
  // so it is only called when a socket has actually been attached.
  function fail(err) {
    if (self.ws) {
      self.close();
    } else {
      clearInterval(self._pingTimer);
    }
    self.emit('error', err);
  }

  // Deferred by one tick; presumably so the ws/spdy plumbing can settle
  // before the first request is sent - TODO confirm.
  setImmediate(function() {
    self._setupConnection(function(err) {
      if (err) {
        fail(err);
        return;
      }

      var socket = self.ws;
      if (!socket || socket.readyState !== socket.OPEN) {
        fail(new Error('Peer Socket: Setup connection finished but ws not opened for peer "' + self.name + '".'));
        return;
      }

      // Re-issue subscriptions over the new connection.
      // NOTE(review): every key is subscribed exactly once, so earlier counts
      // (including topics whose count had dropped to 0) are not carried over.
      var subscriptions = self.subscriptions;
      self.subscriptions = {};

      Object.keys(subscriptions).forEach(function(event) {
        self.subscribe(event);
      });

      self._startPingTimer();
      self.emit('connected');
    });
  });
};
|
122 |
|
/**
 * Add this peer to the registry with status 'connecting', then confirm the
 * connection with the peer.
 *
 * @param {Function} cb - Called with an error if the registry add fails;
 *   otherwise handed to `confirmConnection`.
 * @param {*} [tries] - Accepted but not used by this implementation.
 */
PeerSocket.prototype._setupConnection = function(cb, tries) {
  var self = this;

  var registryEntry = {};
  registryEntry.direction = 'acceptor';
  registryEntry.id = self.name;
  registryEntry.status = 'connecting';

  self.peerRegistry.add(registryEntry, function(addErr, added) {
    if (addErr) {
      return cb(addErr);
    }

    self.confirmConnection(self.connectionId, cb);
  });
};
|
140 |
|
/**
 * Attach a WebSocket to this peer: remember it, read `connectionId` from the
 * upgrade request's query string, and create a SPDY agent that talks over
 * the WebSocket's raw socket.
 *
 * @param {Object} ws - WebSocket exposing `upgradeReq` and `_socket`.
 */
PeerSocket.prototype._initWs = function(ws) {
  var peer = this;
  var upgradeUrl = url.parse(ws.upgradeReq.url, true);

  peer.ws = ws;
  peer.connectionId = upgradeUrl.query.connectionId;

  var rawSocket = peer.ws._socket;

  // Drop the existing 'data' listeners; the raw socket is handed to the SPDY
  // agent below.
  rawSocket.removeAllListeners('data');

  rawSocket.on('end', function onSocketEnd() {
    clearInterval(peer._pingTimer);
    peer.emit('end');
  });

  peer.ws.on('error', function onWsError(err) {
    clearInterval(peer._pingTimer);
    peer.emit('error', err);
  });

  var agentOptions = {
    host: peer.name,
    port: 80,
    socket: rawSocket,
    spdy: {
      plain: true,
      ssl: false
    }
  };
  peer.agent = spdy.createAgent(SpdyAgent, agentOptions);

  peer.agent.maxSockets = 150;
  peer.agent.on('push', peer.onPushData.bind(peer));
  peer.agent.on('error', function onAgentError(err) {
    peer.close();
    peer.emit('error', err);
  });
};
|
177 |
|
/**
 * (Re)start the periodic ping. Every `_pingTimeout` ms a ping is sent through
 * the agent; if its callback has not fired within another `_pingTimeout` ms,
 * the socket is closed and an 'error' is emitted.
 */
PeerSocket.prototype._startPingTimer = function() {
  var peer = this;

  function onPingOverdue() {
    peer.close();
    peer.emit('error', new Error('Peer socket timed out'));
  }

  function sendPing() {
    var deadline = setTimeout(onPingOverdue, peer._pingTimeout);

    // Any callback from ping (its error argument is ignored) cancels the
    // deadline.
    peer.agent.ping(function(err) {
      clearTimeout(deadline);
    });
  }

  clearInterval(peer._pingTimer);
  peer._pingTimer = setInterval(sendPing, peer._pingTimeout);
};
|
195 |
|
/**
 * Load this peer's registry entry, update its status and connection id
 * (plus the failure cause, when one is given) and save it back.
 *
 * @param {string} status - New status value to store on the entry.
 * @param {Error|Function} [err] - Failure to record as `peer.error`; may be
 *   omitted, in which case a function in this position is treated as `cb`.
 * @param {Function} [cb] - Called with the lookup error, or passed to
 *   `peerRegistry.save` as its callback.
 */
PeerSocket.prototype._setRegistryStatus = function(status, err, cb) {
  var self = this;

  if (typeof err === 'function') {
    cb = err;
    err = undefined;
  }

  if (!cb) {
    cb = function(){};
  }

  // The lookup error gets its own name: reusing `err` here would shadow the
  // caller-supplied failure, and `peer.error` would never be written.
  this.peerRegistry.get(this.name, function(lookupErr, peer) {
    if (lookupErr) {
      return cb(lookupErr);
    }

    peer.status = status;
    peer.connectionId = self.connectionId;
    if (err) {
      peer.error = err;
    }
    self.peerRegistry.save(peer, cb);
  });
};
|
221 |
|
/**
 * Handle a pushed stream from the peer: read its whole body, decode it
 * according to the content type, and re-emit it under the topic name and
 * under 'zetta-events'.
 *
 * The body is collected as chunks and joined with `Buffer.concat` instead of
 * being copied into a buffer pre-sized from the `content-length` header. A
 * missing or inaccurate header therefore cannot cause an invalid allocation,
 * trailing filler bytes or a truncated payload.
 *
 * @param {Object} stream - Pushed stream; `url`, `headers`, `connection` and
 *   the readable-stream interface are used.
 */
PeerSocket.prototype.onPushData = function(stream) {
  var streamUrl = stream.url.slice(1);
  var self = this;

  // NOTE(review): processing continues after this call even though nothing
  // is subscribed to the topic - confirm whether an early return is intended.
  if (!this.subscriptions[streamUrl]) {
    stream.connection.end();
  }

  var encoding = stream.headers['content-type'] || 'application/json';

  // Strip parameters such as "; charset=utf-8".
  if (encoding.indexOf(';') !== -1) {
    encoding = encoding.split(';')[0].trim();
  }

  var chunks = [];
  var received = 0;

  stream.on('readable', function() {
    var chunk;
    while ((chunk = stream.read())) {
      chunks.push(chunk);
      received += chunk.length;
    }
  });

  stream.on('error', function(err) {
    console.error('error on push:', err);
  });

  stream.on('end', function() {
    var data = Buffer.concat(chunks, received);
    var body = null;

    if (encoding === 'application/json') {
      try {
        body = JSON.parse(data.toString());
      } catch (err) {
        // Best effort: listeners still receive the event, with a null body.
        console.error('PeerSocket push data json parse error', err);
      }
    } else if (encoding === 'application/octet-stream') {
      body = data;
    }

    self.emit(streamUrl, body);
    self.emit('zetta-events', streamUrl, body);
    stream.connection.close();
  });
};
|
268 |
|
/**
 * Register interest in a topic. The per-topic count is incremented; only the
 * first subscriber triggers a GET to the peer's events endpoint.
 *
 * @param {string} event - Topic name. A value starting with the URL-encoded
 *   prefix `query%2F` is decoded before use.
 * @param {Function} [cb] - Called with no arguments when the topic was
 *   already subscribed or when the peer responds; called with the error if
 *   the request fails.
 */
PeerSocket.prototype.subscribe = function(event, cb) {
  var done = cb || function() {};

  var encodedQueryPrefix = 'query%2F';
  var isEncodedQuery = event && event.slice(0, encodedQueryPrefix.length) === encodedQueryPrefix;
  if (isEncodedQuery) {
    event = decodeURIComponent(event);
  }

  var counts = this.subscriptions;
  if (counts[event] === undefined) {
    counts[event] = 0;
  }
  counts[event]++;

  // Already subscribed on the peer; only the local count changes.
  if (counts[event] > 1) {
    done();
    return;
  }

  var host = (this.ws && this.ws.upgradeReq)
    ? this.ws.upgradeReq.headers.host
    : encodeURIComponent(this.name) + '.unreachable.zettajs.io';

  var topicPath = '/servers/' + encodeURIComponent(this.name) +
    '/events?topic=' + encodeURIComponent(event);

  var req = http.request({
    method: 'GET',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host
    },
    path: topicPath,
    agent: this.agent
  }, function(res) {
    done();
  });

  req.on('error', done);
  req.end();
};
|
314 |
|
/**
 * Drop one subscriber from a topic. When the count reaches zero, a POST is
 * sent to the peer's unsubscribe endpoint.
 *
 * The topic is normalised the same way `subscribe` does it (a leading
 * `query%2F` is URL-decoded), so both methods track counts under the same
 * key. The form body is built with `querystring.stringify`, so characters
 * such as `&`, `=`, `+` or `%` in a topic cannot corrupt the urlencoded
 * payload.
 *
 * @param {string} event - Topic name, in the same form passed to `subscribe`.
 * @param {Function} [cb] - Called with no arguments when subscribers remain
 *   or when the peer responds; called with the error if the request fails.
 */
PeerSocket.prototype.unsubscribe = function(event, cb) {
  if (!cb) {
    cb = function() {};
  }

  var queryPrefix = 'query%2F';
  if (event && event.slice(0, queryPrefix.length) === queryPrefix) {
    event = decodeURIComponent(event);
  }

  if (this.subscriptions[event] === undefined) {
    this.subscriptions[event] = 0;
  } else {
    this.subscriptions[event]--;
    if (this.subscriptions[event] < 0) {
      this.subscriptions[event] = 0;
    }
  }

  // Other subscribers remain; keep the peer-side subscription.
  if (this.subscriptions[event] > 0) {
    return cb();
  }

  var host;
  if (this.ws && this.ws.upgradeReq) {
    host = this.ws.upgradeReq.headers.host;
  } else {
    host = encodeURIComponent(this.name) + '.unreachable.zettajs.io';
  }

  var body = new Buffer(querystring.stringify({ topic: event }));
  var opts = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': body.length
    },
    path: '/servers/' + encodeURIComponent(this.name) + '/events/unsubscribe',
    agent: this.agent
  };

  var req = http.request(opts, function(res) {
    cb();
  }).on('error', cb);
  req.end(body);
};
|
358 |
|
/**
 * Ask the peer to confirm the connection by requesting
 * `/_initiate_peer/<connectionId>` through the agent.
 *
 * `callback` is invoked exactly once. Aborting the request on timeout can
 * make the request emit 'error', which previously reported a second failure
 * to the caller.
 *
 * @param {*} connectionId - Id appended to the confirmation path.
 * @param {Function} callback - Called with no arguments on a 200 response,
 *   or with an Error on timeout, request failure or any other status code.
 */
PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
  var settled = false;

  // Report the first outcome only and stop the timeout clock.
  function settle(err) {
    if (settled) {
      return;
    }
    settled = true;
    clearTimeout(timeout);
    if (err) {
      callback(err);
    } else {
      callback();
    }
  }

  var timeout = setTimeout(function() {
    if (settled) {
      return;
    }
    // Mark as settled before aborting so the resulting 'error' is ignored.
    settled = true;
    req.abort();
    callback(new Error('Confirm connection timeout reached.'));
  }, this._pingTimeout);

  var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
  var req = http.get(opts, function(res) {
    if (res.statusCode !== 200) {
      return settle(new Error('Unexpected status code'));
    }
    settle();
  }).on('error', function(err) {
    settle(err);
  });
};
|
377 |
|
/**
 * Send an action to the peer as a urlencoded request and return the parsed
 * JSON response.
 *
 * @param {Object} action - Provides `href` (only its pathname is used) and
 *   `method`.
 * @param {Object} args - Fields serialised into the request body.
 * @param {Function} cb - Called as `cb(err)` on request failure, on a non-200
 *   status (the error message is the response body) or on unparsable JSON;
 *   otherwise as `cb(null, parsedBody)`.
 */
PeerSocket.prototype.transition = function(action, args, cb) {
  var pathname = url.parse(action.href).pathname;
  var payload = new Buffer(querystring.stringify(args));

  var host = (this.ws && this.ws.upgradeReq)
    ? this.ws.upgradeReq.headers.host
    : encodeURIComponent(this.name) + '.unreachable.zettajs.io';

  var requestOptions = {
    agent: this.agent,
    path: pathname,
    method: action.method,
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Host': host,
      'Content-Length': payload.length
    }
  };

  function onResponse(res) {
    var chunks = [];
    var total = 0;

    res.on('readable', function() {
      var chunk = res.read();
      while (chunk) {
        chunks.push(chunk);
        total += chunk.length;
        chunk = res.read();
      }
    });

    res.on('end', function() {
      var text = Buffer.concat(chunks, total).toString();

      if (res.statusCode !== 200) {
        return cb(new Error(text));
      }

      var parsed = null;
      try {
        parsed = JSON.parse(text);
      } catch (parseErr) {
        return cb(new Error('Failed to parse body'));
      }
      return cb(null, parsed);
    });
  }

  var req = http.request(requestOptions, onResponse);
  req.on('error', cb);
  req.end(payload);
};
|
430 |
|