UNPKG

5.42 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var path = require('path');
3var util = require('util');
4var uuid = require('uuid');
5var spdy = require('spdy');
6var Logger = require('./logger');
7var WebSocket = require('./web_socket');
8
/**
 * Build the WebSocket URL a peer connects to.
 *
 * A leading "http" in `url` is rewritten to "ws" (so "https" becomes "wss"),
 * a single trailing slash is dropped, and "/peers/<name>" is appended.
 *
 * @param {string} url - HTTP(S) base URL of the remote server.
 * @param {string} name - Peer name appended after "/peers/".
 * @returns {string} The peer WebSocket URL.
 */
function calculatePeerUrl(url, name){
  var base = url.replace(/^http/, 'ws');

  // Strip one trailing slash so the joined path has a single separator.
  if (base.charAt(base.length - 1) === '/') {
    base = base.slice(0, -1);
  }

  return base + '/peers/' + name;
}
19
/**
 * Client side of a peer link: owns a WebSocket to a remote peer URL and
 * re-establishes it with backoff when it drops.
 *
 * Emits (see the prototype methods): 'connecting', 'connected', 'closed',
 * 'error' and 'timeout'.
 *
 * @param {string} url - HTTP(S) URL of the remote peer server.
 * @param {Object} server - Local zetta server; this constructor reads
 *   `server.httpServer.spdyServer`, `server.log` and (via updateURL)
 *   `server._name`.
 */
var PeerClient = module.exports = function(url, server) {
  // Reconnect tuning, in milliseconds (values are fed to setTimeout).
  this.reconnect = {
    min: 100,
    max: 30000, // ceiling applied to the exponential part of the backoff
    maxRandomOffset: 1000, // exclusive upper bound of the random offset added on top
  };

  // 3x the interval the peer pings down to the client
  this.pingTimeout = 30000;

  this.server = server.httpServer.spdyServer;
  this.connected = false;
  this.retryCount = 0;
  this.log = server.log || new Logger();
  this._backoffTimer = null;
  this._stopped = false;

  // Keep a reference to the zetta server; its name is needed to compute the peer URL.
  this._zetta = server;

  this.updateURL(url);

  // A unique id is generated per connection attempt (by _createNewUrl) and is
  // used to associate the peer's initiation request with this client.
  this.connectionId = null;
  this.ws = new WebSocket(this._createNewUrl(), {});

  EventEmitter.call(this);
};
util.inherits(PeerClient, EventEmitter);

// Expose the URL helper as a static on the constructor.
PeerClient.calculatePeerUrl = calculatePeerUrl;
51
/**
 * Recompute and store the peer WebSocket URL from an HTTP(S) URL, using the
 * local zetta server's `_name` as the peer name.
 *
 * @param {string} httpUrl - HTTP(S) URL of the remote peer server.
 */
PeerClient.prototype.updateURL = function(httpUrl) {
  this.url = calculatePeerUrl(httpUrl, this._zetta._name);
};
56
/**
 * Generate a fresh connection id, remember it on the client, and return the
 * peer URL carrying it as the `connectionId` query parameter.
 *
 * @returns {string} `this.url` followed by "?connectionId=<uuid v4>".
 */
PeerClient.prototype._createNewUrl = function() {
  var freshId = uuid.v4();
  this.connectionId = freshId;
  return this.url + '?connectionId=' + freshId;
};
61
/**
 * Snapshot of the client's identifying state.
 *
 * @returns {{url: string, connectionId: ?string}} Current peer URL and the
 *   id of the most recently generated connection.
 */
PeerClient.prototype.properties = function() {
  var snapshot = {};
  snapshot.url = this.url;
  snapshot.connectionId = this.connectionId;
  return snapshot;
};
68
/**
 * Begin (or resume) connecting to the peer.
 */
PeerClient.prototype.start = function() {
  // A prior close() leaves the client flagged as stopped; clear that first,
  // otherwise _createSocket() would return without doing anything.
  this._stopped = false;

  this._createSocket();
};
73
/**
 * Close the connection and stop reconnecting.
 */
PeerClient.prototype.close = function() {
  // Flag as stopped before closing the socket so that any 'close' handler
  // that tries to reconnect is short-circuited by _createSocket().
  this._stopped = true;
  clearTimeout(this._backoffTimer);

  this.ws.close();
  this._stopPingTimeout();
};
81
/**
 * (Re)arm the ping watchdog. If it is not reset again within
 * `this.pingTimeout` milliseconds, a warning is logged, 'timeout' is emitted
 * and the WebSocket is closed.
 */
PeerClient.prototype._resetPingTimeout = function() {
  var client = this;

  function onPingTimeout() {
    if (!client.ws) {
      return;
    }

    client.log.emit('warn', 'peer-client', 'Communication to peer timed out. Reconnecting (' + client.url + ')');
    client.emit('timeout');
    client.ws.close();
  }

  // Drop the previous watchdog before arming a new one.
  clearTimeout(this._pingTimer);
  this._pingTimer = setTimeout(onPingTimeout, this.pingTimeout);
};
93
/**
 * Cancel the ping watchdog armed by _resetPingTimeout(), if any.
 */
