1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 | import { StateSwitch } from 'state-switch'
|
21 | import WebSocket from 'ws'
|
22 |
|
23 | import {
|
24 | Message,
|
25 | } from './user/mod'
|
26 |
|
27 | import {
|
28 | EventScanPayload,
|
29 | } from 'wechaty-puppet'
|
30 |
|
31 | import Peer, {
|
32 | JsonRpcPayload,
|
33 | JsonRpcPayloadResponse,
|
34 | parse,
|
35 | } from 'json-rpc-peer'
|
36 |
|
37 | import {
|
38 | config,
|
39 | log,
|
40 | } from './config'
|
41 | import {
|
42 | AnyFunction,
|
43 | } from './types'
|
44 | import {
|
45 | Wechaty,
|
46 | } from './wechaty'
|
47 |
|
48 | import {
|
49 | getPeer,
|
50 | isJsonRpcRequest,
|
51 | } from './io-peer/io-peer'
|
52 |
|
/**
 * Construction options for `Io`.
 */
export interface IoOptions {
  /** Token sent to the server in the `Authorization: Token <token>` header. */
  token       : string,
  /** The Wechaty instance whose events are forwarded over the socket. */
  wechaty     : Wechaty,

  /** Host of the WebSocket API; `Io` falls back to `config.apihost` when omitted. */
  apihost?    : string,
  /** When set (truthy), `Io` creates a JSON-RPC peer with this value as `hostieGrpcPort`. */
  hostiePort? : number,
  /** Protocol name; `Io` falls back to `config.default.DEFAULT_PROTOCOL` when omitted. */
  protocol?   : string,
}
|
60 |
|
// Description text shared by most entries below.
// NOTE(review): 'tbw' is presumably a "to be written" placeholder — confirm.
const TBW = 'tbw'

/**
 * Maps every Io event name to a short description.
 * The keys of this object define the `IoEventName` type.
 */
export const IO_EVENT_DICT = {
  botie: TBW,
  error: TBW,
  heartbeat: TBW,
  jsonrpc: 'JSON RPC',
  login: TBW,
  logout: TBW,
  message: TBW,
  raw: TBW,
  reset: TBW,
  scan: TBW,
  shutdown: TBW,
  sys: TBW,
  update: TBW,
}
|
76 |
|
/** Every event name known to the Io channel (the keys of `IO_EVENT_DICT`). */
type IoEventName = keyof typeof IO_EVENT_DICT

/** A `scan` event whose payload is a puppet scan payload. */
interface IoEventScan {
  name    : 'scan',
  payload : EventScanPayload,
}

/** A `jsonrpc` event whose payload is a JSON-RPC payload. */
interface IoEventJsonRpc {
  name    : 'jsonrpc',
  payload : JsonRpcPayload,
}

/** Catch-all shape: any known event name with an untyped payload. */
interface IoEventAny {
  name    : IoEventName,
  payload : any,
}

/** Union of all event shapes exchanged over the Io channel. */
type IoEvent =
  | IoEventScan
  | IoEventJsonRpc
  | IoEventAny
|
95 |
|
/**
 * Bridge between a Wechaty instance and a remote WebSocket endpoint.
 *
 * Wechaty events are wrapped as `IoEvent` objects, serialized to JSON and
 * sent over the socket; JSON messages received from the socket are parsed
 * back into `IoEvent`s and dispatched by `wsOnMessage()`.
 */
export class Io {

  private readonly id       : string
  private readonly protocol : string
  /** Events waiting to be written to the socket (see `send()`). */
  private eventBuffer       : IoEvent[] = []
  private ws                : undefined | WebSocket

  private readonly state = new StateSwitch('Io', { log })

  private reconnectTimer?   : NodeJS.Timer
  /** Current reconnect delay in milliseconds; reset when a socket opens. */
  private reconnectTimeout? : number

  /** Interval that pings the socket every 10 seconds while connected. */
  private lifeTimer? : NodeJS.Timer

  /** Message handler installed from a server-pushed `botie` event. */
  private onMessage: undefined | AnyFunction

