UNPKG

5.69 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var path = require('path');
3var util = require('util');
4var uuid = require('node-uuid');
5var spdy = require('spdy');
6var Logger = require('./logger');
7var WebSocket = require('./web_socket');
8
// Monkey patch the spdy connection so that every ping it handles is also
// surfaced as a 'spdyPing' event on the connection's socket.
var spdyConnectionProto = spdy.Connection.prototype;
var originalPingHandler = spdyConnectionProto._handlePing;
spdyConnectionProto._handlePing = function() {
  // Announce the ping (passing the connection itself) before delegating to
  // the handler that spdy originally installed.
  this.socket.emit('spdyPing', this);
  originalPingHandler.apply(this, arguments);
};
15
// Build the WebSocket URL used to peer with a remote server.
//
// url  - http(s) URL of the remote server; a leading "http" becomes "ws",
//        so "https" becomes "wss".
// name - peer name appended as the last path segment.
//
// Returns "<ws url>/peers/<name>". A single trailing slash on the input url
// is dropped so the result never contains "//peers".
function calculatePeerUrl(url, name){
  var base = url.replace(/^http/, 'ws');
  var endsWithSlash = base.charAt(base.length - 1) === '/';
  if (endsWithSlash) {
    base = base.slice(0, -1);
  }
  return base + '/peers/' + name;
}
26
// Client side of a peer connection.
//
// url    - http(s) URL of the remote peer; converted to a ws(s) peer URL by
//          updateURL().
// server - local zetta server. Its `httpServer.spdyServer`, `log` and
//          `_name` members are read.
//
// Constructing the client only prepares state and the WebSocket wrapper;
// call start() to begin connecting.
var PeerClient = module.exports = function(url, server) {
  // Reconnect backoff settings, in milliseconds.
  this.reconnect = {
    min: 100, // base delay that is doubled for every retry
    max: 30000, // max amount of time allowed to backoff
    maxRandomOffset: 1000 // upper bound of the random offset added to a delay
  };

  // 3x the interval the peer pings down to the client
  this.pingTimeout = 30000;

  this.server = server.httpServer.spdyServer;
  this.connected = false;
  this.retryCount = 0;
  this.log = server.log || new Logger();
  this._backoffTimer = null;
  this._stopped = false;

  // keep a reference to the zetta server for calculating its peer name
  this._zetta = server;

  this.updateURL(url);

  // Unique id of the current connection attempt, used to associate the
  // initiation request with this client. _createNewUrl() assigns it.
  this.connectionId = null;
  this.ws = new WebSocket(this._createNewUrl(), {});

  EventEmitter.call(this);
};
util.inherits(PeerClient, EventEmitter);

