UNPKG

15 kBPlain TextView Raw
1/**
2 * Wechaty Chatbot SDK - https://github.com/wechaty/wechaty
3 *
4 * @copyright 2016 Huan LI (李卓桓) <https://github.com/huan>, and
5 * Wechaty Contributors <https://github.com/wechaty>.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20import WebSocket from 'ws'
21
22import {
23 Message,
24} from './user/mod'
25
26import {
27 StateSwitch,
28
29 EventScanPayload,
30} from 'wechaty-puppet'
31
32import Peer, {
33 JsonRpcPayload,
34 JsonRpcPayloadResponse,
35 parse,
36} from 'json-rpc-peer'
37
38import {
39 config,
40 log,
41} from './config'
42import {
43 AnyFunction,
44} from './types'
45import {
46 Wechaty,
47} from './wechaty'
48
49import {
50 getPeer,
51 isJsonRpcRequest,
52} from './io-peer/io-peer'
53
/**
 * Construction options for `Io`.
 */
export interface IoOptions {
  /** Wechaty instance whose events are forwarded and which receives remote commands. */
  wechaty: Wechaty,
  /** Token sent in the `Authorization` header when the WebSocket is opened. */
  token: string,
  /** API host to connect to; the `Io` constructor falls back to `config.apihost` when omitted. */
  apihost?: string,
  /** Protocol prefix; the `Io` constructor falls back to `config.default.DEFAULT_PROTOCOL` when omitted. */
  protocol?: string,
  /** When set, the `Io` constructor creates a JSON-RPC peer with this value as `serviceGrpcPort`. */
  servicePort?: number,
}
61
/**
 * Dictionary of every event name that can travel over the Io channel.
 *
 * Only the keys are used for typing (see `IoEventName`); the values are
 * human-readable descriptions.
 * NOTE(review): 'tbw' is presumably a "to be written" placeholder - confirm.
 */
export const IO_EVENT_DICT = {
  botie: 'tbw',
  error: 'tbw',
  heartbeat: 'tbw',
  jsonrpc: 'JSON RPC',
  login: 'tbw',
  logout: 'tbw',
  message: 'tbw',
  raw: 'tbw',
  reset: 'tbw',
  scan: 'tbw',
  shutdown: 'tbw',
  sys: 'tbw',
  update: 'tbw',
}
77
/** Union of all valid Io event names, derived from the keys of `IO_EVENT_DICT`. */
type IoEventName = keyof typeof IO_EVENT_DICT
79
/** Io event carrying a QR code scan payload. */
interface IoEventScan {
  name: 'scan',
  payload: EventScanPayload,
}
84
/** Io event carrying a JSON-RPC payload (request or response). */
interface IoEventJsonRpc {
  name: 'jsonrpc',
  payload: JsonRpcPayload,
}
89
/** Catch-all Io event: any known event name with an untyped payload. */
interface IoEventAny {
  name: IoEventName,
  payload: any,
}
94
/** Any event that can be sent or received over the Io WebSocket. */
type IoEvent =
  | IoEventScan
  | IoEventJsonRpc
  | IoEventAny
96
/**
 * Constructor of async functions (there is no global `AsyncFunction` binding),
 * obtained from the prototype of an async function instance.
 *
 * https://github.com/Chatie/botie/issues/2
 * https://github.com/actions/github-script/blob/f035cea4677903b153fa754aa8c2bba66f8dc3eb/src/async-function.ts#L6
 */
const AsyncFunction = Object.getPrototypeOf(async function () { return null }).constructor
102
103// function callAsyncFunction<U extends {} = {}, V = unknown> (
104// args: U,
105// source: string
106// ): Promise<V> {
107// const fn = new AsyncFunction(...Object.keys(args), source)
108// return fn(...Object.values(args))
109// }
110
/**
 * Io connects a Wechaty instance to a remote API host over a WebSocket.
 *
 * Wechaty events (error / heartbeat / login / logout / scan) are serialized as
 * `IoEvent`s and sent to the server; events received from the server
 * (botie / reset / shutdown / update / logout / jsonrpc ...) are dispatched in
 * `wsOnMessage()`.
 *
 * Events are queued in `eventBuffer` and flushed whenever `send()` runs while
 * the socket is open.
 */
export class Io {

  private readonly id       : string
  /** WebSocket sub-protocol string: `protocol|wechatyId|serviceIp|servicePort`. */
  private readonly protocol : string
  /** Events waiting to be written to the socket. */
  private eventBuffer       : IoEvent[] = []
  private ws                : undefined | WebSocket