  /** Most recent scan payload, re-sent when the server requests an `update`. */
  private scanPayload?: EventScanPayload

  protected jsonRpc?: Peer

  /**
   * @param options connection options. NOTE: `apihost` and `protocol` are
   *   filled in on the passed object itself when they are missing.
   */
  constructor (
    private options: IoOptions,
  ) {
    options.apihost  = options.apihost  || config.apihost
    options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL

    this.id = options.wechaty.id

    this.protocol = options.protocol + '|' + options.wechaty.id
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
      options.apihost,
      options.token,
      options.protocol,
      this.id,
    )

    if (options.hostiePort) {
      this.jsonRpc = getPeer({
        hostieGrpcPort: options.hostiePort,
      })
    }
  }

  /**
   * Render a caught value for logging. Rejections in this class are not
   * always `Error`s: `initWebSocket()` rejects with a numeric close code
   * when the socket closes before it opens.
   */
  private static errorText (e: unknown): string {
    return e instanceof Error ? e.message : String(e)
  }

  public toString () {
    return `Io<${this.options.token}>`
  }

  /** @returns `true` only when a socket exists and its state is OPEN. */
  private connected (): boolean {
    return !!this.ws && this.ws.readyState === WebSocket.OPEN
  }

  /**
   * Hook the Wechaty events, open the WebSocket and start the ping timer.
   *
   * @throws Error('lifeTimer exist') when the ping timer is already running.
   * @throws whatever `initWebSocket()` rejects with; the state is switched
   *   off before the error is re-thrown.
   */
  public async start (): Promise<void> {
    log.verbose('Io', 'start()')

    if (this.lifeTimer) {
      throw new Error('lifeTimer exist')
    }

    this.state.on('pending')

    try {
      this.initEventHook()

      this.ws = await this.initWebSocket()

      // Remember the latest scan so it can be replayed on an 'update' request.
      this.options.wechaty.on('scan', (qrcode, status) => {
        this.scanPayload = {
          ...this.scanPayload,
          qrcode,
          status,
        }
      })

      this.lifeTimer = setInterval(() => {
        if (this.ws && this.connected()) {
          log.silly('Io', 'start() setInterval() ws.ping()')
          this.ws.ping()
        }
      }, 1000 * 10)

      this.state.on(true)

    } catch (e) {
      log.warn('Io', 'start() exception: %s', Io.errorText(e))
      this.state.off(true)
      throw e
    }
  }

  /**
   * Forward Wechaty events to the server.
   *
   * The listeners run as event-emitter callbacks, where nobody awaits the
   * promise returned by `send()` / `ioMessage()`. Rejections (for example
   * 'no ws' once `stop()` has cleared the socket) are therefore caught and
   * logged here instead of surfacing as unhandled rejections.
   */
  private initEventHook () {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.options.wechaty

    const forward = (ioEvent: IoEvent): void => {
      this.send(ioEvent).catch(e => {
        log.warn('Io', 'initEventHook() send(%s) rejection: %s', ioEvent.name, Io.errorText(e))
      })
    }

    wechaty.on('error',     error => forward({ name: 'error', payload: error }))
    wechaty.on('heartbeat', data  => forward({ name: 'heartbeat', payload: { cuid: this.id, data } }))
    wechaty.on('login',     user  => forward({ name: 'login', payload: user }))
    wechaty.on('logout',    user  => forward({ name: 'logout', payload: user }))
    wechaty.on('message',   message => {
      this.ioMessage(message).catch(e => {
        log.warn('Io', 'initEventHook() ioMessage() rejection: %s', Io.errorText(e))
      })
    })

    wechaty.on('scan', (qrcode, status) => forward({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
  }

  /**
   * Create a new WebSocket, store it in `this.ws`, wire its handlers and
   * wait until it is open.
   *
   * @returns the opened socket.
   * @throws Error('no apihost') when no api host is configured; otherwise
   *   rejects with the 'error' event's error or the 'close' event's code if
   *   either fires before 'open'.
   */
  private async initWebSocket (): Promise<WebSocket> {
    log.verbose('Io', 'initWebSocket()')

    const auth    = 'Token ' + this.options.token
    const headers = { Authorization: auth }

    if (!this.options.apihost) {
      throw new Error('no apihost')
    }

    // Only hosts matching api.chatie.io are reached over TLS.
    const scheme   = /api\.chatie\.io/.test(this.options.apihost) ? 'wss://' : 'ws://'
    const endpoint = scheme + this.options.apihost + '/v0/websocket'

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    // The async handlers are invoked by the emitter, which ignores their
    // returned promises — catch here so a failure is logged, not unhandled.
    ws.on('open', () => {
      this.wsOnOpen(ws).catch(e => {
        log.warn('Io', 'initWebSocket() wsOnOpen() rejection: %s', Io.errorText(e))
      })
    })
    ws.on('message', data => {
      this.wsOnMessage(data).catch(e => {
        log.warn('Io', 'initWebSocket() wsOnMessage() rejection: %s', Io.errorText(e))
      })
    })
    ws.on('error', e => this.wsOnError(e))
    ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason))

    await new Promise((resolve, reject) => {
      ws.once('open', resolve)
      ws.once('error', reject)
      ws.once('close', reject)
    })

    return ws
  }

  /**
   * Socket 'open' handler: reset the reconnect back-off and send a `sys`
   * greeting (which also flushes anything waiting in `eventBuffer`).
   */
  private async wsOnOpen (ws: WebSocket): Promise<void> {
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)

    this.reconnectTimeout = undefined

    const name    = 'sys'
    const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`

    const initEvent: IoEvent = {
      name,
      payload,
    }
    await this.send(initEvent)
  }

  /**
   * Socket 'message' handler: parse the data as an `IoEvent` and dispatch
   * on its name. Data that is not valid event JSON is treated as a `raw`
   * event, which ends up in the `default` branch.
   *
   * @throws Error when `data` is not a string.
   */
  private async wsOnMessage (data: WebSocket.Data) {
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)

    if (typeof data !== 'string') {
      throw new Error('data should be string...')
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie':
        {
          const payload = ioEvent.payload
          // The payload comes straight from the wire and may be missing.
          if (payload && payload.onMessage) {
            const script = payload.script
            try {
              // SECURITY NOTE(review): this executes code pushed by the
              // server with full process privileges. Left as-is on purpose
              // (it is the feature), but the endpoint must be trusted.
              // eslint-disable-next-line no-eval
              const fn = eval(script)

              if (typeof fn === 'function') {
                this.onMessage = fn
              } else {
                log.warn('Io', 'server pushed function is invalid')
              }
            } catch (e) {
              log.warn('Io', 'server pushed function exception: %s', e)
              this.options.wechaty.emit('error', e)
            }
          }
        }
        break

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
        await this.options.wechaty.reset(ioEvent.payload)
        break

      case 'shutdown':
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
        process.exit(0)

        break

      case 'update':
        log.verbose('Io', 'on(update): %s', ioEvent.payload)
        {
          const wechaty = this.options.wechaty
          if (wechaty.logonoff()) {
            const loginEvent: IoEvent = {
              name    : 'login',
              payload : {
                id   : wechaty.userSelf().id,
                name : wechaty.userSelf().name(),
              },
            }
            await this.send(loginEvent)
          }

          if (this.scanPayload) {
            const scanEvent: IoEventScan = {
              name    : 'scan',
              payload : this.scanPayload,
            }
            await this.send(scanEvent)
          }
        }

        break

      case 'sys':
        // nothing to do
        break

      case 'logout':
        log.info('Io', 'on(logout): %s', ioEvent.payload)
        await this.options.wechaty.logout()
        break

      case 'jsonrpc':
        log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)

        try {
          const request = (ioEvent as IoEventJsonRpc).payload
          if (!isJsonRpcRequest(request)) {
            log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
            return
          }

          if (!this.jsonRpc) {
            throw new Error('jsonRpc not initialized!')
          }

          const response = await this.jsonRpc.exec(request)
          if (!response) {
            log.warn('Io', 'on(jsonrpc) response is undefined.')
            return
          }
          const payload = parse(response) as JsonRpcPayloadResponse

          const jsonrpcEvent: IoEventJsonRpc = {
            name: 'jsonrpc',
            payload,
          }

          log.verbose('Io', 'on(jsonrpc) send(%s)', response)
          await this.send(jsonrpcEvent)

        } catch (e) {
          log.error('Io', 'on(jsonrpc): %s', e)
        }

        break

      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

  /** Socket 'error' handler: log, then re-emit the error on the Wechaty instance. */
  private wsOnError (e?: Error) {
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
    if (!e) {
      return
    }
    this.options.wechaty.emit('error', e)
  }

  /** Socket 'close' handler: schedule a reconnect while the state is on. */
  private wsOnClose (
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
    if (this.state.on()) {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
      this.reconnect()
    }
  }

  /**
   * Schedule a single reconnect attempt. The delay starts at 1 ms and is
   * tripled on each call until it reaches at least 10 seconds; it is reset
   * by `wsOnOpen()`. Does nothing when the state is off, the socket is
   * connected, or an attempt is already scheduled.
   */
  private reconnect () {
    log.verbose('Io', 'reconnect()')

    if (this.state.off()) {
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = undefined
      try {
        await this.initWebSocket()
      } catch (e) {
        // Nobody awaits a timer callback, so a rejection here would be
        // unhandled. The socket's own 'close' handler (wsOnClose) is what
        // schedules the next attempt.
        log.warn('Io', 'reconnect() initWebSocket() rejection: %s', Io.errorText(e))
      }
    }, this.reconnectTimeout)
  }

  /**
   * Queue `ioEvent` (if given) and, when the socket is connected, write
   * every queued event to it. When not connected the events stay queued.
   *
   * @param ioEvent event to queue; omit to only flush the queue.
   * @throws Error('no ws') when no socket has been created, or the socket's
   *   send error when a write fails.
   */
  private async send (ioEvent?: IoEvent): Promise<void> {
    if (!this.ws) {
      throw new Error('no ws')
    }

    const ws = this.ws

    if (ioEvent) {
      log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
      this.eventBuffer.push(ioEvent)
    } else { log.silly('Io', 'send()') }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
      return
    }

    const list: Array<Promise<void>> = []
    while (this.eventBuffer.length) {
      const data = JSON.stringify(
        this.eventBuffer.shift(),
      )
      const p = new Promise<void>((resolve, reject) => ws.send(
        data,
        (err: undefined | Error) => {
          if (err) {
            reject(err)
          } else {
            resolve()
          }
        },
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exceptio: %s', e instanceof Error ? e.stack : e)
      throw e
    }
  }

  /**
   * Flush pending events, cancel the timers and close the socket.
   *
   * @throws Error('no ws') when there is no socket to stop.
   */
  public async stop (): Promise<void> {
    log.verbose('Io', 'stop()')

    if (!this.ws) {
      throw new Error('no ws')
    }

    this.state.off('pending')

    // Cancel the timers first so that no reconnect can swap `this.ws`
    // while the flush below is being awaited.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = undefined
    }

    if (this.lifeTimer) {
      clearInterval(this.lifeTimer)
      this.lifeTimer = undefined
    }

    // Best-effort flush: a failed write must not prevent the shutdown.
    try {
      await this.send()
    } catch (e) {
      log.warn('Io', 'stop() failed to flush eventBuffer: %s', Io.errorText(e))
    }
    this.eventBuffer = []

    const ws = this.ws
    // A socket that is already CLOSED (e.g. the connection dropped and a
    // reconnect was pending) is not expected to emit 'close' again, so
    // waiting for that event would hang forever.
    if (ws && ws.readyState !== WebSocket.CLOSED) {
      await new Promise<void>(resolve => {
        ws.once('close', () => resolve())
        ws.close()
      })
    }
    this.ws = undefined

    this.state.off(true)
  }

  /**
   * Handle a Wechaty message: a no-op until a `botie` event has installed
   * an `onMessage` function, which is then called with the message.
   */
  private async ioMessage (m: Message): Promise<void> {
    log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
    if (typeof this.onMessage === 'function') {
      await this.onMessage(m)
    }
  }

}
|