UNPKG

14.5 kBPlain TextView Raw
1/**
2 * Wechaty Chatbot SDK - https://github.com/wechaty/wechaty
3 *
4 * @copyright 2016 Huan LI (李卓桓) <https://github.com/huan>, and
5 * Wechaty Contributors <https://github.com/wechaty>.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20import { StateSwitch } from 'state-switch'
21import WebSocket from 'ws'
22
23import {
24 Message,
25} from './user/mod'
26
27import {
28 EventScanPayload,
29} from 'wechaty-puppet'
30
31import Peer, {
32 JsonRpcPayload,
33 JsonRpcPayloadResponse,
34 parse,
35} from 'json-rpc-peer'
36
37import {
38 config,
39 log,
40} from './config'
41import {
42 AnyFunction,
43} from './types'
44import {
45 Wechaty,
46} from './wechaty'
47
48import {
49 getPeer,
50 isJsonRpcRequest,
51} from './io-peer/io-peer'
52
53export interface IoOptions {
54 wechaty: Wechaty,
55 token: string,
56 apihost?: string,
57 protocol?: string,
58 hostiePort?:number,
59}
60
61export const IO_EVENT_DICT = {
62 botie : 'tbw',
63 error : 'tbw',
64 heartbeat : 'tbw',
65 jsonrpc : 'JSON RPC',
66 login : 'tbw',
67 logout : 'tbw',
68 message : 'tbw',
69 raw : 'tbw',
70 reset : 'tbw',
71 scan : 'tbw',
72 shutdown : 'tbw',
73 sys : 'tbw',
74 update : 'tbw',
75}
76
77type IoEventName = keyof typeof IO_EVENT_DICT
78
79interface IoEventScan {
80 name : 'scan',
81 payload : EventScanPayload,
82}
83
84interface IoEventJsonRpc {
85 name: 'jsonrpc',
86 payload: JsonRpcPayload,
87}
88
89interface IoEventAny {
90 name: IoEventName,
91 payload: any,
92}
93
94type IoEvent = IoEventScan | IoEventJsonRpc | IoEventAny
95
96export class Io {
97
98 private readonly id : string
99 private readonly protocol : string
100 private eventBuffer : IoEvent[] = []
101 private ws : undefined | WebSocket
102
103 private readonly state = new StateSwitch('Io', { log })
104
105 private reconnectTimer? : NodeJS.Timer
106 private reconnectTimeout? : number
107
108 private lifeTimer? : NodeJS.Timer
109
110 private onMessage: undefined | AnyFunction
111
112 private scanPayload?: EventScanPayload
113
114 protected jsonRpc?: Peer
115
116 constructor (
117 private options: IoOptions,
118 ) {
119 options.apihost = options.apihost || config.apihost
120 options.protocol = options.protocol || config.default.DEFAULT_PROTOCOL
121
122 this.id = options.wechaty.id
123
124 this.protocol = options.protocol + '|' + options.wechaty.id
125 log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
126 options.apihost,
127 options.token,
128 options.protocol,
129 this.id,
130 )
131
132 if (options.hostiePort) {
133 this.jsonRpc = getPeer({
134 hostieGrpcPort: this.options.hostiePort!,
135 })
136 }
137
138 }
139
140 public toString () {
141 return `Io<${this.options.token}>`
142 }
143
144 private connected () {
145 return this.ws && this.ws.readyState === WebSocket.OPEN
146 }
147
148 public async start (): Promise<void> {
149 log.verbose('Io', 'start()')
150
151 if (this.lifeTimer) {
152 throw new Error('lifeTimer exist')
153 }
154
155 this.state.on('pending')
156
157 try {
158 this.initEventHook()
159
160 this.ws = await this.initWebSocket()
161
162 this.options.wechaty.on('scan', (qrcode, status) => {
163 this.scanPayload = {
164 ...this.scanPayload,
165 qrcode,
166 status,
167 }
168 })
169
170 this.lifeTimer = setInterval(() => {
171 if (this.ws && this.connected()) {
172 log.silly('Io', 'start() setInterval() ws.ping()')
173 // TODO: check 'pong' event on ws
174 this.ws.ping()
175 }
176 }, 1000 * 10)
177
178 this.state.on(true)
179
180 } catch (e) {
181 log.warn('Io', 'start() exception: %s', e.message)
182 this.state.off(true)
183 throw e
184 }
185 }
186
187 private initEventHook () {
188 log.verbose('Io', 'initEventHook()')
189 const wechaty = this.options.wechaty
190
191 wechaty.on('error', error => this.send({ name: 'error', payload: error }))
192 wechaty.on('heartbeat', data => this.send({ name: 'heartbeat', payload: { cuid: this.id, data } }))
193 wechaty.on('login', user => this.send({ name: 'login', payload: user }))
194 wechaty.on('logout', user => this.send({ name: 'logout', payload: user }))
195 wechaty.on('message', message => this.ioMessage(message))
196
197 // FIXME: payload schema need to be defined universal
198 // wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))
199 wechaty.on('scan', (qrcode, status) => this.send({ name: 'scan', payload: { qrcode, status } } as IoEventScan))
200 }
201
202 private async initWebSocket (): Promise<WebSocket> {
203 log.verbose('Io', 'initWebSocket()')
204 // this.state.current('on', false)
205
206 // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
207 const auth = 'Token ' + this.options.token
208 const headers = { Authorization: auth }
209
210 if (!this.options.apihost) {
211 throw new Error('no apihost')
212 }
213 let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'
214
215 // XXX quick and dirty: use no ssl for APIHOST other than official
216 // FIXME: use a configuarable VARIABLE for the domain name at here:
217 if (!/api\.chatie\.io/.test(this.options.apihost)) {
218 endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
219 }
220
221 const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
222
223 ws.on('open', () => this.wsOnOpen(ws))
224 ws.on('message', data => this.wsOnMessage(data))
225 ws.on('error', e => this.wsOnError(e))
226 ws.on('close', (code, reason) => this.wsOnClose(ws, code, reason))
227
228 await new Promise((resolve, reject) => {
229 ws.once('open', resolve)
230 ws.once('error', reject)
231 ws.once('close', reject)
232 })
233
234 return ws
235 }
236
237 private async wsOnOpen (ws: WebSocket): Promise<void> {
238 if (this.protocol !== ws.protocol) {
239 log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
240 // XXX deal with error?
241 }
242 log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
243 // this.currentState('connected')
244 // this.state.current('on')
245
246 // FIXME: how to keep alive???
247 // ws._socket.setKeepAlive(true, 100)
248
249 this.reconnectTimeout = undefined
250
251 const name = 'sys'
252 const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`
253
254 const initEvent: IoEvent = {
255 name,
256 payload,
257 }
258 await this.send(initEvent)
259 }
260
261 private async wsOnMessage (data: WebSocket.Data) {
262 log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
263 // flags.binary will be set if a binary data is received.
264 // flags.masked will be set if the data was masked.
265
266 if (typeof data !== 'string') {
267 throw new Error('data should be string...')
268 }
269
270 const ioEvent: IoEvent = {
271 name : 'raw',
272 payload : data,
273 }
274
275 try {
276 const obj = JSON.parse(data)
277 ioEvent.name = obj.name
278 ioEvent.payload = obj.payload
279 } catch (e) {
280 log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
281 }
282
283 switch (ioEvent.name) {
284 case 'botie':
285 {
286 const payload = ioEvent.payload
287 if (payload.onMessage) {
288 const script = payload.script
289 try {
290 /**
291 * https://github.com/Chatie/botie/issues/2
292 * const AsyncFunction = Object.getPrototypeOf(async () => {}).constructor
293 * const fn = new AsyncFunction('require', 'github', 'context', script)
294 */
295 // eslint-disable-next-line
296 const fn = eval(script)
297
298 if (typeof fn === 'function') {
299 this.onMessage = fn
300 } else {
301 log.warn('Io', 'server pushed function is invalid')
302 }
303 } catch (e) {
304 log.warn('Io', 'server pushed function exception: %s', e)
305 this.options.wechaty.emit('error', e)
306 }
307 }
308 }
309 break
310
311 case 'reset':
312 log.verbose('Io', 'on(reset): %s', ioEvent.payload)
313 await this.options.wechaty.reset(ioEvent.payload)
314 break
315
316 case 'shutdown':
317 log.info('Io', 'on(shutdown): %s', ioEvent.payload)
318 process.exit(0)
319 // eslint-disable-next-line
320 break
321
322 case 'update':
323 log.verbose('Io', 'on(update): %s', ioEvent.payload)
324 {
325 const wechaty = this.options.wechaty
326 if (wechaty.logonoff()) {
327 const loginEvent: IoEvent = {
328 name : 'login',
329 payload : {
330 id : wechaty.userSelf().id,
331 name : wechaty.userSelf().name(),
332 },
333 }
334 await this.send(loginEvent)
335 }
336
337 if (this.scanPayload) {
338 const scanEvent: IoEventScan = {
339 name: 'scan',
340 payload: this.scanPayload,
341 }
342 await this.send(scanEvent)
343 }
344 }
345
346 break
347
348 case 'sys':
349 // do nothing
350 break
351
352 case 'logout':
353 log.info('Io', 'on(logout): %s', ioEvent.payload)
354 await this.options.wechaty.logout()
355 break
356
357 case 'jsonrpc':
358 log.info('Io', 'on(jsonrpc): %s', ioEvent.payload)
359
360 try {
361 const request = (ioEvent as IoEventJsonRpc).payload
362 if (!isJsonRpcRequest(request)) {
363 log.warn('Io', 'on(jsonrpc) payload is not a jsonrpc request: %s', JSON.stringify(request))
364 return
365 }
366
367 if (!this.jsonRpc) {
368 throw new Error('jsonRpc not initialized!')
369 }
370
371 const response = await this.jsonRpc.exec(request)
372 if (!response) {
373 log.warn('Io', 'on(jsonrpc) response is undefined.')
374 return
375 }
376 const payload = parse(response) as JsonRpcPayloadResponse
377
378 const jsonrpcEvent: IoEventJsonRpc = {
379 name: 'jsonrpc',
380 payload,
381 }
382
383 log.verbose('Io', 'on(jsonrpc) send(%s)', response)
384 await this.send(jsonrpcEvent)
385
386 } catch (e) {
387 log.error('Io', 'on(jsonrpc): %s', e)
388 }
389
390 break
391
392 default:
393 log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
394 break
395 }
396 }
397
398 // FIXME: it seems the parameter `e` might be `undefined`.
399 // @types/ws might has bug for `ws.on('error', e => this.wsOnError(e))`
400 private wsOnError (e?: Error) {
401 log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
402 if (!e) {
403 return
404 }
405 this.options.wechaty.emit('error', e)
406
407 // when `error`, there must have already a `close` event
408 // we should not call this.reconnect() again
409 //
410 // this.close()
411 // this.reconnect()
412 }
413
414 private wsOnClose (
415 ws : WebSocket,
416 code : number,
417 message : string,
418 ): void {
419 if (this.state.on()) {
420 log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
421 ws.close()
422 this.reconnect()
423 }
424 }
425
426 private reconnect () {
427 log.verbose('Io', 'reconnect()')
428
429 if (this.state.off()) {
430 log.warn('Io', 'reconnect() canceled because state.target() === offline')
431 return
432 }
433
434 if (this.connected()) {
435 log.warn('Io', 'reconnect() on a already connected io')
436 return
437 }
438 if (this.reconnectTimer) {
439 log.warn('Io', 'reconnect() on a already re-connecting io')
440 return
441 }
442
443 if (!this.reconnectTimeout) {
444 this.reconnectTimeout = 1
445 } else if (this.reconnectTimeout < 10 * 1000) {
446 this.reconnectTimeout *= 3
447 }
448
449 log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
450 this.reconnectTimer = setTimeout(async () => {
451 this.reconnectTimer = undefined
452 await this.initWebSocket()
453 }, this.reconnectTimeout)// as any as NodeJS.Timer
454 }
455
456 private async send (ioEvent?: IoEvent): Promise<void> {
457 if (!this.ws) {
458 throw new Error('no ws')
459 }
460
461 const ws = this.ws
462
463 if (ioEvent) {
464 log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
465 this.eventBuffer.push(ioEvent)
466 } else { log.silly('Io', 'send()') }
467
468 if (!this.connected()) {
469 log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
470 return
471 }
472
473 const list: Array<Promise<any>> = []
474 while (this.eventBuffer.length) {
475 const data = JSON.stringify(
476 this.eventBuffer.shift(),
477 )
478 const p = new Promise((resolve, reject) => ws.send(
479 data,
480 (err: undefined | Error) => {
481 if (err) {
482 reject(err)
483 } else {
484 resolve()
485 }
486 },
487 ))
488 list.push(p)
489 }
490
491 try {
492 await Promise.all(list)
493 } catch (e) {
494 log.error('Io', 'send() exceptio: %s', e.stack)
495 throw e
496 }
497 }
498
499 public async stop (): Promise<void> {
500 log.verbose('Io', 'stop()')
501
502 if (!this.ws) {
503 throw new Error('no ws')
504 }
505
506 this.state.off('pending')
507
508 // try to send IoEvents in buffer
509 await this.send()
510 this.eventBuffer = []
511
512 if (this.reconnectTimer) {
513 clearTimeout(this.reconnectTimer)
514 this.reconnectTimer = undefined
515 }
516
517 if (this.lifeTimer) {
518 clearInterval(this.lifeTimer)
519 this.lifeTimer = undefined
520 }
521
522 this.ws.close()
523 await new Promise(resolve => {
524 if (this.ws) {
525 this.ws.once('close', resolve)
526 } else {
527 resolve()
528 }
529 })
530 this.ws = undefined
531
532 this.state.off(true)
533 }
534
535 /**
536 *
537 * Prepare to be overwriten by server setting
538 *
539 */
540 private async ioMessage (m: Message): Promise<void> {
541 log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
542 if (typeof this.onMessage === 'function') {
543 await this.onMessage(m)
544 }
545 }
546
547}