UNPKG

8.75 kBJavaScriptView Raw
1// WebSocket with auto reconnecting feature, backup endpoint and EventEmitter interface.
2
3import d from 'debug';
4import EventEmitter from 'eventemitter3';
5import StateMachine from 'javascript-state-machine';
6
7import WebSocket from 'ws';
8
9import { ensureArray, tryAll, global } from './utils';
10
11const debug = d('LC:WebSocketPlus');
12
13export const OPEN = 'open';
14export const DISCONNECT = 'disconnect';
15export const RECONNECT = 'reconnect';
16export const RETRY = 'retry';
17export const SCHEDULE = 'schedule';
18export const OFFLINE = 'offline';
19export const ONLINE = 'online';
20export const ERROR = 'error';
21export const MESSAGE = 'message';
22
23const HEARTBEAT_TIME = 180000;
24const TIMEOUT_TIME = 380000;
25
26const DEFAULT_RETRY_STRATEGY = attempt => Math.min(1000 * 2 ** attempt, 300000);
27
28const requireConnected = (target, name, descriptor) =>
29 Object.assign({}, descriptor, {
30 value: function requireConnectedWrapper(...args) {
31 this.checkConnectionAvailability(name);
32 return descriptor.value.call(this, ...args);
33 },
34 });
35
36class WebSocketPlus extends EventEmitter {
37 get urls() {
38 return this._urls;
39 }
40
41 set urls(urls) {
42 this._urls = ensureArray(urls);
43 }
44
45 constructor(getUrls, protocol) {
46 if (typeof WebSocket === 'undefined') {
47 throw new Error(
48 'WebSocket is undefined. Polyfill is required in this runtime.'
49 );
50 }
51 super();
52 this.init();
53 this._protocol = protocol;
54 Promise.resolve(typeof getUrls === 'function' ? getUrls() : getUrls)
55 .then(ensureArray)
56 .then(urls => {
57 this._urls = urls;
58 return this._open();
59 })
60 .then(() => {
61 this.__postponeTimeoutTimer = this._postponeTimeoutTimer.bind(this);
62 if (global.addEventListener) {
63 this.__pause = () => {
64 if (this.can('pause')) this.pause();
65 };
66 this.__resume = () => {
67 if (this.can('resume')) this.resume();
68 };
69 global.addEventListener('offline', this.__pause);
70 global.addEventListener('online', this.__resume);
71 }
72 this.open();
73 })
74 .catch(this.throw.bind(this));
75 }
76
77 _open() {
78 return this._createWs(this._urls, this._protocol).then(ws => {
79 const [first, ...reset] = this._urls;
80 this._urls = [...reset, first];
81 return ws;
82 });
83 }
84
85 _createWs(urls, protocol) {
86 return tryAll(
87 urls.map(url => (resolve, reject) => {
88 debug(`connect [${url}] ${protocol}`);
89 const ws = protocol ? new WebSocket(url, protocol) : new WebSocket(url);
90 ws.binaryType = this.binaryType || 'arraybuffer';
91 ws.onopen = () => resolve(ws);
92 ws.onclose = error => {
93 if (error instanceof Error) {
94 return reject(error);
95 }
96 // in browser, error event is useless
97 return reject(new Error(`Failed to connect [${url}]`));
98 };
99 ws.onerror = ws.onclose;
100 })
101 ).then(ws => {
102 this._ws = ws;
103 this._ws.onclose = this._handleClose.bind(this);
104 this._ws.onmessage = this._handleMessage.bind(this);
105 return ws;
106 });
107 }
108
109 _destroyWs() {
110 const ws = this._ws;
111 if (!ws) return;
112 ws.onopen = null;
113 ws.onclose = null;
114 ws.onerror = null;
115 ws.onmessage = null;
116 this._ws = null;
117 ws.close();
118 }
119
120 // eslint-disable-next-line class-methods-use-this
121 onbeforeevent(event, from, to, ...payload) {
122 debug(`${event}: ${from} -> ${to} %o`, payload);
123 }
124
125 onopen() {
126 this.emit(OPEN);
127 }
128
129 onconnected() {
130 this._startConnectionKeeper();
131 }
132
133 onleaveconnected(event, from, to) {
134 this._stopConnectionKeeper();
135 this._destroyWs();
136 if (to === 'offline' || to === 'disconnected') {
137 this.emit(DISCONNECT);
138 }
139 }
140
141 onpause() {
142 this.emit(OFFLINE);
143 }
144
145 onbeforeresume() {
146 this.emit(ONLINE);
147 }
148
149 onreconnect() {
150 this.emit(RECONNECT);
151 }
152
153 ondisconnected(event, from, to, attempt = 0) {
154 const delay = DEFAULT_RETRY_STRATEGY.call(null, attempt);
155 debug(`schedule attempt=${attempt} delay=${delay}`);
156 this.emit(SCHEDULE, attempt, delay);
157 if (this.__scheduledRetry) {
158 clearTimeout(this.__scheduledRetry);
159 }
160 this.__scheduledRetry = setTimeout(() => {
161 if (this.is('disconnected')) {
162 this.retry(attempt);
163 }
164 }, delay);
165 }
166
167 onretry(event, from, to, attempt = 0) {
168 this.emit(RETRY, attempt);
169 this._open().then(
170 () => (this.can('reconnect') ? this.reconnect() : this._destroyWs()),
171 () => this.can('fail') && this.fail(attempt + 1)
172 );
173 }
174
175 onerror(event, from, to, error) {
176 this.emit(ERROR, error);
177 }
178
179 onclose() {
180 if (global.removeEventListener) {
181 if (this.__pause) global.removeEventListener('offline', this.__pause);
182 if (this.__resume) global.removeEventListener('online', this.__resume);
183 }
184 }
185
186 checkConnectionAvailability(name = 'API') {
187 if (!this.is('connected')) {
188 const currentState = this.current;
189 console.warn(
190 `${name} should not be called when the connection is ${currentState}`
191 );
192 if (this.is('disconnected') || this.is('reconnecting')) {
193 console.warn(
194 'disconnect and reconnect event should be handled to avoid such calls.'
195 );
196 }
197 throw new Error('Connection unavailable');
198 }
199 }
200
201 // jsdoc-ignore-start
202 @requireConnected
203 // jsdoc-ignore-end
204 _ping() {
205 debug('ping');
206 try {
207 this.ping();
208 } catch (error) {
209 console.warn(`websocket ping error: ${error.message}`);
210 }
211 }
212
213 ping() {
214 if (this._ws.ping) {
215 this._ws.ping();
216 } else {
217 console.warn(`The WebSocket implement does not support sending ping frame.
218 Override ping method to use application defined ping/pong mechanism.`);
219 }
220 }
221
222 _postponeTimeoutTimer() {
223 debug('_postponeTimeoutTimer');
224 this._clearTimeoutTimers();
225 this._timeoutTimer = setTimeout(() => {
226 debug('timeout');
227 this.disconnect();
228 }, TIMEOUT_TIME);
229 }
230
231 _clearTimeoutTimers() {
232 if (this._timeoutTimer) {
233 clearTimeout(this._timeoutTimer);
234 }
235 }
236
237 _startConnectionKeeper() {
238 debug('start connection keeper');
239 this._heartbeatTimer = setInterval(this._ping.bind(this), HEARTBEAT_TIME);
240 const addListener = this._ws.addListener || this._ws.addEventListener;
241 if (!addListener) {
242 debug('connection keeper disabled due to the lack of #addEventListener.');
243 return;
244 }
245 addListener.call(this._ws, 'message', this.__postponeTimeoutTimer);
246 addListener.call(this._ws, 'pong', this.__postponeTimeoutTimer);
247 this._postponeTimeoutTimer();
248 }
249
250 _stopConnectionKeeper() {
251 debug('stop connection keeper');
252 // websockets/ws#489
253 const removeListener =
254 this._ws.removeListener || this._ws.removeEventListener;
255 if (removeListener) {
256 removeListener.call(this._ws, 'message', this.__postponeTimeoutTimer);
257 removeListener.call(this._ws, 'pong', this.__postponeTimeoutTimer);
258 this._clearTimeoutTimers();
259 }
260 if (this._heartbeatTimer) {
261 clearInterval(this._heartbeatTimer);
262 }
263 }
264
265 _handleClose(event) {
266 debug(`ws closed [${event.code}] ${event.reason}`);
267 // socket closed manually, ignore close event.
268 if (this.isFinished()) return;
269 this.handleClose(event);
270 }
271
272 handleClose() {
273 // reconnect
274 this.disconnect();
275 }
276
277 // jsdoc-ignore-start
278 @requireConnected
279 // jsdoc-ignore-end
280 send(data) {
281 debug('send', data);
282 this._ws.send(data);
283 }
284
285 _handleMessage(event) {
286 debug('message', event.data);
287 this.handleMessage(event.data);
288 }
289
290 handleMessage(message) {
291 this.emit(MESSAGE, message);
292 }
293}
294
295StateMachine.create({
296 target: WebSocketPlus.prototype,
297 initial: {
298 state: 'initialized',
299 event: 'init',
300 defer: true,
301 },
302 terminal: 'closed',
303 events: [
304 {
305 name: 'open',
306 from: 'initialized',
307 to: 'connected',
308 },
309 {
310 name: 'disconnect',
311 from: 'connected',
312 to: 'disconnected',
313 },
314 {
315 name: 'retry',
316 from: 'disconnected',
317 to: 'reconnecting',
318 },
319 {
320 name: 'fail',
321 from: 'reconnecting',
322 to: 'disconnected',
323 },
324 {
325 name: 'reconnect',
326 from: 'reconnecting',
327 to: 'connected',
328 },
329 {
330 name: 'pause',
331 from: ['connected', 'disconnected', 'reconnecting'],
332 to: 'offline',
333 },
334 {},
335 {
336 name: 'resume',
337 from: 'offline',
338 to: 'disconnected',
339 },
340 {
341 name: 'close',
342 from: ['connected', 'disconnected', 'reconnecting', 'offline'],
343 to: 'closed',
344 },
345 {
346 name: 'throw',
347 from: '*',
348 to: 'error',
349 },
350 ],
351});
352
353export default WebSocketPlus;