1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 | import WebSocket from 'ws'
|
21 |
|
22 | import type * as PUPPET from 'wechaty-puppet'
|
23 | import { StateSwitch } from 'state-switch'
|
24 |
|
25 | import * as jsonRpcPeer from 'json-rpc-peer'
|
26 |
|
27 | import type {
|
28 | MessageInterface,
|
29 | } from './user-modules/mod.js'
|
30 | import type { WechatyInterface } from './wechaty/mod.js'
|
31 |
|
32 | import {
|
33 | log,
|
34 | config,
|
35 | } from './config.js'
|
36 |
|
37 | import {
|
38 | getPeer,
|
39 | isJsonRpcRequest,
|
40 | } from './io-peer/io-peer.js'
|
41 |
|
42 | export interface IoOptions {
|
43 | wechaty : WechatyInterface,
|
44 | token : string,
|
45 | apihost? : string,
|
46 | protocol? : string,
|
47 | servicePort? : number,
|
48 | }
|
49 |
|
50 | export const IO_EVENT_DICT = {
|
51 | botie : 'tbw',
|
52 | error : 'tbw',
|
53 | heartbeat : 'tbw',
|
54 | jsonrpc : 'JSON RPC',
|
55 | login : 'tbw',
|
56 | logout : 'tbw',
|
57 | message : 'tbw',
|
58 | raw : 'tbw',
|
59 | reset : 'tbw',
|
60 | scan : 'tbw',
|
61 | shutdown : 'tbw',
|
62 | sys : 'tbw',
|
63 | update : 'tbw',
|
64 | }
|
65 |
|
66 | type IoEventName = keyof typeof IO_EVENT_DICT
|
67 |
|
68 | interface IoEventScan {
|
69 | name : 'scan',
|
70 | payload : PUPPET.payloads.EventScan,
|
71 | }
|
72 |
|
73 | interface IoEventJsonRpc {
|
74 | name : 'jsonrpc',
|
75 | payload : jsonRpcPeer.JsonRpcPayload,
|
76 | }
|
77 |
|
78 | interface IoEventAny {
|
79 | name: IoEventName,
|
80 | payload: any,
|
81 | }
|
82 |
|
83 | type IoEvent = IoEventScan | IoEventJsonRpc | IoEventAny
|
84 |
|
85 |
|
86 |
|
87 |
|
88 |
|
89 | const AsyncFunction = Object.getPrototypeOf(async () => null).constructor
|
90 |
|
91 |
|
92 |
|
93 |
|
94 |
|
95 |
|
96 |
|
97 |
|
98 |
|
99 | export class Io {
|
100 |
|
101 | protected readonly id : string
|
102 | protected readonly protocol : string
|
103 | protected eventBuffer : IoEvent[] = []
|
104 | protected ws : undefined | WebSocket
|
105 |
|
106 | protected readonly state = new StateSwitch('Io', { log })
|
107 |
|
108 | protected reconnectTimer? : ReturnType<typeof setTimeout>
|
109 | protected reconnectTimeout? : number
|
110 |
|
111 | protected lifeTimer? : ReturnType<typeof setTimeout>
|
112 |
|
113 | protected onMessage: undefined | Function
|
114 |
|
115 | protected scanPayload?: PUPPET.payloads.EventScan
|
116 |
|
117 | protected jsonRpc?: jsonRpcPeer.Peer
|
118 |
|
119 | constructor (
|
120 | protected options: IoOptions,
|
121 | ) {
|
122 | options.apihost = options.apihost || config.apihost
|
123 | options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL
|
124 |
|
125 | this.id = options.wechaty.id
|
126 |
|
127 | this.protocol = options.protocol + '|' + options.wechaty.id + '|' + config.serviceIp + '|' + options.servicePort
|
128 | log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
|
129 | options.apihost,
|
130 | options.token,
|
131 | options.protocol,
|
132 | this.id,
|
133 | )
|
134 |
|
135 | if (options.servicePort) {
|
136 | this.jsonRpc = getPeer({
|
137 | serviceGrpcPort: this.options.servicePort!,
|
138 | })
|
139 | }
|
140 |
|
141 | }
|
142 |
|
143 | public toString () {
|
144 | return `Io<${this.options.token}>`
|
145 | }
|
146 |
|
147 | private connected () {
|
148 | return this.ws && this.ws.readyState === WebSocket.OPEN
|
149 | }
|
150 |
|
151 | public async start (): Promise<void> {
|
152 | log.verbose('Io', 'start()')
|
153 |
|
154 | if (this.lifeTimer) {
|
155 | throw new Error('lifeTimer exist')
|
156 | }
|
157 |
|
158 | this.state.active('pending')
|
159 |
|
160 | try {
|
161 | this.initEventHook()
|
162 |
|
163 | this.ws = await this.initWebSocket()
|
164 |
|
165 | this.options.wechaty.on('login', () => { this.scanPayload = undefined })
|
166 | this.options.wechaty.on('scan', (qrcode, status) => {
|
167 | this.scanPayload = {
|
168 | ...this.scanPayload,
|
169 | qrcode,
|
170 | status,
|
171 | }
|
172 | })
|
173 |
|
174 | this.lifeTimer = setInterval(() => {
|
175 | if (this.ws && this.connected()) {
|
176 | log.silly('Io', 'start() setInterval() ws.ping()')
|
177 |
|
178 | this.ws.ping()
|
179 | }
|
180 | }, 1000 * 10)
|
181 |
|
182 | this.state.active(true)
|
183 |
|
184 | } catch (e) {
|
185 | log.warn('Io', 'start() exception: %s', (e as Error).message)
|
186 | this.state.inactive(true)
|
187 | throw e
|
188 | }
|
189 | }
|
190 |
|
191 | private initEventHook () {
|
192 | log.verbose('Io', 'initEventHook()')
|
193 | const wechaty = this.options.wechaty
|
194 |
|
195 | wechaty.on('error', error => this.send({ name: 'error', payload: error }))
|
196 | wechaty.on('heartbeat', data => this.send({ name: 'heartbeat', payload: { cuid: this.id, data } }))
|
197 | wechaty.on('login', user => this.send({ name: 'login', payload: wechaty.puppet.contactPayload(user.id) }))
|
198 | wechaty.on('logout', user => this.send({ name: 'logout', payload: wechaty.puppet.contactPayload(user.id) }))
|
199 | wechaty.on('message', message => this.ioMessage(message))
|
200 |
|
201 |
|
202 |
|
203 | wechaty.on('scan', (qrcode, status) => this.send({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
|
204 | }
|
205 |
|
206 | private async initWebSocket (): Promise<WebSocket> {
|
207 | log.verbose('Io', 'initWebSocket()')
|
208 |
|
209 |
|
210 |
|
211 | const auth = 'Token ' + this.options.token
|
212 | const headers = { Authorization: auth }
|
213 |
|
214 | if (!this.options.apihost) {
|
215 | throw new Error('no apihost')
|
216 | }
|
217 | let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'
|
218 |
|
219 |
|
220 |
|
221 | if (!/api\.chatie\.io/.test(this.options.apihost)) {
|
222 | endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
|
223 | }
|
224 |
|
225 | const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
|
226 |
|
227 | ws.on('open', () => this.wsOnOpen(ws))
|
228 | ws.on('message', data => this.wsOnMessage(data))
|
229 | ws.on('error', e => this.wsOnError(e))
|
230 | ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason.toString()))
|
231 |
|
232 | await new Promise((resolve, reject) => {
|
233 | ws.once('open', resolve)
|
234 | ws.once('error', reject)
|
235 | ws.once('close', reject)
|
236 | })
|
237 |
|
238 | return ws
|
239 | }
|
240 |
|
241 | private wsOnOpen (ws: WebSocket): void {
|
242 | if (this.protocol !== ws.protocol) {
|
243 | log.error('Io', 'wsOnOpen() require protocol[%s] failed', this.protocol)
|
244 |
|
245 | }
|
246 | log.verbose('Io', 'wsOnOpen() connected with protocol [%s]', ws.protocol)
|
247 |
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
253 | this.reconnectTimeout = undefined
|
254 |
|
255 | const name = 'sys'
|
256 | const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`
|
257 |
|
258 | const initEvent: IoEvent = {
|
259 | name,
|
260 | payload,
|
261 | }
|
262 | this.send(initEvent).catch(console.error)
|
263 | }
|
264 |
|
265 | private wsOnMessage (data: WebSocket.Data): void {
|
266 | log.silly('Io', 'wsOnMessage() ws.on(message): %s', data)
|
267 | this.wsOnMessageAsync(data).catch(console.error)
|
268 | }
|
269 |
|
270 | private async wsOnMessageAsync (data: WebSocket.Data): Promise<void> {
|
271 | log.silly('Io', 'wsOnMessageAsync() ws.on(message): %s', data)
|
272 |
|
273 |
|
274 |
|
275 | let payload: string
|
276 |
|
277 | if (Array.isArray(data)) {
|
278 | payload = data[0]?.toString() ?? 'NODATA'
|
279 | } else {
|
280 | payload = data.toString()
|
281 | }
|
282 |
|
283 | const ioEvent: IoEvent = {
|
284 | name : 'raw',
|
285 | payload : data,
|
286 | }
|
287 |
|
288 | try {
|
289 | const obj = JSON.parse(payload)
|
290 | ioEvent.name = obj.name
|
291 | ioEvent.payload = obj.payload
|
292 | } catch (e) {
|
293 | log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
|
294 | }
|
295 |
|
296 | switch (ioEvent.name) {
|
297 | case 'botie':
|
298 | {
|
299 | const payload = ioEvent.payload
|
300 |
|
301 | const args = payload.args
|
302 | const source = payload.source
|
303 |
|
304 | try {
|
305 | if (args[0] === 'message' && args.length === 1) {
|
306 | const fn = new AsyncFunction(...args, source)
|
307 | this.onMessage = fn
|
308 | } else {
|
309 | log.warn('Io', 'server pushed function is invalid. args: %s', JSON.stringify(args))
|
310 | }
|
311 | } catch (e) {
|
312 | log.warn('Io', 'server pushed function exception: %s', e)
|
313 | this.options.wechaty.emitError(e)
|
314 | }
|
315 | }
|
316 | break
|
317 |
|
318 | case 'reset':
|
319 | log.verbose('Io', 'on(reset): %s', ioEvent.payload)
|
320 | this.options.wechaty.emitError(
|
321 | new Error(
|
322 | 'reset by server: '
|
323 | + JSON.stringify(ioEvent.payload),
|
324 | ),
|
325 | )
|
326 | await this.options.wechaty.reset()
|
327 | break
|
328 |
|
329 | case 'shutdown':
|
330 | log.info('Io', 'on(shutdown): %s', ioEvent.payload)
|
331 | process.exit(0)
|
332 |
|
333 | break
|
334 |
|
335 | case 'update':
|
336 | log.verbose('Io', 'on(update): %s', ioEvent.payload)
|
337 | {
|
338 | const wechaty = this.options.wechaty
|
339 | if (wechaty.isLoggedIn) {
|
340 | const loginEvent: IoEvent = {
|
341 | name : 'login',
|
342 | payload : await wechaty.puppet.contactPayload(
|
343 | wechaty.puppet.currentUserId,
|
344 | ),
|
345 | }
|
346 | await this.send(loginEvent)
|
347 | }
|
348 |
|
349 | if (this.scanPayload) {
|
350 | const scanEvent: IoEventScan = {
|
351 | name: 'scan',
|
352 | payload: this.scanPayload,
|
353 | }
|
354 | await this.send(scanEvent)
|
355 | }
|
356 | }
|
357 |
|
358 | break
|
359 |
|
360 | case 'sys':
|
361 |
|
362 | break
|
363 |
|
364 | case 'logout':
|
365 | log.info('Io', 'on(logout): %s', ioEvent.payload)
|
366 | await this.options.wechaty.logout()
|
367 | break
|
368 |
|
369 | case 'jsonrpc':
|
370 | log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)
|
371 |
|
372 | try {
|
373 | const request = (ioEvent as IoEventJsonRpc).payload
|
374 | if (!isJsonRpcRequest(request)) {
|
375 | log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
|
376 | return
|
377 | }
|
378 |
|
379 | if (!this.jsonRpc) {
|
380 | throw new Error('jsonRpc not initialized!')
|
381 | }
|
382 |
|
383 | const response = await this.jsonRpc.exec(request)
|
384 | if (!response) {
|
385 | log.warn('Io', 'on(jsonrpc) response is undefined.')
|
386 | return
|
387 | }
|
388 | const payload = jsonRpcPeer.parse(response) as jsonRpcPeer.JsonRpcPayloadResponse
|
389 |
|
390 | const jsonrpcEvent: IoEventJsonRpc = {
|
391 | name: 'jsonrpc',
|
392 | payload,
|
393 | }
|
394 |
|
395 | log.verbose('Io', 'on(jsonrpc) send(%s)', response)
|
396 | await this.send(jsonrpcEvent)
|
397 |
|
398 | } catch (e) {
|
399 | log.error('Io', 'on(jsonrpc): %s', e)
|
400 | }
|
401 |
|
402 | break
|
403 |
|
404 | default:
|
405 | log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
|
406 | break
|
407 | }
|
408 | }
|
409 |
|
410 |
|
411 |
|
412 | private wsOnError (e?: Error) {
|
413 | log.warn('Io', 'wsOnError() error event[%s]', e && e.message)
|
414 | if (!e) {
|
415 | return
|
416 | }
|
417 |
|
418 | if (!this.ws) {
|
419 | log.error('Io', 'wsOnError() ws.on(error) this.ws is `undefined`', e.message)
|
420 | return
|
421 | }
|
422 |
|
423 | if (this.ws.readyState === WebSocket.CONNECTING) {
|
424 | log.error('Io', 'wsOnError() ws.on(error) ws.readyState is CONNECTING: %s', e.message)
|
425 | return
|
426 | }
|
427 |
|
428 | if (this.ws.readyState === WebSocket.CLOSING) {
|
429 | log.error('Io', 'wsOnError() ws.on(error) ws.readyState is CLOSING: %s', e.message)
|
430 | return
|
431 | }
|
432 |
|
433 | this.options.wechaty.emitError(e)
|
434 |
|
435 |
|
436 |
|
437 |
|
438 |
|
439 |
|
440 | }
|
441 |
|
442 | private wsOnClose (
|
443 | ws : WebSocket,
|
444 | code : number,
|
445 | message : string,
|
446 | ): void {
|
447 | if (this.state.active()) {
|
448 | log.warn('Io', 'wsOnClose() close event[%d: %s]', code, message)
|
449 | ws.close()
|
450 | this.reconnect()
|
451 | }
|
452 | }
|
453 |
|
454 | private reconnect () {
|
455 | log.verbose('Io', 'reconnect()')
|
456 |
|
457 | if (this.state.inactive()) {
|
458 | log.warn('Io', 'reconnect() canceled because state.target() === offline')
|
459 | return
|
460 | }
|
461 |
|
462 | if (this.connected()) {
|
463 | log.warn('Io', 'reconnect() on a already connected io')
|
464 | return
|
465 | }
|
466 | if (this.reconnectTimer) {
|
467 | log.warn('Io', 'reconnect() on a already re-connecting io')
|
468 | return
|
469 | }
|
470 |
|
471 | if (!this.reconnectTimeout) {
|
472 | this.reconnectTimeout = 1
|
473 | } else if (this.reconnectTimeout < 10 * 1000) {
|
474 | this.reconnectTimeout *= 3
|
475 | }
|
476 |
|
477 | log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
|
478 | this.reconnectTimer = setTimeout(() => {
|
479 | this.reconnectTimer = undefined
|
480 | this.initWebSocket().catch(console.error)
|
481 | }, this.reconnectTimeout)
|
482 | }
|
483 |
|
484 | private async send (ioEvent?: IoEvent): Promise<void> {
|
485 | if (!this.ws) {
|
486 | throw new Error('no ws')
|
487 | }
|
488 |
|
489 | const ws = this.ws
|
490 |
|
491 | if (ioEvent) {
|
492 | log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
|
493 | this.eventBuffer.push(ioEvent)
|
494 | } else { log.silly('Io', 'send()') }
|
495 |
|
496 | if (!this.connected()) {
|
497 | log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
|
498 | return
|
499 | }
|
500 |
|
501 | const list: Array<Promise<any>> = []
|
502 | while (this.eventBuffer.length) {
|
503 | const data = JSON.stringify(
|
504 | this.eventBuffer.shift(),
|
505 | )
|
506 | const p = new Promise<void>((resolve, reject) => ws.send(
|
507 | data,
|
508 | (err: undefined | Error) => {
|
509 | if (err) {
|
510 | reject(err)
|
511 | } else {
|
512 | resolve()
|
513 | }
|
514 | },
|
515 | ))
|
516 | list.push(p)
|
517 | }
|
518 |
|
519 | try {
|
520 | await Promise.all(list)
|
521 | } catch (e) {
|
522 | log.error('Io', 'send() exception: %s', (e as Error).stack)
|
523 | throw e
|
524 | }
|
525 | }
|
526 |
|
527 | public async stop (): Promise<void> {
|
528 | log.verbose('Io', 'stop()')
|
529 |
|
530 | if (!this.ws) {
|
531 | throw new Error('no ws')
|
532 | }
|
533 |
|
534 | this.state.inactive('pending')
|
535 |
|
536 | // try to send IoEvents in buffer
|
537 | await this.send()
|
538 | this.eventBuffer = []
|
539 |
|
540 | if (this.reconnectTimer) {
|
541 | clearTimeout(this.reconnectTimer)
|
542 | this.reconnectTimer = undefined
|
543 | }
|
544 |
|
545 | if (this.lifeTimer) {
|
546 | clearInterval(this.lifeTimer)
|
547 | this.lifeTimer = undefined
|
548 | }
|
549 |
|
550 | this.ws.close()
|
551 | await new Promise<void>(resolve => {
|
552 | if (this.ws) {
|
553 | this.ws.once('close', resolve)
|
554 | } else {
|
555 | resolve()
|
556 | }
|
557 | })
|
558 | this.ws = undefined
|
559 |
|
560 | this.state.inactive(true)
|
561 | }
|
562 |
|
563 | /**
|
564 | *
|
565 | * Prepare to be overwritten by server setting
|
566 | *
|
567 | */
|
568 | private async ioMessage (m: MessageInterface): Promise<void> {
|
569 | log.silly('Io', 'ioMessage() is a nop function before be overwritten from cloud')
|
570 | if (typeof this.onMessage === 'function') {
|
571 | await this.onMessage(m)
|
572 | }
|
573 | }
|
574 |
|
575 | protected async syncMessage (m: MessageInterface): Promise<void> {
|
576 | log.silly('Io', 'syncMessage(%s)', m)
|
577 |
|
578 | const messageEvent: IoEvent = {
|
579 | name : 'message',
|
580 | payload : await m.wechaty.puppet.messagePayload(m.id),
|
581 | }
|
582 | await this.send(messageEvent)
|
583 | }
|
584 |
|
585 | }
|
586 |
|
\ | No newline at end of file |