PeerClient.prototype._stopPingTimeout = function() {
  var pendingWatchdog = this._pingTimer;
  clearTimeout(pendingWatchdog);
};
97
/**
 * Schedule a connection attempt after the backoff delay for the current
 * `retryCount`.
 *
 * When the delay elapses the ping watchdog is armed, the WebSocket is given a
 * URL with a fresh connection id, and `ws.start()` is called. Socket
 * listeners ('open', 'error', 'close') are registered only when
 * `retryCount` is 0; 'error' and 'close' each bump `retryCount` and call
 * this method again.
 *
 * Does nothing (beyond cancelling a pending backoff) once the client has
 * been stopped via close().
 */
PeerClient.prototype._createSocket = function() {
  var self = this;

  // Cancel any attempt that is still waiting out its backoff. The timer is
  // stored in `_backoffTimer`; the previous code checked `backoffTimer`
  // (never assigned), so overlapping 'error' + 'close' events each left a
  // live timer behind and triggered duplicate ws.start() calls.
  clearTimeout(this._backoffTimer);
  this._backoffTimer = null;

  // once peer is closed dont create new socket
  if (this._stopped) {
    return;
  }

  // stop ping timer until backoff finishes
  self._stopPingTimeout();

  var backoff = this.generateBackoff(this.retryCount);
  this._backoffTimer = setTimeout(function() {
    // start ping timer
    self._resetPingTimeout();

    // create a new connection id
    self.ws.setAddress(self._createNewUrl());

    // NOTE(review): listeners are attached only on a zero retry count,
    // presumably because the same `ws` object is reused across attempts and
    // keeps its listeners — confirm against ./web_socket. Calling start()
    // again while retryCount is still 0 would register them a second time.
    if (self.retryCount === 0) {
      self.ws.on('open', function onOpen(socket) {
        self.checkServerReq();
        self.emit('connecting');
        // Hand the raw socket to the local server as an incoming connection.
        self.server.emit('connection', socket);

        socket.on('spdyPing', function() {
          // reset ping timer on a spdy ping from the peer
          self._resetPingTimeout();
        });

        self.log.emit('log', 'peer-client', 'WebSocket to peer established (' + self.url + ')');
      });

      self.ws.on('error', function onError(err) {
        self.connected = false;
        self.log.emit('log', 'peer-client', 'Peer connection error (' + self.url + '): ' + err);
        self.emit('error', err);
        reconnect();
      });

      self.ws.on('close', function(code, message) {
        self.connected = false;
        self.log.emit('log', 'peer-client', 'Peer connection closed (' + self.url + '): ' + code + ' - ' + message);
        self.emit('closed');
        reconnect();
      });
    }

    self.ws.start();
  }, backoff);

  // Count the failed attempt and schedule the next one.
  function reconnect() {
    self.retryCount++;
    self._createSocket();
  }
};
159
/**
 * Install a 'request' listener on the local server that waits for the peer's
 * initiation request (`/_initiate_peer/{connection-id}`).
 *
 * When a request for the current connection id arrives the client is marked
 * connected, `retryCount` is reset, 'connected' is emitted, the request is
 * answered with 200, and the listener removes itself. Any listener installed
 * by an earlier call is removed first.
 */
PeerClient.prototype.checkServerReq = function() {
  var self = this;

  // remove any previous request listeners
  if (this.onRequest) {
    this.server.removeListener('request', this.onRequest);
  }

  // /_initiate_peer/{connection-id}
  this.onRequest = function(req, res) {
    // Ignore everything except the initiation request for this connection.
    if (req.url !== '/_initiate_peer/' + self.connectionId) {
      return;
    }

    self.connected = true;
    self.retryCount = 0;
    self.emit('connected');
    self.log.emit('log', 'peer-client', 'Peer connection established (' + self.url + ')');

    res.statusCode = 200;
    res.end();

    // remove request listener
    self.server.removeListener('request', self.onRequest);

    // set up exchange of reactive queries.
  };

  this.server.on('request', this.onRequest);
};
188
/**
 * Compute the delay before a connection attempt.
 *
 * Attempt 0 connects immediately. Otherwise the delay is
 * `2^attempt * reconnect.min`, capped at `reconnect.max`, plus a random
 * whole-millisecond offset in `[0, reconnect.maxRandomOffset)`.
 *
 * @param {number} attempt - Number of failed attempts so far.
 * @returns {number} Delay in milliseconds.
 */
PeerClient.prototype.generateBackoff = function(attempt) {
  if (attempt === 0) {
    return 0;
  }

  // Math.floor rather than parseInt: parseInt stringifies its argument and
  // would misread tiny values rendered in exponent form (e.g. 5e-7 -> 5).
  // Assumes maxRandomOffset is non-negative.
  var jitter = Math.floor(Math.random() * this.reconnect.maxRandomOffset);
  var exponential = Math.pow(2, attempt) * this.reconnect.min;

  return Math.min(exponential, this.reconnect.max) + jitter;
};