UNPKG

16.1 kBPlain TextView Raw
1/**
2 * Wechaty Chatbot SDK - https://github.com/wechaty/wechaty
3 *
4 * @copyright 2016 Huan LI (李卓桓) <https://github.com/huan>, and
5 * Wechaty Contributors <https://github.com/wechaty>.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20import WebSocket from 'ws'
21
22import type * as PUPPET from 'wechaty-puppet'
23import { StateSwitch } from 'state-switch'
24
25import * as jsonRpcPeer from 'json-rpc-peer'
26
27import type {
28 MessageInterface,
29} from './user-modules/mod.js'
30import type { WechatyInterface } from './wechaty/mod.js'
31
32import {
33 log,
34 config,
35} from './config.js'
36
37import {
38 getPeer,
39 isJsonRpcRequest,
40} from './io-peer/io-peer.js'
41
42export interface IoOptions {
43 wechaty : WechatyInterface,
44 token : string,
45 apihost? : string,
46 protocol? : string,
47 servicePort? : number,
48}
49
50export const IO_EVENT_DICT = {
51 botie : 'tbw',
52 error : 'tbw',
53 heartbeat : 'tbw',
54 jsonrpc : 'JSON RPC',
55 login : 'tbw',
56 logout : 'tbw',
57 message : 'tbw',
58 raw : 'tbw',
59 reset : 'tbw',
60 scan : 'tbw',
61 shutdown : 'tbw',
62 sys : 'tbw',
63 update : 'tbw',
64}
65
66type IoEventName = keyof typeof IO_EVENT_DICT
67
68interface IoEventScan {
69 name : 'scan',
70 payload : PUPPET.payloads.EventScan,
71}
72
73interface IoEventJsonRpc {
74 name : 'jsonrpc',
75 payload : jsonRpcPeer.JsonRpcPayload,
76}
77
78interface IoEventAny {
79 name: IoEventName,
80 payload: any,
81}
82
83type IoEvent = IoEventScan | IoEventJsonRpc | IoEventAny
84
85/**
86 * https://github.com/Chatie/botie/issues/2
87 * https://github.com/actions/github-script/blob/f035cea4677903b153fa754aa8c2bba66f8dc3eb/src/async-function.ts#L6
88 */
89const AsyncFunction = Object.getPrototypeOf(async () => null).constructor
90
91// function callAsyncFunction<U extends {} = {}, V = unknown> (
92// args: U,
93// source: string
94// ): Promise<V> {
95// const fn = new AsyncFunction(...Object.keys(args), source)
96// return fn(...Object.values(args))
97// }
98
99export class Io {
100
101 protected readonly id : string
102 protected readonly protocol : string
103 protected eventBuffer : IoEvent[] = []
104 protected ws : undefined | WebSocket
105
106 protected readonly state = new StateSwitch('Io', { log })
107
108 protected reconnectTimer? : ReturnType<typeof setTimeout>
109 protected reconnectTimeout? : number
110
111 protected lifeTimer? : ReturnType<typeof setTimeout>
112
113 protected onMessage: undefined | Function
114
115 protected scanPayload?: PUPPET.payloads.EventScan
116
117 protected jsonRpc?: jsonRpcPeer.Peer
118
119 constructor (
120 protected options: IoOptions,
121 ) {
122 options.apihost = options.apihost || config.apihost
123 options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL
124
125 this.id = options.wechaty.id
126
127 this.protocol = options.protocol + '|' + options.wechaty.id + '|' + config.serviceIp + '|' + options.servicePort
128 log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
129 options.apihost,
130 options.token,
131 options.protocol,
132 this.id,
133 )
134
135 if (options.servicePort) {
136 this.jsonRpc = getPeer({
137 serviceGrpcPort: this.options.servicePort!,
138 })
139 }
140
141 }
142
143 public toString () {
144 return `Io<${this.options.token}>`
145 }
146
147 private connected () {
148 return this.ws && this.ws.readyState === WebSocket.OPEN
149 }
150
151 public async start (): Promise<void> {
152 log.verbose('Io', 'start()')
153
154 if (this.lifeTimer) {
155 throw new Error('lifeTimer exist')
156 }
157
158 this.state.active('pending')
159
160 try {
161 this.initEventHook()
162
163 this.ws = await this.initWebSocket()
164
165 this.options.wechaty.on('login', () => { this.scanPayload = undefined })
166 this.options.wechaty.on('scan', (qrcode, status) => {
167 this.scanPayload = {
168 ...this.scanPayload,
169 qrcode,
170 status,
171 }
172 })
173
174 this.lifeTimer = setInterval(() => {
175 if (this.ws && this.connected()) {
176 log.silly('Io', 'start() setInterval() ws.ping()')
177 // TODO: check 'pong' event on ws
178 this.ws.ping()
179 }
180 }, 1000 * 10)
181
182 this.state.active(true)
183
184 } catch (e) {
185 log.warn('Io', 'start() exception: %s', (e as Error).message)
186 this.state.inactive(true)
187 throw e
188 }
189 }
190
191 private initEventHook () {
192 log.verbose('Io', 'initEventHook()')
193 const wechaty = this.options.wechaty
194
195 wechaty.on('error', error => this.send({ name: 'error', payload: error }))
196 wechaty.on('heartbeat', data => this.send({ name: 'heartbeat', payload: { cuid: this.id, data } }))
197 wechaty.on('login', user => this.send({ name: 'login', payload: wechaty.puppet.contactPayload(user.id) }))
198 wechaty.on('logout', user => this.send({ name: 'logout', payload: wechaty.puppet.contactPayload(user.id) }))
199 wechaty.on('message', message => this.ioMessage(message))
200
201 // FIXME: payload schema need to be defined universal
202 // wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))
203 wechaty.on('scan', (qrcode, status) => this.send({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
204 }
205
206 private async initWebSocket (): Promise<WebSocket> {
207 log.verbose('Io', 'initWebSocket()')
208 // this.state.current('on', false)
209
210 // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
211 const auth = 'Token ' + this.options.token
212 const headers = { Authorization: auth }
213
214 if (!this.options.apihost) {
215 throw new Error('no apihost')
216 }
217 let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'
218
219 // XXX quick and dirty: use no ssl for API_HOST other than official
220 // FIXME: use a configurable VARIABLE for the domain name at here:
221 if (!/api\.chatie\.io/.test(this.options.apihost)) {
222 endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
223 }
224
225 const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
226
227 ws.on('open', () => this.wsOnOpen(ws))
228 ws.on('message', data => this.wsOnMessage(data))
229 ws.on('error', e => this.wsOnError(e))
230 ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason.toString()))
231
232 await new Promise((resolve, reject) => {
233 ws.once('open', resolve)
234 ws.once('error', reject)
235 ws.once('close', reject)
236 })
237
238 return ws
239 }
240
241 private wsOnOpen (ws: WebSocket): void {
242 if (this.protocol !== ws.protocol) {
243 log.error('Io', 'wsOnOpen() require protocol[%s] failed', this.protocol)
244 // XXX deal with error?
245 }
246 log.verbose('Io', 'wsOnOpen() connected with protocol [%s]', ws.protocol)
247 // this.currentState('connected')
248 // this.state.current('on')
249
250 // FIXME: how to keep alive???
251 // ws._socket.setKeepAlive(true, 100)
252
253 this.reconnectTimeout = undefined
254
255 const name = 'sys'
256 const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`
257
258 const initEvent: IoEvent = {
259 name,
260 payload,
261 }
262 this.send(initEvent).catch(console.error)
263 }
264
265 private wsOnMessage (data: WebSocket.Data): void {
266 log.silly('Io', 'wsOnMessage() ws.on(message): %s', data)
267 this.wsOnMessageAsync(data).catch(console.error)
268 }
269
270 private async wsOnMessageAsync (data: WebSocket.Data): Promise<void> {
271 log.silly('Io', 'wsOnMessageAsync() ws.on(message): %s', data)
272
273 // flags.binary will be set if a binary data is received.
274 // flags.masked will be set if the data was masked.
275 let payload: string
276
277 if (Array.isArray(data)) {
278 payload = data[0]?.toString() ?? 'NODATA'
279 } else {
280 payload = data.toString()
281 }
282
283 const ioEvent: IoEvent = {
284 name : 'raw',
285 payload : data,
286 }
287
288 try {
289 const obj = JSON.parse(payload)
290 ioEvent.name = obj.name
291 ioEvent.payload = obj.payload
292 } catch (e) {
293 log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
294 }
295
296 switch (ioEvent.name) {
297 case 'botie':
298 {
299 const payload = ioEvent.payload
300
301 const args = payload.args
302 const source = payload.source
303
304 try {
305 if (args[0] === 'message' && args.length === 1) {
306 const fn = new AsyncFunction(...args, source)
307 this.onMessage = fn
308 } else {
309 log.warn('Io', 'server pushed function is invalid. args: %s', JSON.stringify(args))
310 }
311 } catch (e) {
312 log.warn('Io', 'server pushed function exception: %s', e)
313 this.options.wechaty.emitError(e)
314 }
315 }
316 break
317
318 case 'reset':
319 log.verbose('Io', 'on(reset): %s', ioEvent.payload)
320 this.options.wechaty.emitError(
321 new Error(
322 'reset by server: '
323 + JSON.stringify(ioEvent.payload),
324 ),
325 )
326 await this.options.wechaty.reset()
327 break
328
329 case 'shutdown':
330 log.info('Io', 'on(shutdown): %s', ioEvent.payload)
331 process.exit(0)
332 // eslint-disable-next-line
333 break
334
335 case 'update':
336 log.verbose('Io', 'on(update): %s', ioEvent.payload)
337 {
338 const wechaty = this.options.wechaty
339 if (wechaty.isLoggedIn) {
340 const loginEvent: IoEvent = {
341 name : 'login',
342 payload : await wechaty.puppet.contactPayload(
343 wechaty.puppet.currentUserId,
344 ),
345 }
346 await this.send(loginEvent)
347 }
348
349 if (this.scanPayload) {
350 const scanEvent: IoEventScan = {
351 name: 'scan',
352 payload: this.scanPayload,
353 }
354 await this.send(scanEvent)
355 }
356 }
357
358 break
359
360 case 'sys':
361 // do nothing
362 break
363
364 case 'logout':
365 log.info('Io', 'on(logout): %s', ioEvent.payload)
366 await this.options.wechaty.logout()
367 break
368
369 case 'jsonrpc':
370 log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)
371
372 try {
373 const request = (ioEvent as IoEventJsonRpc).payload
374 if (!isJsonRpcRequest(request)) {
375 log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
376 return
377 }
378
379 if (!this.jsonRpc) {
380 throw new Error('jsonRpc not initialized!')
381 }
382
383 const response = await this.jsonRpc.exec(request)
384 if (!response) {
385 log.warn('Io', 'on(jsonrpc) response is undefined.')
386 return
387 }
388 const payload = jsonRpcPeer.parse(response) as jsonRpcPeer.JsonRpcPayloadResponse
389
390 const jsonrpcEvent: IoEventJsonRpc = {
391 name: 'jsonrpc',
392 payload,
393 }
394
395 log.verbose('Io', 'on(jsonrpc) send(%s)', response)
396 await this.send(jsonrpcEvent)
397
398 } catch (e) {
399 log.error('Io', 'on(jsonrpc): %s', e)
400 }
401
402 break
403
404 default:
405 log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
406 break
407 }
408 }
409
410 // FIXME: it seems the parameter `e` might be `undefined`.
411 // @types/ws might has bug for `ws.on('error', e => this.wsOnError(e))`
412 private wsOnError (e?: Error) {
413 log.warn('Io', 'wsOnError() error event[%s]', e && e.message)
414 if (!e) {
415 return
416 }
417
418 if (!this.ws) {
419 log.error('Io', 'wsOnError() ws.on(error) this.ws is `undefined`', e.message)
420 return
421 }
422
423 if (this.ws.readyState === WebSocket.CONNECTING) {
424 log.error('Io', 'wsOnError() ws.on(error) ws.readyState is CONNECTING: %s', e.message)
425 return
426 }
427
428 if (this.ws.readyState === WebSocket.CLOSING) {
429 log.error('Io', 'wsOnError() ws.on(error) ws.readyState is CLOSING: %s', e.message)
430 return
431 }
432
433 this.options.wechaty.emitError(e)
434
435 // when `error`, there must have already a `close` event
436 // we should not call this.reconnect() again
437 //
438 // this.close()
439 // this.reconnect()
440 }
441
442 private wsOnClose (
443 ws : WebSocket,
444 code : number,
445 message : string,
446 ): void {
447 if (this.state.active()) {
448 log.warn('Io', 'wsOnClose() close event[%d: %s]', code, message)
449 ws.close()
450 this.reconnect()
451 }
452 }
453
454 private reconnect () {
455 log.verbose('Io', 'reconnect()')
456
457 if (this.state.inactive()) {
458 log.warn('Io', 'reconnect() canceled because state.target() === offline')
459 return
460 }
461
462 if (this.connected()) {
463 log.warn('Io', 'reconnect() on a already connected io')
464 return
465 }
466 if (this.reconnectTimer) {
467 log.warn('Io', 'reconnect() on a already re-connecting io')
468 return
469 }
470
471 if (!this.reconnectTimeout) {
472 this.reconnectTimeout = 1
473 } else if (this.reconnectTimeout < 10 * 1000) {
474 this.reconnectTimeout *= 3
475 }
476
477 log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
478 this.reconnectTimer = setTimeout(() => {
479 this.reconnectTimer = undefined
480 this.initWebSocket().catch(console.error)
481 }, this.reconnectTimeout)// as any as NodeJS.Timer
482 }
483
484 private async send (ioEvent?: IoEvent): Promise<void> {
485 if (!this.ws) {
486 throw new Error('no ws')
487 }
488
489 const ws = this.ws
490
491 if (ioEvent) {
492 log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
493 this.eventBuffer.push(ioEvent)
494 } else { log.silly('Io', 'send()') }
495
496 if (!this.connected()) {
497 log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
498 return
499 }
500
501 const list: Array<Promise<any>> = []
502 while (this.eventBuffer.length) {
503 const data = JSON.stringify(
504 this.eventBuffer.shift(),
505 )
506 const p = new Promise<void>((resolve, reject) => ws.send(
507 data,
508 (err: undefined | Error) => {
509 if (err) {
510 reject(err)
511 } else {
512 resolve()
513 }
514 },
515 ))
516 list.push(p)
517 }
518
519 try {
520 await Promise.all(list)
521 } catch (e) {
522 log.error('Io', 'send() exception: %s', (e as Error).stack)
523 throw e
524 }
525 }
526
527 public async stop (): Promise<void> {
528 log.verbose('Io', 'stop()')
529
530 if (!this.ws) {
531 throw new Error('no ws')
532 }
533
534 this.state.inactive('pending')
535
536 // try to send IoEvents in buffer
537 await this.send()
538 this.eventBuffer = []
539
540 if (this.reconnectTimer) {
541 clearTimeout(this.reconnectTimer)
542 this.reconnectTimer = undefined
543 }
544
545 if (this.lifeTimer) {
546 clearInterval(this.lifeTimer)
547 this.lifeTimer = undefined
548 }
549
550 this.ws.close()
551 await new Promise<void>(resolve => {
552 if (this.ws) {
553 this.ws.once('close', resolve)
554 } else {
555 resolve()
556 }
557 })
558 this.ws = undefined
559
560 this.state.inactive(true)
561 }
562
563 /**
564 *
565 * Prepare to be overwritten by server setting
566 *
567 */
568 private async ioMessage (m: MessageInterface): Promise<void> {
569 log.silly('Io', 'ioMessage() is a nop function before be overwritten from cloud')
570 if (typeof this.onMessage === 'function') {
571 await this.onMessage(m)
572 }
573 }
574
575 protected async syncMessage (m: MessageInterface): Promise<void> {
576 log.silly('Io', 'syncMessage(%s)', m)
577
578 const messageEvent: IoEvent = {
579 name : 'message',
580 payload : await m.wechaty.puppet.messagePayload(m.id),
581 }
582 await this.send(messageEvent)
583 }
584
585}
586
\No newline at end of file