  private readonly state = new StateSwitch('Io', { log })

  private reconnectTimer?   : NodeJS.Timer
  /** Delay handed to `setTimeout()` for the next reconnect attempt. */
  private reconnectTimeout? : number

  /** Interval timer that pings the server; also marks the Io as started. */
  private lifeTimer? : NodeJS.Timer

  /** Message handler compiled from source pushed by the server ('botie' event). */
  private onMessage: undefined | AnyFunction

  /** Last scan payload seen, replayed to the server on an 'update' request. */
  private scanPayload?: EventScanPayload

  protected jsonRpc?: Peer

  /**
   * @param options see `IoOptions`. `apihost` and `protocol` are filled in
   *                (on the passed object) from `config` when missing.
   */
  constructor (
    private options: IoOptions,
  ) {
    options.apihost  = options.apihost  || config.apihost
    options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL

    this.id = options.wechaty.id

    this.protocol = options.protocol + '|' + options.wechaty.id + '|' + config.serviceIp + '|' + options.servicePort
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
      options.apihost,
      options.token,
      options.protocol,
      this.id,
    )

    // The JSON-RPC peer is only available when a service port was configured.
    if (options.servicePort) {
      this.jsonRpc = getPeer({
        serviceGrpcPort: options.servicePort,
      })
    }
  }

  public toString () {
    return `Io<${this.options.token}>`
  }

  /** Truthy when a socket exists and is in the OPEN state. */
  private connected () {
    return this.ws && this.ws.readyState === WebSocket.OPEN
  }

  /**
   * Hook the Wechaty events, open the WebSocket and start the keep-alive ping.
   *
   * @throws Error when already started (`lifeTimer` exists), or whatever
   *         `initWebSocket()` rejects with (the state is switched off first).
   */
  public async start (): Promise<void> {
    log.verbose('Io', 'start()')

    if (this.lifeTimer) {
      throw new Error('lifeTimer exist')
    }

    this.state.on('pending')

    try {
      this.initEventHook()

      this.ws = await this.initWebSocket()

      // Remember the latest scan payload until login, so that it can be
      // replayed when the server asks for an 'update'.
      this.options.wechaty.on('login', () => { this.scanPayload = undefined })
      this.options.wechaty.on('scan', (qrcode, status) => {
        this.scanPayload = {
          ...this.scanPayload,
          qrcode,
          status,
        }
      })

      this.lifeTimer = setInterval(() => {
        if (this.ws && this.connected()) {
          log.silly('Io', 'start() setInterval() ws.ping()')
          // TODO: check 'pong' event on ws
          this.ws.ping()
        }
      }, 1000 * 10)

      this.state.on(true)

    } catch (e) {
      log.warn('Io', 'start() exception: %s', e.message)
      this.state.off(true)
      throw e
    }
  }

  /**
   * Forward Wechaty events to the server.
   *
   * Event listeners discard the value returned by the callback, so every
   * promise created here gets its own rejection handler; otherwise a failing
   * `send()` (e.g. 'no ws' after `stop()`) would be an unhandled rejection.
   */
  private initEventHook () {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.options.wechaty

    const forward = (ioEvent: IoEvent): void => {
      this.send(ioEvent).catch(e => log.warn('Io', 'initEventHook() send(%s) rejection: %s', ioEvent.name, e))
    }

    wechaty.on('error',     error => forward({ name: 'error', payload: error }))
    wechaty.on('heartbeat', data  => forward({ name: 'heartbeat', payload: { cuid: this.id, data } }))
    wechaty.on('login',     user  => forward({ name: 'login', payload: user.payload }))
    wechaty.on('logout',    user  => forward({ name: 'logout', payload: user.payload }))
    wechaty.on('message',   message => {
      this.ioMessage(message).catch(e => log.warn('Io', 'initEventHook() ioMessage() rejection: %s', e))
    })

    // FIXME: payload schema need to be defined universal
    // wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))
    wechaty.on('scan', (qrcode, status) => forward({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
  }

  /**
   * Create a new WebSocket to the api host, attach the handlers, store it in
   * `this.ws` and wait until it is open.
   *
   * @returns the opened socket
   * @throws  Error('no apihost') when no api host is configured; rejects with
   *          the event arguments when the socket errors or closes before opening.
   */
  private async initWebSocket (): Promise<WebSocket> {
    log.verbose('Io', 'initWebSocket()')
    // this.state.current('on', false)

    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.options.token
    const headers = { Authorization: auth }

    if (!this.options.apihost) {
      throw new Error('no apihost')
    }
    let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'

    // XXX quick and dirty: use no ssl for API_HOST other than official
    // FIXME: use a configurable VARIABLE for the domain name at here:
    if (!/api\.chatie\.io/.test(this.options.apihost)) {
      endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    // The async handlers are invoked from event listeners: attach rejection
    // handlers so that their failures are logged instead of left unhandled.
    ws.on('open', () => {
      this.wsOnOpen(ws).catch(e => log.warn('Io', 'initWebSocket() wsOnOpen() rejection: %s', e))
    })
    ws.on('message', data => {
      this.wsOnMessage(data).catch(e => log.warn('Io', 'initWebSocket() wsOnMessage() rejection: %s', e))
    })
    ws.on('error', e => this.wsOnError(e))
    ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason))

    await new Promise((resolve, reject) => {
      ws.once('open', resolve)
      ws.once('error', reject)
      ws.once('close', reject)
    })

    return ws
  }

  /**
   * Socket is open: reset the reconnect back-off and announce ourselves with
   * a 'sys' event (which also flushes anything waiting in `eventBuffer`).
   */
  private async wsOnOpen (ws: WebSocket): Promise<void> {
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
      // XXX deal with error?
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
    // this.currentState('connected')
    // this.state.current('on')

    // FIXME: how to keep alive???
    // ws._socket.setKeepAlive(true, 100)

    this.reconnectTimeout = undefined

    const name = 'sys'
    const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`

    const initEvent: IoEvent = {
      name,
      payload,
    }
    await this.send(initEvent)
  }

  /**
   * Dispatch one message received from the server.
   *
   * The data is expected to be a JSON string of the shape `{ name, payload }`;
   * data that fails to parse is treated as a 'raw' event.
   *
   * @throws Error when `data` is not a string
   */
  private async wsOnMessage (data: WebSocket.Data) {
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
    // flags.binary will be set if a binary data is received.
    // flags.masked will be set if the data was masked.

    if (typeof data !== 'string') {
      throw new Error('data should be string...')
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie':
        // The payload is read inside the try block: a malformed event (for
        // example one without a payload) is reported like any other invalid
        // pushed function instead of escaping this handler.
        try {
          const payload = ioEvent.payload

          const args   = payload.args
          const source = payload.source

          if (args[0] === 'message' && args.length === 1) {
            // SECURITY NOTE(review): this compiles and later executes source
            // code received from the server; it is only as trustworthy as
            // the api host and the transport.
            const fn = new AsyncFunction(...args, source)
            this.onMessage = fn
          } else {
            log.warn('Io', 'server pushed function is invalid. args: %s', JSON.stringify(args))
          }
        } catch (e) {
          log.warn('Io', 'server pushed function exception: %s', e)
          this.options.wechaty.emit('error', e)
        }
        break

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
        await this.options.wechaty.reset(ioEvent.payload)
        break

      case 'shutdown':
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
        process.exit(0)
        // eslint-disable-next-line
        break

      case 'update':
        log.verbose('Io', 'on(update): %s', ioEvent.payload)
        {
          // Replay the current login and/or scan state to the server.
          const wechaty = this.options.wechaty
          if (wechaty.logonoff()) {
            const loginEvent: IoEvent = {
              name    : 'login',
              payload : (wechaty.userSelf() as any).payload,
            }
            await this.send(loginEvent)
          }

          if (this.scanPayload) {
            const scanEvent: IoEventScan = {
              name    : 'scan',
              payload : this.scanPayload,
            }
            await this.send(scanEvent)
          }
        }

        break

      case 'sys':
        // do nothing
        break

      case 'logout':
        log.info('Io', 'on(logout): %s', ioEvent.payload)
        await this.options.wechaty.logout()
        break

      case 'jsonrpc':
        log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)

        try {
          const request = (ioEvent as IoEventJsonRpc).payload
          if (!isJsonRpcRequest(request)) {
            log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
            return
          }

          if (!this.jsonRpc) {
            throw new Error('jsonRpc not initialized!')
          }

          const response = await this.jsonRpc.exec(request)
          if (!response) {
            log.warn('Io', 'on(jsonrpc) response is undefined.')
            return
          }
          const payload = parse(response) as JsonRpcPayloadResponse

          const jsonrpcEvent: IoEventJsonRpc = {
            name: 'jsonrpc',
            payload,
          }

          log.verbose('Io', 'on(jsonrpc) send(%s)', response)
          await this.send(jsonrpcEvent)

        } catch (e) {
          log.error('Io', 'on(jsonrpc): %s', e)
        }

        break

      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

  // FIXME: it seems the parameter `e` might be `undefined`.
  //  @types/ws might has bug for `ws.on('error', e => this.wsOnError(e))`
  /** Log a socket error and re-emit it on the Wechaty instance. */
  private wsOnError (e?: Error) {
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
    if (!e) {
      return
    }
    this.options.wechaty.emit('error', e)

    // when `error`, there must have already a `close` event
    // we should not call this.reconnect() again
    //
    // this.close()
    // this.reconnect()
  }

  /** Socket closed: schedule a reconnect, but only while the Io is switched on. */
  private wsOnClose (
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
    if (this.state.on()) {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
      this.reconnect()
    }
  }

  /**
   * Schedule one reconnect attempt.
   *
   * The delay starts at 1 and is tripled on every call while it is below
   * 10 * 1000; it is reset in `wsOnOpen()`.
   */
  private reconnect () {
    log.verbose('Io', 'reconnect()')

    if (this.state.off()) {
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined
      // A failed attempt must not become an unhandled promise rejection.
      // When the new socket closes, wsOnClose() schedules the next attempt.
      this.initWebSocket().catch(e => log.warn('Io', 'reconnect() initWebSocket() rejection: %s', e))
    }, this.reconnectTimeout)// as any as NodeJS.Timer
  }

  /**
   * Queue `ioEvent` (if any) and, when the socket is open, write the whole
   * buffer to it.
   *
   * @param ioEvent event to queue; omit to only flush the buffer
   * @throws Error('no ws') when no socket exists, or the error reported by
   *         `ws.send()`
   */
  private async send (ioEvent?: IoEvent): Promise<void> {
    if (!this.ws) {
      throw new Error('no ws')
    }

    const ws = this.ws

    if (ioEvent) {
      log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
      this.eventBuffer.push(ioEvent)
    } else { log.silly('Io', 'send()') }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
      return
    }

    const list: Array<Promise<any>> = []
    while (this.eventBuffer.length) {
      const data = JSON.stringify(
        this.eventBuffer.shift(),
      )
      const p = new Promise<void>((resolve, reject) => ws.send(
        data,
        (err: undefined | Error) => {
          if (err)  {
            reject(err)
          } else {
            resolve()
          }
        },
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exception: %s', e.stack)
      throw e
    }
  }

  /**
   * Flush the buffered events (best effort), stop the timers and close the
   * socket.
   *
   * @throws Error('no ws') when there is no socket to stop
   */
  public async stop (): Promise<void> {
    log.verbose('Io', 'stop()')

    const ws = this.ws
    if (!ws) {
      throw new Error('no ws')
    }

    this.state.off('pending')

    // Stop the timers first, so that no reconnect or ping can fire while the
    // flush below is awaited.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = undefined
    }

    if (this.lifeTimer) {
      clearInterval(this.lifeTimer)
      this.lifeTimer = undefined
    }

    // try to send IoEvents in buffer; a failure here must not prevent the
    // socket from being closed
    try {
      await this.send()
    } catch (e) {
      log.warn('Io', 'stop() failed to flush eventBuffer: %s', e)
    }
    this.eventBuffer = []

    // A socket that is already closed never emits 'close' again: waiting for
    // the event in that state would hang forever.
    if (ws.readyState !== WebSocket.CLOSED) {
      await new Promise<void>(resolve => {
        ws.once('close', () => resolve())
        ws.close()
      })
    }
    this.ws = undefined

    this.state.off(true)
  }

  /**
   *
   * Prepare to be overwritten by server setting
   *
   * Runs the server-pushed `onMessage` handler, if one has been installed.
   */
  private async ioMessage (m: Message): Promise<void> {
    log.silly('Io', 'ioMessage() is a nop function before be overwritten from cloud')
    if (typeof this.onMessage === 'function') {
      await this.onMessage(m)
    }
  }

  /** Send the payload of a message to the server as a 'message' event. */
  protected async syncMessage (m: Message): Promise<void> {
    log.silly('Io', 'syncMessage(%s)', m)

    const messageEvent: IoEvent = {
      name    : 'message',
      payload : (m as any).payload,
    }
    await this.send(messageEvent)
  }

}
565
\No newline at end of file