1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 | import WebSocket from 'ws'
|
21 |
|
22 | import {
|
23 | Message,
|
24 | } from './user/mod'
|
25 |
|
26 | import {
|
27 | StateSwitch,
|
28 |
|
29 | EventScanPayload,
|
30 | } from 'wechaty-puppet'
|
31 |
|
32 | import Peer, {
|
33 | JsonRpcPayload,
|
34 | JsonRpcPayloadResponse,
|
35 | parse,
|
36 | } from 'json-rpc-peer'
|
37 |
|
38 | import {
|
39 | config,
|
40 | log,
|
41 | } from './config'
|
42 | import {
|
43 | AnyFunction,
|
44 | } from './types'
|
45 | import {
|
46 | Wechaty,
|
47 | } from './wechaty'
|
48 |
|
49 | import {
|
50 | getPeer,
|
51 | isJsonRpcRequest,
|
52 | } from './io-peer/io-peer'
|
53 |
|
54 | export interface IoOptions {
|
55 | wechaty : Wechaty,
|
56 | token : string,
|
57 | apihost? : string,
|
58 | protocol? : string,
|
59 | servicePort? : number,
|
60 | }
|
61 |
|
62 | export const IO_EVENT_DICT = {
|
63 | botie : 'tbw',
|
64 | error : 'tbw',
|
65 | heartbeat : 'tbw',
|
66 | jsonrpc : 'JSON RPC',
|
67 | login : 'tbw',
|
68 | logout : 'tbw',
|
69 | message : 'tbw',
|
70 | raw : 'tbw',
|
71 | reset : 'tbw',
|
72 | scan : 'tbw',
|
73 | shutdown : 'tbw',
|
74 | sys : 'tbw',
|
75 | update : 'tbw',
|
76 | }
|
77 |
|
78 | type IoEventName = keyof typeof IO_EVENT_DICT
|
79 |
|
80 | interface IoEventScan {
|
81 | name : 'scan',
|
82 | payload : EventScanPayload,
|
83 | }
|
84 |
|
85 | interface IoEventJsonRpc {
|
86 | name: 'jsonrpc',
|
87 | payload: JsonRpcPayload,
|
88 | }
|
89 |
|
90 | interface IoEventAny {
|
91 | name: IoEventName,
|
92 | payload: any,
|
93 | }
|
94 |
|
95 | type IoEvent = IoEventScan | IoEventJsonRpc | IoEventAny
|
96 |
|
97 |
|
98 |
|
99 |
|
100 |
|
101 | const AsyncFunction = Object.getPrototypeOf(async () => null).constructor
|
102 |
|
103 |
|
104 |
|
105 |
|
106 |
|
107 |
|
108 |
|
109 |
|
110 |
|
111 | export class Io {
|
112 |
|
113 | private readonly id : string
|
114 | private readonly protocol : string
|
115 | private eventBuffer : IoEvent[] = []
|
116 | private ws : undefined | WebSocket
|
117 |
|
118 | private readonly state = new StateSwitch('Io', { log })
|
119 |
|
120 | private reconnectTimer? : NodeJS.Timer
|
121 | private reconnectTimeout? : number
|
122 |
|
123 | private lifeTimer? : NodeJS.Timer
|
124 |
|
125 | private onMessage: undefined | AnyFunction
|
126 |
|
127 | private scanPayload?: EventScanPayload
|
128 |
|
129 | protected jsonRpc?: Peer
|
130 |
|
131 | constructor (
|
132 | private options: IoOptions,
|
133 | ) {
|
134 | options.apihost = options.apihost || config.apihost
|
135 | options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL
|
136 |
|
137 | this.id = options.wechaty.id
|
138 |
|
139 | this.protocol = options.protocol + '|' + options.wechaty.id + '|' + config.serviceIp + '|' + options.servicePort
|
140 | log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
|
141 | options.apihost,
|
142 | options.token,
|
143 | options.protocol,
|
144 | this.id,
|
145 | )
|
146 |
|
147 | if (options.servicePort) {
|
148 | this.jsonRpc = getPeer({
|
149 | serviceGrpcPort: this.options.servicePort!,
|
150 | })
|
151 | }
|
152 |
|
153 | }
|
154 |
|
155 | public toString () {
|
156 | return `Io<${this.options.token}>`
|
157 | }
|
158 |
|
159 | private connected () {
|
160 | return this.ws && this.ws.readyState === WebSocket.OPEN
|
161 | }
|
162 |
|
163 | public async start (): Promise<void> {
|
164 | log.verbose('Io', 'start()')
|
165 |
|
166 | if (this.lifeTimer) {
|
167 | throw new Error('lifeTimer exist')
|
168 | }
|
169 |
|
170 | this.state.on('pending')
|
171 |
|
172 | try {
|
173 | this.initEventHook()
|
174 |
|
175 | this.ws = await this.initWebSocket()
|
176 |
|
177 | this.options.wechaty.on('login', () => { this.scanPayload = undefined })
|
178 | this.options.wechaty.on('scan', (qrcode, status) => {
|
179 | this.scanPayload = {
|
180 | ...this.scanPayload,
|
181 | qrcode,
|
182 | status,
|
183 | }
|
184 | })
|
185 |
|
186 | this.lifeTimer = setInterval(() => {
|
187 | if (this.ws && this.connected()) {
|
188 | log.silly('Io', 'start() setInterval() ws.ping()')
|
189 |
|
190 | this.ws.ping()
|
191 | }
|
192 | }, 1000 * 10)
|
193 |
|
194 | this.state.on(true)
|
195 |
|
196 | } catch (e) {
|
197 | log.warn('Io', 'start() exception: %s', e.message)
|
198 | this.state.off(true)
|
199 | throw e
|
200 | }
|
201 | }
|
202 |
|
203 | private initEventHook () {
|
204 | log.verbose('Io', 'initEventHook()')
|
205 | const wechaty = this.options.wechaty
|
206 |
|
207 | wechaty.on('error', error => this.send({ name: 'error', payload: error }))
|
208 | wechaty.on('heartbeat', data => this.send({ name: 'heartbeat', payload: { cuid: this.id, data } }))
|
209 | wechaty.on('login', user => this.send({ name: 'login', payload: user.payload }))
|
210 | wechaty.on('logout', user => this.send({ name: 'logout', payload: user.payload }))
|
211 | wechaty.on('message', message => this.ioMessage(message))
|
212 |
|
213 |
|
214 |
|
215 | wechaty.on('scan', (qrcode, status) => this.send({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
|
216 | }
|
217 |
|
218 | private async initWebSocket (): Promise<WebSocket> {
|
219 | log.verbose('Io', 'initWebSocket()')
|
220 |
|
221 |
|
222 |
|
223 | const auth = 'Token ' + this.options.token
|
224 | const headers = { Authorization: auth }
|
225 |
|
226 | if (!this.options.apihost) {
|
227 | throw new Error('no apihost')
|
228 | }
|
229 | let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'
|
230 |
|
231 |
|
232 |
|
233 | if (!/api\.chatie\.io/.test(this.options.apihost)) {
|
234 | endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
|
235 | }
|
236 |
|
237 | const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
|
238 |
|
239 | ws.on('open', () => this.wsOnOpen(ws))
|
240 | ws.on('message', data => this.wsOnMessage(data))
|
241 | ws.on('error', e => this.wsOnError(e))
|
242 | ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason))
|
243 |
|
244 | await new Promise((resolve, reject) => {
|
245 | ws.once('open', resolve)
|
246 | ws.once('error', reject)
|
247 | ws.once('close', reject)
|
248 | })
|
249 |
|
250 | return ws
|
251 | }
|
252 |
|
253 | private async wsOnOpen (ws: WebSocket): Promise<void> {
|
254 | if (this.protocol !== ws.protocol) {
|
255 | log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
|
256 |
|
257 | }
|
258 | log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
|
259 |
|
260 |
|
261 |
|
262 |
|
263 |
|
264 |
|
265 | this.reconnectTimeout = undefined
|
266 |
|
267 | const name = 'sys'
|
268 | const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`
|
269 |
|
270 | const initEvent: IoEvent = {
|
271 | name,
|
272 | payload,
|
273 | }
|
274 | await this.send(initEvent)
|
275 | }
|
276 |
|
277 | private async wsOnMessage (data: WebSocket.Data) {
|
278 | log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
|
279 |
|
280 |
|
281 |
|
282 | if (typeof data !== 'string') {
|
283 | throw new Error('data should be string...')
|
284 | }
|
285 |
|
286 | const ioEvent: IoEvent = {
|
287 | name : 'raw',
|
288 | payload : data,
|
289 | }
|
290 |
|
291 | try {
|
292 | const obj = JSON.parse(data)
|
293 | ioEvent.name = obj.name
|
294 | ioEvent.payload = obj.payload
|
295 | } catch (e) {
|
296 | log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
|
297 | }
|
298 |
|
299 | switch (ioEvent.name) {
|
300 | case 'botie':
|
301 | {
|
302 | const payload = ioEvent.payload
|
303 |
|
304 | const args = payload.args
|
305 | const source = payload.source
|
306 |
|
307 | try {
|
308 | if (args[0] === 'message' && args.length === 1) {
|
309 | const fn = new AsyncFunction(...args, source)
|
310 | this.onMessage = fn
|
311 | } else {
|
312 | log.warn('Io', 'server pushed function is invalid. args: %s', JSON.stringify(args))
|
313 | }
|
314 | } catch (e) {
|
315 | log.warn('Io', 'server pushed function exception: %s', e)
|
316 | this.options.wechaty.emit('error', e)
|
317 | }
|
318 | }
|
319 | break
|
320 |
|
321 | case 'reset':
|
322 | log.verbose('Io', 'on(reset): %s', ioEvent.payload)
|
323 | await this.options.wechaty.reset(ioEvent.payload)
|
324 | break
|
325 |
|
326 | case 'shutdown':
|
327 | log.info('Io', 'on(shutdown): %s', ioEvent.payload)
|
328 | process.exit(0)
|
329 |
|
330 | break
|
331 |
|
332 | case 'update':
|
333 | log.verbose('Io', 'on(update): %s', ioEvent.payload)
|
334 | {
|
335 | const wechaty = this.options.wechaty
|
336 | if (wechaty.logonoff()) {
|
337 | const loginEvent: IoEvent = {
|
338 | name : 'login',
|
339 | payload : (wechaty.userSelf() as any).payload,
|
340 | }
|
341 | await this.send(loginEvent)
|
342 | }
|
343 |
|
344 | if (this.scanPayload) {
|
345 | const scanEvent: IoEventScan = {
|
346 | name: 'scan',
|
347 | payload: this.scanPayload,
|
348 | }
|
349 | await this.send(scanEvent)
|
350 | }
|
351 | }
|
352 |
|
353 | break
|
354 |
|
355 | case 'sys':
|
356 |
|
357 | break
|
358 |
|
359 | case 'logout':
|
360 | log.info('Io', 'on(logout): %s', ioEvent.payload)
|
361 | await this.options.wechaty.logout()
|
362 | break
|
363 |
|
364 | case 'jsonrpc':
|
365 | log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)
|
366 |
|
367 | try {
|
368 | const request = (ioEvent as IoEventJsonRpc).payload
|
369 | if (!isJsonRpcRequest(request)) {
|
370 | log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
|
371 | return
|
372 | }
|
373 |
|
374 | if (!this.jsonRpc) {
|
375 | throw new Error('jsonRpc not initialized!')
|
376 | }
|
377 |
|
378 | const response = await this.jsonRpc.exec(request)
|
379 | if (!response) {
|
380 | log.warn('Io', 'on(jsonrpc) response is undefined.')
|
381 | return
|
382 | }
|
383 | const payload = parse(response) as JsonRpcPayloadResponse
|
384 |
|
385 | const jsonrpcEvent: IoEventJsonRpc = {
|
386 | name: 'jsonrpc',
|
387 | payload,
|
388 | }
|
389 |
|
390 | log.verbose('Io', 'on(jsonrpc) send(%s)', response)
|
391 | await this.send(jsonrpcEvent)
|
392 |
|
393 | } catch (e) {
|
394 | log.error('Io', 'on(jsonrpc): %s', e)
|
395 | }
|
396 |
|
397 | break
|
398 |
|
399 | default:
|
400 | log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
|
401 | break
|
402 | }
|
403 | }
|
404 |
|
405 |
|
406 |
|
407 | private wsOnError (e?: Error) {
|
408 | log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
|
409 | if (!e) {
|
410 | return
|
411 | }
|
412 | this.options.wechaty.emit('error', e)
|
413 |
|
414 |
|
415 |
|
416 |
|
417 |
|
418 |
|
419 | }
|
420 |
|
421 | private wsOnClose (
|
422 | ws : WebSocket,
|
423 | code : number,
|
424 | message : string,
|
425 | ): void {
|
426 | if (this.state.on()) {
|
427 | log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
|
428 | ws.close()
|
429 | this.reconnect()
|
430 | }
|
431 | }
|
432 |
|
433 | private reconnect () {
|
434 | log.verbose('Io', 'reconnect()')
|
435 |
|
436 | if (this.state.off()) {
|
437 | log.warn('Io', 'reconnect() canceled because state.target() === offline')
|
438 | return
|
439 | }
|
440 |
|
441 | if (this.connected()) {
|
442 | log.warn('Io', 'reconnect() on a already connected io')
|
443 | return
|
444 | }
|
445 | if (this.reconnectTimer) {
|
446 | log.warn('Io', 'reconnect() on a already re-connecting io')
|
447 | return
|
448 | }
|
449 |
|
450 | if (!this.reconnectTimeout) {
|
451 | this.reconnectTimeout = 1
|
452 | } else if (this.reconnectTimeout < 10 * 1000) {
|
453 | this.reconnectTimeout *= 3
|
454 | }
|
455 |
|
456 | log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
|
457 | this.reconnectTimer = setTimeout(async () => {
|
458 | this.reconnectTimer = undefined
|
459 | await this.initWebSocket()
|
460 | }, this.reconnectTimeout)
|
461 | }
|
462 |
|
463 | private async send (ioEvent?: IoEvent): Promise<void> {
|
464 | if (!this.ws) {
|
465 | throw new Error('no ws')
|
466 | }
|
467 |
|
468 | const ws = this.ws
|
469 |
|
470 | if (ioEvent) {
|
471 | log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
|
472 | this.eventBuffer.push(ioEvent)
|
473 | } else { log.silly('Io', 'send()') }
|
474 |
|
475 | if (!this.connected()) {
|
476 | log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
|
477 | return
|
478 | }
|
479 |
|
480 | const list: Array<Promise<any>> = []
|
481 | while (this.eventBuffer.length) {
|
482 | const data = JSON.stringify(
|
483 | this.eventBuffer.shift(),
|
484 | )
|
485 | const p = new Promise<void>((resolve, reject) => ws.send(
|
486 | data,
|
487 | (err: undefined | Error) => {
|
488 | if (err) {
|
489 | reject(err)
|
490 | } else {
|
491 | resolve()
|
492 | }
|
493 | },
|
494 | ))
|
495 | list.push(p)
|
496 | }
|
497 |
|
498 | try {
|
499 | await Promise.all(list)
|
500 | } catch (e) {
|
501 | log.error('Io', 'send() exception: %s', e.stack)
|
502 | throw e
|
503 | }
|
504 | }
|
505 |
|
506 | public async stop (): Promise<void> {
|
507 | log.verbose('Io', 'stop()')
|
508 |
|
509 | if (!this.ws) {
|
510 | throw new Error('no ws')
|
511 | }
|
512 |
|
513 | this.state.off('pending')
|
514 |
|
515 | // try to send IoEvents in buffer
|
516 | await this.send()
|
517 | this.eventBuffer = []
|
518 |
|
519 | if (this.reconnectTimer) {
|
520 | clearTimeout(this.reconnectTimer)
|
521 | this.reconnectTimer = undefined
|
522 | }
|
523 |
|
524 | if (this.lifeTimer) {
|
525 | clearInterval(this.lifeTimer)
|
526 | this.lifeTimer = undefined
|
527 | }
|
528 |
|
529 | this.ws.close()
|
530 | await new Promise<void>(resolve => {
|
531 | if (this.ws) {
|
532 | this.ws.once('close', resolve)
|
533 | } else {
|
534 | resolve()
|
535 | }
|
536 | })
|
537 | this.ws = undefined
|
538 |
|
539 | this.state.off(true)
|
540 | }
|
541 |
|
542 | /**
|
543 | *
|
544 | * Prepare to be overwritten by server setting
|
545 | *
|
546 | */
|
547 | private async ioMessage (m: Message): Promise<void> {
|
548 | log.silly('Io', 'ioMessage() is a nop function before be overwritten from cloud')
|
549 | if (typeof this.onMessage === 'function') {
|
550 | await this.onMessage(m)
|
551 | }
|
552 | }
|
553 |
|
554 | protected async syncMessage (m: Message): Promise<void> {
|
555 | log.silly('Io', 'syncMessage(%s)', m)
|
556 |
|
557 | const messageEvent: IoEvent = {
|
558 | name : 'message',
|
559 | payload : (m as any).payload,
|
560 | }
|
561 | await this.send(messageEvent)
|
562 | }
|
563 |
|
564 | }
|
565 |
|
\ | No newline at end of file |