UNPKG

2.21 kBJavaScriptView Raw
1const EventEmitter = require('events');
2const WS = require('ws');
3const BaseProvider = require('./BaseProvider');
4
5class WebsocketProvider extends BaseProvider {
6 constructor(url, options) {
7 super(url, options);
8 this.messageEvent = new EventEmitter();
9 }
10
11 async getWS() {
12 if (!this.ws || this.ws.readyState !== WS.OPEN) {
13 const ws = new WS(this.url);
14
15 // catch connecting error
16 ws.once('error', e => {
17 throw new Error(e);
18 });
19
20 // wait till open
21 await new Promise(resolve => {
22 ws.once('open', () => {
23 // ignore message error
24 ws.removeEventListener('error');
25 ws.on('error', () => {
26 if (ws.readyState === WS.OPEN) {
27 ws.terminate();
28 }
29 });
30 resolve();
31 });
32 });
33
34 // transfer message by id
35 ws.on('message', message => {
36 const body = JSON.parse(message);
37 this.messageEvent.emit(body.id, body);
38 });
39
40 // XXX: is the garbage collection will control the old `this.ws` ?
41 this.ws = ws;
42 }
43
44 return this.ws;
45 }
46
47 async call(method, ...params) {
48 const startTime = Date.now();
49 const data = { jsonrpc: '2.0', id: this.requestId(), method, params };
50
51 const ws = await this.getWS();
52
53 return new Promise((resolve, reject) => {
54 const timeoutHandle = setTimeout(
55 () => reject(new Error(`timeout when call ${method}(${params.join(',')}) after ${this.timeout} ms`)),
56 this.timeout,
57 );
58
59 this.messageEvent.once(data.id, ({ error, result }) => {
60 clearTimeout(timeoutHandle);
61
62 if (error) {
63 this.logger.error({ data, error, duration: Date.now() - startTime });
64 reject(new BaseProvider.RPCError(error));
65 } else {
66 this.logger.info({ data, result, duration: Date.now() - startTime });
67 resolve(result);
68 }
69 });
70
71 ws.send(JSON.stringify(data));
72 });
73 }
74
75 close() {
76 if (this.ws) {
77 this.ws.terminate();
78 this.messageEvent.removeAllListeners();
79 }
80 }
81}
82
83module.exports = WebsocketProvider;