// Expose the URL helper as a static on the constructor.
PeerClient.calculatePeerUrl = calculatePeerUrl;
58
// Point the client at a different peer.
//
// httpUrl - http(s) URL of the peer; stored on `this.url` as the matching
//           WebSocket peer URL, built from this server's `_name`.
PeerClient.prototype.updateURL = function(httpUrl) {
  this.url = calculatePeerUrl(httpUrl, this._zetta._name);
};
63
// Generate a fresh connection id, remember it on `this.connectionId`, and
// return the peer URL carrying that id as its `connectionId` query parameter.
PeerClient.prototype._createNewUrl = function() {
  var freshId = uuid.v4();
  this.connectionId = freshId;
  return this.url + '?connectionId=' + freshId;
};
68
// Return a plain-object snapshot of this client: the peer URL and the id of
// the current connection attempt.
PeerClient.prototype.properties = function() {
  var snapshot = {};
  snapshot.url = this.url;
  snapshot.connectionId = this.connectionId;
  return snapshot;
};
75
// Begin (or resume) connecting to the peer.
PeerClient.prototype.start = function() {
  // A previous close() leaves the client marked as stopped; clear that mark
  // first, otherwise _createSocket() would return without doing anything.
  this._stopped = false;
  this._createSocket();
};
80
// Close the connection and stop reconnecting.
PeerClient.prototype.close = function() {
  // Mark the client as stopped so _createSocket() refuses to schedule
  // another attempt, and cancel the attempt that may already be scheduled.
  this._stopped = true;
  clearTimeout(this._backoffTimer);

  this.ws.close();
  this._stopPingTimeout();
};
88
// (Re)arm the ping watchdog. If it is not re-armed or stopped within
// `pingTimeout` milliseconds, the peer is considered unresponsive: a warning
// is logged, 'timeout' is emitted and the socket is closed.
PeerClient.prototype._resetPingTimeout = function() {
  var client = this;

  // Only one watchdog may run at a time.
  clearTimeout(client._pingTimer);

  client._pingTimer = setTimeout(function onPingTimeout() {
    if (!client.ws) {
      return;
    }
    client.log.emit('warn', 'peer-client', 'Communication to peer timed out. Reconnecting (' + client.url + ')');
    client.emit('timeout');
    client.ws.close();
  }, client.pingTimeout);
};
100
// Disarm the ping watchdog armed by _resetPingTimeout(), if any.
PeerClient.prototype._stopPingTimeout = function() {
  var pendingWatchdog = this._pingTimer;
  clearTimeout(pendingWatchdog);
};
104
// Schedule a connection attempt to the peer.
//
// The attempt runs after the delay returned by generateBackoff() for the
// current retryCount. Socket listeners are attached only when an attempt
// runs with retryCount equal to 0; the 'error' and 'close' listeners each
// bump retryCount and call this method again, which is what drives
// reconnection. Does nothing once the client has been stopped by close().
PeerClient.prototype._createSocket = function() {
  var self = this;

  // Cancel an attempt that is still pending. The timer is stored on
  // `_backoffTimer`; without this, back-to-back calls (for example an
  // 'error' followed by a 'close' for the same failure) would each leave a
  // timer running and the socket would be started once per call.
  clearTimeout(this._backoffTimer);

  // once peer is closed dont create new socket
  if (this._stopped) {
    return;
  }

  // stop ping timer until backoff finishes
  this._stopPingTimeout();

  var backoff = this.generateBackoff(this.retryCount);
  this._backoffTimer = setTimeout(function() {

    // start ping timer
    self._resetPingTimeout();

    // create a new connection id
    self.ws.setAddress(self._createNewUrl());

    // The same ws object is reused for every attempt, so listeners are only
    // attached when the attempt runs with a retry count of zero.
    if (self.retryCount === 0) {
      self.ws.on('open', function onOpen(socket) {
        self.checkServerReq();
        self.emit('connecting');
        self.server.emit('connection', socket);
        socket.on('spdyPing', function() {
          // reset ping timer on a spdy ping from the peer
          self._resetPingTimeout();
        });
        self.log.emit('log', 'peer-client', 'WebSocket to peer established (' + self.url + ')');
      });

      self.ws.on('error', function onError(err) {
        self.connected = false;
        self.log.emit('log', 'peer-client', 'Peer connection error (' + self.url + '): ' + err);
        self.emit('error', err);
        reconnect();
      });

      self.ws.on('close', function onClose(code, message) {
        self.connected = false;
        self.log.emit('log', 'peer-client', 'Peer connection closed (' + self.url + '): ' + code + ' - ' + message);
        self.emit('closed');
        reconnect();
      });
    }

    self.ws.start();
  }, backoff);

  // Count the failed attempt and schedule the next one.
  function reconnect() {
    self.retryCount++;
    self._createSocket();
  }
};
164
// Listen on the local server for the peer's initiation request.
//
// The peer is expected to request `/_initiate_peer/{connection-id}`. When a
// request for the current connection id arrives, the client is marked as
// connected, the retry counter is reset, 'connected' is emitted, and the
// request is answered with an empty 200 response. Requests for any other
// URL are left untouched by this handler.
PeerClient.prototype.checkServerReq = function() {
  var client = this;
  var localServer = client.server;

  // remove any previous request listeners
  if (client.onRequest) {
    localServer.removeListener('request', client.onRequest);
  }

  client.onRequest = function(req, res) {
    // Not the initiation request for the current connection id.
    if (req.url !== '/_initiate_peer/' + client.connectionId) {
      return;
    }

    client.connected = true;
    client.retryCount = 0;
    client.emit('connected');
    client.log.emit('log', 'peer-client', 'Peer connection established (' + client.url + ')');

    res.statusCode = 200;
    res.end();

    // The handshake is done; stop listening for further requests.
    client.server.removeListener('request', client.onRequest);
  };

  localServer.on('request', client.onRequest);
};
193
// Compute how long to wait, in milliseconds, before a connection attempt.
//
// attempt - number of retries so far (retryCount).
//
// Returns 0 for the first attempt. Otherwise returns
// `reconnect.min * 2^attempt`, capped at `reconnect.max`, plus a random
// whole number in [0, reconnect.maxRandomOffset).
PeerClient.prototype.generateBackoff = function(attempt) {
  if (attempt === 0) {
    return 0;
  }

  // Math.floor rather than parseInt: parseInt stringifies its argument, and
  // values below 1e-6 stringify in exponent notation ("5e-7"), which parseInt
  // reads as 5 instead of 0.
  var randomOffset = Math.floor(Math.random() * this.reconnect.maxRandomOffset);
  var exponentialDelay = Math.pow(2, attempt) * this.reconnect.min;

  return Math.min(exponentialDelay, this.reconnect.max) + randomOffset;
};