UNPKG

8.7 kBPlain TextView Raw
1// Copyright 2017-2018 @polkadot/api-provider authors & contributors
2// This software may be modified and distributed under the terms
3// of the ISC license. See the LICENSE file for details.
4
5import { Logger } from '@polkadot/util/types';
6import { RpcCoder } from '../coder/json/types';
7import { JsonRpcResponse, ProviderInterface, ProviderInterface$Callback, ProviderInterface$Emitted, ProviderInterface$EmitCb } from '../types';
8
9import './polyfill';
10
11import E3 from 'eventemitter3';
12import assert from '@polkadot/util/assert';
13import isNull from '@polkadot/util/is/null';
14import isUndefined from '@polkadot/util/is/undefined';
15import logger from '@polkadot/util/logger';
16
17import coder from '../coder/json';
18
/**
 * Describes the receiving side of a subscription: the callback that is
 * invoked for every update, and the subscription type that (together with
 * the subscription id) forms the lookup key `<type>::<id>`.
 */
interface SubscriptionHandler {
  callback: ProviderInterface$Callback;
  type: string;
}
23
/**
 * Book-keeping entry for a request that has been issued (or queued) and is
 * still waiting for its response. Entries are stored by request id.
 *
 * `subscription` is only present when the request was made in order to set
 * up a subscription.
 */
interface WsState$Awaiting {
  callback: ProviderInterface$Callback;
  method: string;
  params: any[];
  subscription?: SubscriptionHandler;
}
30
/**
 * An established subscription: the handler (callback + type) extended with
 * the method and params of the request that created it.
 */
interface WsState$Subscription extends SubscriptionHandler {
  method: string;
  params: any[];
}
35
/**
 * Provider contract for WebSocket-backed providers: everything from
 * ProviderInterface plus an explicit way to open the connection.
 */
interface WSProviderInterface extends ProviderInterface {
  /** Opens the underlying connection. */
  connect (): void;
}
39
/**
 * The WebSocket Provider allows sending requests using WebSocket. Unlike the [[HttpProvider]],
 * it does support subscriptions and allows listening to events such as new blocks or balance changes.
 *
 * Requests made while the socket is not connected are queued and sent as soon as the
 * connection opens. When `autoConnect` is enabled, a closed socket is re-opened after one second.
 *
 * @example
 * import createApi from '@polkadot/api';
 * import WsProvider from '@polkadot/api-provider/ws';
 * const provider = new WsProvider('ws://127.0.0.1:9944');
 * const api = createApi(provider);
 *
 * @see [[HttpProvider]]
 */
export default class WsProvider extends E3.EventEmitter implements WSProviderInterface {
  // Whether to connect on construction and to reconnect after the socket closes
  private autoConnect: boolean;
  // Encodes outgoing requests and decodes incoming responses
  private coder: RpcCoder;
  private endpoint: string;
  // Requests awaiting a response, keyed by request id
  private handlers: {
    [index: number]: WsState$Awaiting
  };
  private _isConnected: boolean;
  private l: Logger;
  // Encoded requests that could not be sent yet, keyed by request id
  private queued: {
    [index: string]: string
  };
  // Established subscriptions, keyed by `<type>::<subscription id>`
  private subscriptions: {
    [index: string]: WsState$Subscription
  };
  private websocket: WebSocket | null;

