UNPKG

16 kBJavaScriptView Raw
1"use strict";
2var __importDefault = (this && this.__importDefault) || function (mod) {
3 return (mod && mod.__esModule) ? mod : { "default": mod };
4};
5Object.defineProperty(exports, "__esModule", { value: true });
6exports.Io = exports.IO_EVENT_DICT = void 0;
7/**
8 * Wechaty Chatbot SDK - https://github.com/wechaty/wechaty
9 *
10 * @copyright 2016 Huan LI (李卓桓) <https://github.com/huan>, and
11 * Wechaty Contributors <https://github.com/wechaty>.
12 *
13 * Licensed under the Apache License, Version 2.0 (the "License");
14 * you may not use this file except in compliance with the License.
15 * You may obtain a copy of the License at
16 *
17 * http://www.apache.org/licenses/LICENSE-2.0
18 *
19 * Unless required by applicable law or agreed to in writing, software
20 * distributed under the License is distributed on an "AS IS" BASIS,
21 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 * See the License for the specific language governing permissions and
23 * limitations under the License.
24 *
25 */
26const ws_1 = __importDefault(require("ws"));
27const wechaty_puppet_1 = require("wechaty-puppet");
28const json_rpc_peer_1 = require("json-rpc-peer");
29const config_1 = require("./config");
30const io_peer_1 = require("./io-peer/io-peer");
31exports.IO_EVENT_DICT = {
32 botie: 'tbw',
33 error: 'tbw',
34 heartbeat: 'tbw',
35 jsonrpc: 'JSON RPC',
36 login: 'tbw',
37 logout: 'tbw',
38 message: 'tbw',
39 raw: 'tbw',
40 reset: 'tbw',
41 scan: 'tbw',
42 shutdown: 'tbw',
43 sys: 'tbw',
44 update: 'tbw',
45};
46/**
47 * https://github.com/Chatie/botie/issues/2
48 * https://github.com/actions/github-script/blob/f035cea4677903b153fa754aa8c2bba66f8dc3eb/src/async-function.ts#L6
49 */
50const AsyncFunction = Object.getPrototypeOf(async () => null).constructor;
51// function callAsyncFunction<U extends {} = {}, V = unknown> (
52// args: U,
53// source: string
54// ): Promise<V> {
55// const fn = new AsyncFunction(...Object.keys(args), source)
56// return fn(...Object.values(args))
57// }
58class Io {
59 constructor(options) {
60 this.options = options;
61 this.eventBuffer = [];
62 this.state = new wechaty_puppet_1.StateSwitch('Io', { log: config_1.log });
63 options.apihost = options.apihost || config_1.config.apihost;
64 options.protocol = options.protocol || config_1.config.default.DEFAULT_PROTOCOL;
65 this.id = options.wechaty.id;
66 this.protocol = options.protocol + '|' + options.wechaty.id + '|' + config_1.config.serviceIp + '|' + options.servicePort;
67 config_1.log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]', options.apihost, options.token, options.protocol, this.id);
68 if (options.servicePort) {
69 this.jsonRpc = io_peer_1.getPeer({
70 serviceGrpcPort: this.options.servicePort,
71 });
72 }
73 }
74 toString() {
75 return `Io<${this.options.token}>`;
76 }
77 connected() {
78 return this.ws && this.ws.readyState === ws_1.default.OPEN;
79 }
80 async start() {
81 config_1.log.verbose('Io', 'start()');
82 if (this.lifeTimer) {
83 throw new Error('lifeTimer exist');
84 }
85 this.state.on('pending');
86 try {
87 this.initEventHook();
88 this.ws = await this.initWebSocket();
89 this.options.wechaty.on('login', () => { this.scanPayload = undefined; });
90 this.options.wechaty.on('scan', (qrcode, status) => {
91 this.scanPayload = {
92 ...this.scanPayload,
93 qrcode,
94 status,
95 };
96 });
97 this.lifeTimer = setInterval(() => {
98 if (this.ws && this.connected()) {
99 config_1.log.silly('Io', 'start() setInterval() ws.ping()');
100 // TODO: check 'pong' event on ws
101 this.ws.ping();
102 }
103 }, 1000 * 10);
104 this.state.on(true);
105 }
106 catch (e) {
107 config_1.log.warn('Io', 'start() exception: %s', e.message);
108 this.state.off(true);
109 throw e;
110 }
111 }
112 initEventHook() {
113 config_1.log.verbose('Io', 'initEventHook()');
114 const wechaty = this.options.wechaty;
115 wechaty.on('error', error => this.send({ name: 'error', payload: error }));
116 wechaty.on('heartbeat', data => this.send({ name: 'heartbeat', payload: { cuid: this.id, data } }));
117 wechaty.on('login', user => this.send({ name: 'login', payload: user.payload }));
118 wechaty.on('logout', user => this.send({ name: 'logout', payload: user.payload }));
119 wechaty.on('message', message => this.ioMessage(message));
120 // FIXME: payload schema need to be defined universal
121 // wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))
122 wechaty.on('scan', (qrcode, status) => this.send({ name: 'scan', payload: { qrcode, status } }));
123 }
124 async initWebSocket() {
125 config_1.log.verbose('Io', 'initWebSocket()');
126 // this.state.current('on', false)
127 // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
128 const auth = 'Token ' + this.options.token;
129 const headers = { Authorization: auth };
130 if (!this.options.apihost) {
131 throw new Error('no apihost');
132 }
133 let endpoint = 'wss://' + this.options.apihost + '/v0/websocket';
134 // XXX quick and dirty: use no ssl for API_HOST other than official
135 // FIXME: use a configurable VARIABLE for the domain name at here:
136 if (!/api\.chatie\.io/.test(this.options.apihost)) {
137 endpoint = 'ws://' + this.options.apihost + '/v0/websocket';
138 }
139 const ws = this.ws = new ws_1.default(endpoint, this.protocol, { headers });
140 ws.on('open', () => this.wsOnOpen(ws));
141 ws.on('message', data => this.wsOnMessage(data));
142 ws.on('error', e => this.wsOnError(e));
143 ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason));
144 await new Promise((resolve, reject) => {
145 ws.once('open', resolve);
146 ws.once('error', reject);
147 ws.once('close', reject);
148 });
149 return ws;
150 }
151 async wsOnOpen(ws) {
152 if (this.protocol !== ws.protocol) {
153 config_1.log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol);
154 // XXX deal with error?
155 }
156 config_1.log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol);
157 // this.currentState('connected')
158 // this.state.current('on')
159 // FIXME: how to keep alive???
160 // ws._socket.setKeepAlive(true, 100)
161 this.reconnectTimeout = undefined;
162 const name = 'sys';
163 const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`;
164 const initEvent = {
165 name,
166 payload,
167 };
168 await this.send(initEvent);
169 }
170 async wsOnMessage(data) {
171 config_1.log.silly('Io', 'initWebSocket() ws.on(message): %s', data);
172 // flags.binary will be set if a binary data is received.
173 // flags.masked will be set if the data was masked.
174 if (typeof data !== 'string') {
175 throw new Error('data should be string...');
176 }
177 const ioEvent = {
178 name: 'raw',
179 payload: data,
180 };
181 try {
182 const obj = JSON.parse(data);
183 ioEvent.name = obj.name;
184 ioEvent.payload = obj.payload;
185 }
186 catch (e) {
187 config_1.log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data);
188 }
189 switch (ioEvent.name) {
190 case 'botie':
191 {
192 const payload = ioEvent.payload;
193 const args = payload.args;
194 const source = payload.source;
195 try {
196 if (args[0] === 'message' && args.length === 1) {
197 const fn = new AsyncFunction(...args, source);
198 this.onMessage = fn;
199 }
200 else {
201 config_1.log.warn('Io', 'server pushed function is invalid. args: %s', JSON.stringify(args));
202 }
203 }
204 catch (e) {
205 config_1.log.warn('Io', 'server pushed function exception: %s', e);
206 this.options.wechaty.emit('error', e);
207 }
208 }
209 break;
210 case 'reset':
211 config_1.log.verbose('Io', 'on(reset): %s', ioEvent.payload);
212 await this.options.wechaty.reset(ioEvent.payload);
213 break;
214 case 'shutdown':
215 config_1.log.info('Io', 'on(shutdown): %s', ioEvent.payload);
216 process.exit(0);
217 // eslint-disable-next-line
218 break;
219 case 'update':
220 config_1.log.verbose('Io', 'on(update): %s', ioEvent.payload);
221 {
222 const wechaty = this.options.wechaty;
223 if (wechaty.logonoff()) {
224 const loginEvent = {
225 name: 'login',
226 payload: wechaty.userSelf().payload,
227 };
228 await this.send(loginEvent);
229 }
230 if (this.scanPayload) {
231 const scanEvent = {
232 name: 'scan',
233 payload: this.scanPayload,
234 };
235 await this.send(scanEvent);
236 }
237 }
238 break;
239 case 'sys':
240 // do nothing
241 break;
242 case 'logout':
243 config_1.log.info('Io', 'on(logout): %s', ioEvent.payload);
244 await this.options.wechaty.logout();
245 break;
246 case 'jsonrpc':
247 config_1.log.info('Io', 'on(jsonrpc): %s', ioEvent.payload);
248 try {
249 const request = ioEvent.payload;
250 if (!io_peer_1.isJsonRpcRequest(request)) {
251 config_1.log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request));
252 return;
253 }
254 if (!this.jsonRpc) {
255 throw new Error('jsonRpc not initialized!');
256 }
257 const response = await this.jsonRpc.exec(request);
258 if (!response) {
259 config_1.log.warn('Io', 'on(jsonrpc) response is undefined.');
260 return;
261 }
262 const payload = json_rpc_peer_1.parse(response);
263 const jsonrpcEvent = {
264 name: 'jsonrpc',
265 payload,
266 };
267 config_1.log.verbose('Io', 'on(jsonrpc) send(%s)', response);
268 await this.send(jsonrpcEvent);
269 }
270 catch (e) {
271 config_1.log.error('Io', 'on(jsonrpc): %s', e);
272 }
273 break;
274 default:
275 config_1.log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload);
276 break;
277 }
278 }
279 // FIXME: it seems the parameter `e` might be `undefined`.
280 // @types/ws might has bug for `ws.on('error', e => this.wsOnError(e))`
281 wsOnError(e) {
282 config_1.log.warn('Io', 'initWebSocket() error event[%s]', e && e.message);
283 if (!e) {
284 return;
285 }
286 this.options.wechaty.emit('error', e);
287 // when `error`, there must have already a `close` event
288 // we should not call this.reconnect() again
289 //
290 // this.close()
291 // this.reconnect()
292 }
293 wsOnClose(ws, code, message) {
294 if (this.state.on()) {
295 config_1.log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message);
296 ws.close();
297 this.reconnect();
298 }
299 }
300 reconnect() {
301 config_1.log.verbose('Io', 'reconnect()');
302 if (this.state.off()) {
303 config_1.log.warn('Io', 'reconnect() canceled because state.target() === offline');
304 return;
305 }
306 if (this.connected()) {
307 config_1.log.warn('Io', 'reconnect() on a already connected io');
308 return;
309 }
310 if (this.reconnectTimer) {
311 config_1.log.warn('Io', 'reconnect() on a already re-connecting io');
312 return;
313 }
314 if (!this.reconnectTimeout) {
315 this.reconnectTimeout = 1;
316 }
317 else if (this.reconnectTimeout < 10 * 1000) {
318 this.reconnectTimeout *= 3;
319 }
320 config_1.log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000));
321 this.reconnectTimer = setTimeout(async () => {
322 this.reconnectTimer = undefined;
323 await this.initWebSocket();
324 }, this.reconnectTimeout); // as any as NodeJS.Timer
325 }
326 async send(ioEvent) {
327 if (!this.ws) {
328 throw new Error('no ws');
329 }
330 const ws = this.ws;
331 if (ioEvent) {
332 config_1.log.silly('Io', 'send(%s)', JSON.stringify(ioEvent));
333 this.eventBuffer.push(ioEvent);
334 }
335 else {
336 config_1.log.silly('Io', 'send()');
337 }
338 if (!this.connected()) {
339 config_1.log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length);
340 return;
341 }
342 const list = [];
343 while (this.eventBuffer.length) {
344 const data = JSON.stringify(this.eventBuffer.shift());
345 const p = new Promise((resolve, reject) => ws.send(data, (err) => {
346 if (err) {
347 reject(err);
348 }
349 else {
350 resolve();
351 }
352 }));
353 list.push(p);
354 }
355 try {
356 await Promise.all(list);
357 }
358 catch (e) {
359 config_1.log.error('Io', 'send() exception: %s', e.stack);
360 throw e;
361 }
362 }
363 async stop() {
364 config_1.log.verbose('Io', 'stop()');
365 if (!this.ws) {
366 throw new Error('no ws');
367 }
368 this.state.off('pending');
369 // try to send IoEvents in buffer
370 await this.send();
371 this.eventBuffer = [];
372 if (this.reconnectTimer) {
373 clearTimeout(this.reconnectTimer);
374 this.reconnectTimer = undefined;
375 }
376 if (this.lifeTimer) {
377 clearInterval(this.lifeTimer);
378 this.lifeTimer = undefined;
379 }
380 this.ws.close();
381 await new Promise(resolve => {
382 if (this.ws) {
383 this.ws.once('close', resolve);
384 }
385 else {
386 resolve();
387 }
388 });
389 this.ws = undefined;
390 this.state.off(true);
391 }
392 /**
393 *
394 * Prepare to be overwritten by server setting
395 *
396 */
397 async ioMessage(m) {
398 config_1.log.silly('Io', 'ioMessage() is a nop function before be overwritten from cloud');
399 if (typeof this.onMessage === 'function') {
400 await this.onMessage(m);
401 }
402 }
403 async syncMessage(m) {
404 config_1.log.silly('Io', 'syncMessage(%s)', m);
405 const messageEvent = {
406 name: 'message',
407 payload: m.payload,
408 };
409 await this.send(messageEvent);
410 }
411}
412exports.Io = Io;
413//# sourceMappingURL=io.js.map
\No newline at end of file