  /**
   * @param {string} endpoint The endpoint url. Usually `ws://ip:9944` or `wss://ip:9944`
   * @param {boolean = true} autoConnect Whether to connect automatically or not.
   * @throws When the endpoint does not start with `ws://` or `wss://`.
   */
  constructor (endpoint: string, autoConnect: boolean = true) {
    super();

    assert(/^(wss|ws):\/\//.test(endpoint), `Endpoint should start with 'ws://', received '${endpoint}'`);

    this.autoConnect = autoConnect;
    this.coder = coder();
    this.endpoint = endpoint;
    this._isConnected = false;
    this.handlers = {};
    this.l = logger('api-ws');
    this.queued = {};
    this.subscriptions = {};
    this.websocket = null;

    if (autoConnect) {
      this.connect();
    }
  }

  /**
   * The [[WsProvider]] connects automatically by default. If you decided otherwise, you may
   * connect manually using this method.
   *
   * Errors raised while creating the socket are logged, not thrown.
   */
  connect (): void {
    try {
      this.websocket = new WebSocket(this.endpoint);

      this.websocket.onclose = this.onSocketClose;
      this.websocket.onerror = this.onSocketError;
      this.websocket.onmessage = this.onSocketMessage;
      this.websocket.onopen = this.onSocketOpen;
    } catch (error) {
      this.l.error(error);
    }
  }

  /**
   * Whether the node is connected or not.
   * @return {boolean} true if connected
   */
  isConnected (): boolean {
    return this._isConnected;
  }

  /**
   * Registers a listener for events emitted by the provider itself, such as
   * `connected` and `disconnected`.
   * @param {ProviderInterface$Emitted} type Event
   * @param {ProviderInterface$EmitCb} sub Callback
   * @return {this} The provider, to allow chaining
   */
  on (type: ProviderInterface$Emitted, sub: ProviderInterface$EmitCb): this {
    return super.on(type, sub);
  }

  /**
   * Sends a request, or queues it when the socket is not connected yet.
   * @param {string} method The RPC method to call
   * @param {Array<any>} params Parameters for the call
   * @param {SubscriptionHandler} subscription When supplied, the decoded result is treated as a
   * subscription id and the handler is registered for it once the response arrives
   * @return {Promise<any>} Promise resolving to the decoded result, rejecting when encoding,
   * sending or decoding fails
   */
  async send (method: string, params: Array<any>, subscription?: SubscriptionHandler): Promise<any> {
    return new Promise((resolve, reject): void => {
      try {
        const json = this.coder.encodeJson(method, params);
        const id = this.coder.getId();
        const callback = (error: Error | null, result: any) => {
          if (error) {
            reject(error);
          } else {
            resolve(result);
          }
        };

        this.l.debug(() => ['calling', method, params, json, !!subscription]);

        this.handlers[id] = {
          callback,
          method,
          params,
          subscription
        };

        if (this.isConnected() && !isNull(this.websocket)) {
          try {
            this.websocket.send(json);
          } catch (error) {
            // The request never left, so no response will arrive for it - drop the
            // handler instead of leaving a stale entry behind
            delete this.handlers[id];

            throw error;
          }
        } else {
          this.queued[id] = json;
        }
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * Allows subscribing to a specific event.
   * @param {string} type Subscription type
   * @param {string} method Subscription method
   * @param {Array<any>} params Parameters
   * @param {ProviderInterface$Callback} callback Callback
   * @return {Promise<number>} Promise resolving to the id of the subscription you can use with [[unsubscribe]].
   *
   * @example
   * const provider = new WsProvider('ws://127.0.0.1:9944');
   * const api = createApi(provider);
   *
   * api.state.storage([[storage.balances.freeBalance, <Address>]], (_, values) => {
   *   console.log(values)
   * }).then((subscriptionId) => {
   *   console.log('balance changes subscription id: ', subscriptionId)
   * })
   */
  async subscribe (type: string, method: string, params: Array<any>, callback: ProviderInterface$Callback): Promise<number> {
    const id = await this.send(method, params, { callback, type });

    return id as number;
  }

  /**
   * Allows unsubscribing to subscriptions made with [[subscribe]].
   * @param {string} type Subscription type, as passed to [[subscribe]]
   * @param {string} method Unsubscribe method
   * @param {number} id Subscription id, as returned by [[subscribe]]
   * @return {Promise<boolean>} Promise resolving to the result of the unsubscribe call
   * @throws When no active subscription exists for the type and id
   */
  async unsubscribe (type: string, method: string, id: number): Promise<boolean> {
    const subscription = `${type}::${id}`;

    assert(!isUndefined(this.subscriptions[subscription]), `Unable to find active subscription=${subscription}`);

    // Removed before the request is made, updates arriving in the meantime are
    // logged as having no handler
    delete this.subscriptions[subscription];

    const result = await this.send(method, [id]);

    return result as boolean;
  }

  // Marks the provider as disconnected, notifies listeners and schedules a reconnect
  // when autoConnect is enabled
  private onSocketClose = (): void => {
    this.l.debug(() => ['disconnected from', this.endpoint]);

    this._isConnected = false;
    this.emit('disconnected');

    if (this.autoConnect) {
      setTimeout(() => {
        this.connect();
      }, 1000);
    }
  }

  // Socket errors are only logged
  private onSocketError = (error: Event): void => {
    this.l.error(error);
  }

  // Parses an incoming message and dispatches it: messages without a `method` are
  // responses to requests, all others are subscription updates
  private onSocketMessage = (message: MessageEvent): void => {
    this.l.debug(() => ['received', message.data]);

    let response: JsonRpcResponse;

    try {
      response = JSON.parse(message.data as string);
    } catch (error) {
      // A malformed message should not throw out of the socket event handler
      this.l.error(`Unable to parse received message: ${(error as Error).message}`);
      return;
    }

    return isUndefined(response.method)
      ? this.onSocketMessageResult(response)
      : this.onSocketMessageSubscribe(response);
  }

  // Completes the pending request matching the response id. For subscription requests
  // the decoded result is the subscription id, under which the handler is registered
  private onSocketMessageResult = (response: JsonRpcResponse): void => {
    this.l.debug(() => ['handling: response =', response, 'id =', response.id]);

    const handler = this.handlers[response.id];

    if (!handler) {
      this.l.error(`Unable to find handler for id=${response.id}`);
      return;
    }

    try {
      const { method, params, subscription } = handler;
      const result = this.coder.decodeResponse(response);

      if (subscription) {
        this.subscriptions[`${subscription.type}::${result}`] = {
          ...subscription,
          method,
          params
        };
      }

      handler.callback(null, result);
    } catch (error) {
      handler.callback(error, undefined);
    }

    delete this.handlers[response.id];
  }

  // Forwards a subscription update to the callback registered for `<method>::<subscription id>`
  private onSocketMessageSubscribe = (response: JsonRpcResponse): void => {
    const subscription = `${response.method}::${response.params.subscription}`;

    this.l.debug(() => ['handling: response =', response, 'subscription =', subscription]);

    const handler = this.subscriptions[subscription];

    if (!handler) {
      this.l.error(`Unable to find handler for subscription=${subscription}`);
      return;
    }

    try {
      const result = this.coder.decodeResponse(response);

      handler.callback(null, result);
    } catch (error) {
      handler.callback(error, undefined);
    }
  }

  // Marks the provider as connected, notifies listeners and flushes all queued requests
  private onSocketOpen = (): boolean => {
    assert(!isNull(this.websocket), 'WebSocket cannot be null in onOpen');

    // Non-null as per the assert above, captured locally for use inside the closure
    const websocket = this.websocket as WebSocket;

    this.l.debug(() => ['connected to', this.endpoint]);

    this._isConnected = true;
    this.emit('connected');

    Object.keys(this.queued).forEach((id) => {
      try {
        websocket.send(
          this.queued[id]
        );

        delete this.queued[id];
      } catch (error) {
        // Requests that failed to send stay queued for the next time the socket opens
        this.l.error(error);
      }
    });

    return true;
  }
}