UNPKG

47.7 kBJavaScriptView Raw
1import EventEmitter from 'eventemitter3';
2import { decode as decodeBase64 } from 'base64-arraybuffer';
3import remove from 'lodash/remove';
4import values from 'lodash/values';
5import d from 'debug';
6import {
7 Conversation,
8 ChatRoom,
9 ServiceConversation,
10 TemporaryConversation,
11} from './conversations';
12import ConversationBase from './conversations/conversation-base';
13import ConversationQuery from './conversation-query';
14import {
15 GenericCommand,
16 SessionCommand,
17 ConvCommand,
18 AckCommand,
19 JsonObjectMessage,
20 ReadCommand,
21 ReadTuple,
22 CommandType,
23 OpType,
24} from '../proto/message';
25import * as Event from './events/im';
26import { ErrorCode, createError } from './error';
27import {
28 Expirable,
29 Cache,
30 keyRemap,
31 trim,
32 internal,
33 throttle,
34 encode,
35 decode,
36 decodeDate,
37 getTime,
38} from './utils';
39import { applyDecorators, applyDispatcher } from './plugin';
40import SessionManager from './session-manager';
41import runSignatureFactory from './signature-factory-runner';
42import { MessageStatus } from './messages/message';
43import { version as VERSION } from '../package.json';
44
45const debug = d('LC:IMClient');
46
47const {
48 INVITED,
49 KICKED,
50 MEMBERS_JOINED,
51 MEMBERS_LEFT,
52 MEMBER_INFO_UPDATED,
53 BLOCKED,
54 UNBLOCKED,
55 MEMBERS_BLOCKED,
56 MEMBERS_UNBLOCKED,
57 MUTED,
58 UNMUTED,
59 MEMBERS_MUTED,
60 MEMBERS_UNMUTED,
61 MESSAGE,
62 UNREAD_MESSAGES_COUNT_UPDATE,
63 CLOSE,
64 CONFLICT,
65 UNHANDLED_MESSAGE,
66 CONVERSATION_INFO_UPDATED,
67 MESSAGE_RECALL,
68 MESSAGE_UPDATE,
69 INFO_UPDATED,
70} = Event;
71
72const isTemporaryConversatrionId = id => /^_tmp:/.test(id);
73
74/**
75 * 1 patch-msg
76 * 1 temp-conv-msg
77 * 0 auto-bind-deviceid-and-installation
78 * 1 transient-msg-ack
79 * 1 keep-notification
80 * 1 partial-failed-msg
81 * @ignore
82 */
83const configBitmap = 0b111011;
84
85export default class IMClient extends EventEmitter {
86 /**
87 * 无法直接实例化,请使用 {@link Realtime#createIMClient} 创建新的 IMClient。
88 *
89 * @extends EventEmitter
90 */
91 constructor(id, options = {}, props) {
92 if (!(id === undefined || typeof id === 'string')) {
93 throw new TypeError(`Client id [${id}] is not a String`);
94 }
95 super();
96 Object.assign(
97 this,
98 {
99 /**
100 * @var id {String} 客户端 id
101 * @memberof IMClient#
102 */
103 id,
104 options,
105 },
106 props
107 );
108
109 if (!this._messageParser) {
110 throw new Error('IMClient must be initialized with a MessageParser');
111 }
112 this._conversationCache = new Cache(`client:${this.id}`);
113 this._ackMessageBuffer = {};
114 internal(this).lastPatchTime = Date.now();
115 internal(this).lastNotificationTime = undefined;
116 internal(this)._eventemitter = new EventEmitter();
117 if (debug.enabled) {
118 values(Event).forEach(event =>
119 this.on(event, (...payload) =>
120 this._debug(`${event} event emitted. %o`, payload)
121 )
122 );
123 }
124 // onIMClientCreate hook
125 applyDecorators(this._plugins.onIMClientCreate, this);
126 }
127
128 _debug(...params) {
129 debug(...params, `[${this.id}]`);
130 }
131
132 /**
133 * @override
134 * @private
135 */
136 async _dispatchCommand(command) {
137 this._debug(trim(command), 'received');
138 if (command.serverTs && command.notificationType === 1) {
139 internal(this).lastNotificationTime = getTime(
140 decodeDate(command.serverTs)
141 );
142 }
143 switch (command.cmd) {
144 case CommandType.conv:
145 return this._dispatchConvMessage(command);
146 case CommandType.direct:
147 return this._dispatchDirectMessage(command);
148 case CommandType.session:
149 return this._dispatchSessionMessage(command);
150 case CommandType.unread:
151 return this._dispatchUnreadMessage(command);
152 case CommandType.rcp:
153 return this._dispatchRcpMessage(command);
154 case CommandType.patch:
155 return this._dispatchPatchMessage(command);
156 default:
157 return this.emit(UNHANDLED_MESSAGE, command);
158 }
159 }
160
161 async _dispatchSessionMessage(message) {
162 const {
163 sessionMessage: { code, reason },
164 } = message;
165 switch (message.op) {
166 case OpType.closed: {
167 internal(this)._eventemitter.emit('close');
168 if (code === ErrorCode.SESSION_CONFLICT) {
169 /**
170 * 用户在其他客户端登录,当前客户端被服务端强行下线。详见文档「单点登录」章节。
171 * @event IMClient#CONFLICT
172 * @param {Object} payload
173 * @param {string} payload.reason 原因
174 */
175 return this.emit(CONFLICT, {
176 reason,
177 });
178 }
179 /**
180 * 当前客户端被服务端强行下线
181 * @event IMClient#CLOSE
182 * @param {Object} payload
183 * @param {Number} payload.code 错误码
184 * @param {String} payload.reason 原因
185 */
186 return this.emit(CLOSE, {
187 code,
188 reason,
189 });
190 }
191 default:
192 this.emit(UNHANDLED_MESSAGE, message);
193 throw new Error('Unrecognized session command');
194 }
195 }
196
197 _dispatchUnreadMessage({ unreadMessage: { convs, notifTime } }) {
198 internal(this).lastUnreadNotifTime = notifTime;
199 // ensure all converstions are cached
200 return this.getConversations(convs.map(conv => conv.cid))
201 .then(() =>
202 // update conversations data
203 Promise.all(
204 convs.map(
205 ({
206 cid,
207 unread,
208 mid,
209 timestamp: ts,
210 from,
211 data,
212 binaryMsg,
213 patchTimestamp,
214 mentioned,
215 }) => {
216 const conversation = this._conversationCache.get(cid);
217 // deleted conversation
218 if (!conversation) return null;
219 let timestamp;
220 if (ts) {
221 timestamp = decodeDate(ts);
222 conversation.lastMessageAt = timestamp; // eslint-disable-line no-param-reassign
223 }
224 return (mid
225 ? this._messageParser.parse(binaryMsg || data).then(message => {
226 const messageProps = {
227 id: mid,
228 cid,
229 timestamp,
230 updatedAt: patchTimestamp,
231 from,
232 };
233 Object.assign(message, messageProps);
234 conversation.lastMessage = message; // eslint-disable-line no-param-reassign
235 })
236 : Promise.resolve()
237 ).then(() => {
238 conversation._setUnreadMessagesMentioned(mentioned);
239 const countNotUpdated =
240 unread === internal(conversation).unreadMessagesCount;
241 if (countNotUpdated) return null; // to be filtered
242 // manipulate internal property directly to skip unreadmessagescountupdate event
243 internal(conversation).unreadMessagesCount = unread;
244 return conversation;
245 });
246 // filter conversations without unread count update
247 }
248 )
249 ).then(conversations =>
250 conversations.filter(conversation => conversation)
251 )
252 )
253 .then(conversations => {
254 if (conversations.length) {
255 /**
256 * 未读消息数目更新
257 * @event IMClient#UNREAD_MESSAGES_COUNT_UPDATE
258 * @since 3.4.0
259 * @param {Conversation[]} conversations 未读消息数目有更新的对话列表
260 */
261 this.emit(UNREAD_MESSAGES_COUNT_UPDATE, conversations);
262 }
263 });
264 }
265
266 async _dispatchRcpMessage(message) {
267 const {
268 rcpMessage,
269 rcpMessage: { read },
270 } = message;
271 const conversationId = rcpMessage.cid;
272 const messageId = rcpMessage.id;
273 const timestamp = decodeDate(rcpMessage.t);
274 const conversation = this._conversationCache.get(conversationId);
275 // conversation not cached means the client does not send the message
276 // during this session
277 if (!conversation) return;
278 conversation._handleReceipt({ messageId, timestamp, read });
279 }
280
281 _dispatchPatchMessage({ patchMessage: { patches } }) {
282 // ensure all converstions are cached
283 return this.getConversations(patches.map(patch => patch.cid)).then(() =>
284 Promise.all(
285 patches.map(
286 ({
287 cid,
288 mid,
289 timestamp,
290 recall,
291 data,
292 patchTimestamp,
293 from,
294 binaryMsg,
295 mentionAll,
296 mentionPids,
297 patchCode,
298 patchReason,
299 }) => {
300 const conversation = this._conversationCache.get(cid);
301 // deleted conversation
302 if (!conversation) return null;
303 return this._messageParser
304 .parse(binaryMsg || data)
305 .then(message => {
306 const patchTime = getTime(decodeDate(patchTimestamp));
307 const messageProps = {
308 id: mid,
309 cid,
310 timestamp,
311 updatedAt: patchTime,
312 from,
313 mentionList: mentionPids,
314 mentionedAll: mentionAll,
315 };
316 Object.assign(message, messageProps);
317 message._setStatus(MessageStatus.SENT);
318 message._updateMentioned(this.id);
319 if (internal(this).lastPatchTime < patchTime) {
320 internal(this).lastPatchTime = patchTime;
321 }
322 // update conversation lastMessage
323 if (
324 conversation.lastMessage &&
325 conversation.lastMessage.id === mid
326 ) {
327 conversation.lastMessage = message; // eslint-disable-line no-param-reassign
328 }
329 let reason;
330 if (patchCode) {
331 reason = {
332 code: patchCode.toNumber(),
333 detail: patchReason,
334 };
335 }
336 if (recall) {
337 /**
338 * 消息被撤回
339 * @event IMClient#MESSAGE_RECALL
340 * @param {AVMessage} message 被撤回的消息
341 * @param {ConversationBase} conversation 消息所在的会话
342 * @param {PatchReason} [reason] 撤回的原因,不存在代表是发送者主动撤回
343 */
344 this.emit(MESSAGE_RECALL, message, conversation, reason);
345 /**
346 * 消息被撤回
347 * @event ConversationBase#MESSAGE_RECALL
348 * @param {AVMessage} message 被撤回的消息
349 * @param {PatchReason} [reason] 撤回的原因,不存在代表是发送者主动撤回
350 */
351 conversation.emit(MESSAGE_RECALL, message, reason);
352 } else {
353 /**
354 * 消息被修改
355 * @event IMClient#MESSAGE_UPDATE
356 * @param {AVMessage} message 被修改的消息
357 * @param {ConversationBase} conversation 消息所在的会话
358 * @param {PatchReason} [reason] 修改的原因,不存在代表是发送者主动修改
359 */
360 this.emit(MESSAGE_UPDATE, message, conversation, reason);
361 /**
362 * 消息被修改
363 * @event ConversationBase#MESSAGE_UPDATE
364 * @param {AVMessage} message 被修改的消息
365 * @param {PatchReason} [reason] 修改的原因,不存在代表是发送者主动修改
366 */
367 conversation.emit(MESSAGE_UPDATE, message, reason);
368 }
369 });
370 }
371 )
372 )
373 );
374 }
375
376 async _dispatchConvMessage(message) {
377 const {
378 convMessage,
379 convMessage: { initBy, m, info, attr },
380 } = message;
381 const conversation = await this.getConversation(convMessage.cid);
382 switch (message.op) {
383 case OpType.joined: {
384 conversation._addMembers([this.id]);
385 const payload = {
386 invitedBy: initBy,
387 };
388 /**
389 * 当前用户被添加至某个对话
390 * @event IMClient#INVITED
391 * @param {Object} payload
392 * @param {String} payload.invitedBy 邀请者 id
393 * @param {ConversationBase} conversation
394 */
395 this.emit(INVITED, payload, conversation);
396 /**
397 * 当前用户被添加至当前对话
398 * @event ConversationBase#INVITED
399 * @param {Object} payload
400 * @param {String} payload.invitedBy 该移除操作的发起者 id
401 */
402 conversation.emit(INVITED, payload);
403 return;
404 }
405 case OpType.left: {
406 conversation._removeMembers([this.id]);
407 const payload = {
408 kickedBy: initBy,
409 };
410 /**
411 * 当前用户被从某个对话中移除
412 * @event IMClient#KICKED
413 * @param {Object} payload
414 * @param {String} payload.kickedBy 该移除操作的发起者 id
415 * @param {ConversationBase} conversation
416 */
417 this.emit(KICKED, payload, conversation);
418 /**
419 * 当前用户被从当前对话中移除
420 * @event ConversationBase#KICKED
421 * @param {Object} payload
422 * @param {String} payload.kickedBy 该移除操作的发起者 id
423 */
424 conversation.emit(KICKED, payload);
425 return;
426 }
427 case OpType.members_joined: {
428 conversation._addMembers(m);
429 const payload = {
430 invitedBy: initBy,
431 members: m,
432 };
433 /**
434 * 有用户被添加至某个对话
435 * @event IMClient#MEMBERS_JOINED
436 * @param {Object} payload
437 * @param {String[]} payload.members 被添加的用户 id 列表
438 * @param {String} payload.invitedBy 邀请者 id
439 * @param {ConversationBase} conversation
440 */
441 this.emit(MEMBERS_JOINED, payload, conversation);
442 /**
443 * 有成员被添加至当前对话
444 * @event ConversationBase#MEMBERS_JOINED
445 * @param {Object} payload
446 * @param {String[]} payload.members 被添加的成员 id 列表
447 * @param {String} payload.invitedBy 邀请者 id
448 */
449 conversation.emit(MEMBERS_JOINED, payload);
450 return;
451 }
452 case OpType.members_left: {
453 conversation._removeMembers(m);
454 const payload = {
455 kickedBy: initBy,
456 members: m,
457 };
458 /**
459 * 有成员被从某个对话中移除
460 * @event IMClient#MEMBERS_LEFT
461 * @param {Object} payload
462 * @param {String[]} payload.members 被移除的成员 id 列表
463 * @param {String} payload.kickedBy 该移除操作的发起者 id
464 * @param {ConversationBase} conversation
465 */
466 this.emit(MEMBERS_LEFT, payload, conversation);
467 /**
468 * 有成员被从当前对话中移除
469 * @event ConversationBase#MEMBERS_LEFT
470 * @param {Object} payload
471 * @param {String[]} payload.members 被移除的成员 id 列表
472 * @param {String} payload.kickedBy 该移除操作的发起者 id
473 */
474 conversation.emit(MEMBERS_LEFT, payload);
475 return;
476 }
477 case OpType.members_blocked: {
478 const payload = {
479 blockedBy: initBy,
480 members: m,
481 };
482 /**
483 * 有成员被加入某个对话的黑名单
484 * @event IMClient#MEMBERS_BLOCKED
485 * @param {Object} payload
486 * @param {String[]} payload.members 成员 id 列表
487 * @param {String} payload.blockedBy 该操作的发起者 id
488 * @param {ConversationBase} conversation
489 */
490 this.emit(MEMBERS_BLOCKED, payload, conversation);
491 /**
492 * 有成员被加入当前对话的黑名单
493 * @event ConversationBase#MEMBERS_BLOCKED
494 * @param {Object} payload
495 * @param {String[]} payload.members 成员 id 列表
496 * @param {String} payload.blockedBy 该操作的发起者 id
497 */
498 conversation.emit(MEMBERS_BLOCKED, payload);
499 return;
500 }
501 case OpType.members_unblocked: {
502 const payload = {
503 unblockedBy: initBy,
504 members: m,
505 };
506 /**
507 * 有成员被移出某个对话的黑名单
508 * @event IMClient#MEMBERS_UNBLOCKED
509 * @param {Object} payload
510 * @param {String[]} payload.members 成员 id 列表
511 * @param {String} payload.unblockedBy 该操作的发起者 id
512 * @param {ConversationBase} conversation
513 */
514 this.emit(MEMBERS_UNBLOCKED, payload, conversation);
515 /**
516 * 有成员被移出当前对话的黑名单
517 * @event ConversationBase#MEMBERS_UNBLOCKED
518 * @param {Object} payload
519 * @param {String[]} payload.members 成员 id 列表
520 * @param {String} payload.unblockedBy 该操作的发起者 id
521 */
522 conversation.emit(MEMBERS_UNBLOCKED, payload);
523 return;
524 }
525 case OpType.blocked: {
526 const payload = {
527 blockedBy: initBy,
528 };
529 /**
530 * 当前用户被加入某个对话的黑名单
531 * @event IMClient#BLOCKED
532 * @param {Object} payload
533 * @param {String} payload.blockedBy 该操作的发起者 id
534 * @param {ConversationBase} conversation
535 */
536 this.emit(BLOCKED, payload, conversation);
537 /**
538 * 当前用户被加入当前对话的黑名单
539 * @event ConversationBase#BLOCKED
540 * @param {Object} payload
541 * @param {String} payload.blockedBy 该操作的发起者 id
542 */
543 conversation.emit(BLOCKED, payload);
544 return;
545 }
546 case OpType.unblocked: {
547 const payload = {
548 unblockedBy: initBy,
549 };
550 /**
551 * 当前用户被移出某个对话的黑名单
552 * @event IMClient#UNBLOCKED
553 * @param {Object} payload
554 * @param {String} payload.unblockedBy 该操作的发起者 id
555 * @param {ConversationBase} conversation
556 */
557 this.emit(UNBLOCKED, payload, conversation);
558 /**
559 * 当前用户被移出当前对话的黑名单
560 * @event ConversationBase#UNBLOCKED
561 * @param {Object} payload
562 * @param {String} payload.unblockedBy 该操作的发起者 id
563 */
564 conversation.emit(UNBLOCKED, payload);
565 return;
566 }
567 case OpType.members_shutuped: {
568 const payload = {
569 mutedBy: initBy,
570 members: m,
571 };
572 /**
573 * 有成员在某个对话中被禁言
574 * @event IMClient#MEMBERS_MUTED
575 * @param {Object} payload
576 * @param {String[]} payload.members 成员 id 列表
577 * @param {String} payload.mutedBy 该操作的发起者 id
578 * @param {ConversationBase} conversation
579 */
580 this.emit(MEMBERS_MUTED, payload, conversation);
581 /**
582 * 有成员在当前对话中被禁言
583 * @event ConversationBase#MEMBERS_MUTED
584 * @param {Object} payload
585 * @param {String[]} payload.members 成员 id 列表
586 * @param {String} payload.mutedBy 该操作的发起者 id
587 */
588 conversation.emit(MEMBERS_MUTED, payload);
589 return;
590 }
591 case OpType.members_unshutuped: {
592 const payload = {
593 unmutedBy: initBy,
594 members: m,
595 };
596 /**
597 * 有成员在某个对话中被解除禁言
598 * @event IMClient#MEMBERS_UNMUTED
599 * @param {Object} payload
600 * @param {String[]} payload.members 成员 id 列表
601 * @param {String} payload.unmutedBy 该操作的发起者 id
602 * @param {ConversationBase} conversation
603 */
604 this.emit(MEMBERS_UNMUTED, payload, conversation);
605 /**
606 * 有成员在当前对话中被解除禁言
607 * @event ConversationBase#MEMBERS_UNMUTED
608 * @param {Object} payload
609 * @param {String[]} payload.members 成员 id 列表
610 * @param {String} payload.unmutedBy 该操作的发起者 id
611 */
612 conversation.emit(MEMBERS_UNMUTED, payload);
613 return;
614 }
615 case OpType.shutuped: {
616 const payload = {
617 mutedBy: initBy,
618 };
619 /**
620 * 有成员在某个对话中被禁言
621 * @event IMClient#MUTED
622 * @param {Object} payload
623 * @param {String} payload.mutedBy 该操作的发起者 id
624 * @param {ConversationBase} conversation
625 */
626 this.emit(MUTED, payload, conversation);
627 /**
628 * 有成员在当前对话中被禁言
629 * @event ConversationBase#MUTED
630 * @param {Object} payload
631 * @param {String} payload.mutedBy 该操作的发起者 id
632 */
633 conversation.emit(MUTED, payload);
634 return;
635 }
636 case OpType.unshutuped: {
637 const payload = {
638 unmutedBy: initBy,
639 };
640 /**
641 * 有成员在某个对话中被解除禁言
642 * @event IMClient#UNMUTED
643 * @param {Object} payload
644 * @param {String} payload.unmutedBy 该操作的发起者 id
645 * @param {ConversationBase} conversation
646 */
647 this.emit(UNMUTED, payload, conversation);
648 /**
649 * 有成员在当前对话中被解除禁言
650 * @event ConversationBase#UNMUTED
651 * @param {Object} payload
652 * @param {String} payload.unmutedBy 该操作的发起者 id
653 */
654 conversation.emit(UNMUTED, payload);
655 return;
656 }
657 case OpType.member_info_changed: {
658 const { pid, role } = info;
659 const { memberInfoMap } = internal(conversation);
660 // 如果不存在缓存,且不是 role 的更新,则不通知
661 if (!memberInfoMap && !role) return;
662 const memberInfo = await conversation.getMemberInfo(pid);
663 internal(memberInfo).role = role;
664 const payload = {
665 member: pid,
666 memberInfo,
667 updatedBy: initBy,
668 };
669 /**
670 * 有成员的对话信息被更新
671 * @event IMClient#MEMBER_INFO_UPDATED
672 * @param {Object} payload
673 * @param {String} payload.member 被更新对话信息的成员 id
674 * @param {ConversationMumberInfo} payload.memberInfo 被更新的成员对话信息
675 * @param {String} payload.updatedBy 该操作的发起者 id
676 * @param {ConversationBase} conversation
677 */
678 this.emit(MEMBER_INFO_UPDATED, payload, conversation);
679 /**
680 * 有成员的对话信息被更新
681 * @event ConversationBase#MEMBER_INFO_UPDATED
682 * @param {Object} payload
683 * @param {String} payload.member 被更新对话信息的成员 id
684 * @param {ConversationMumberInfo} payload.memberInfo 被更新的成员对话信息
685 * @param {String} payload.updatedBy 该操作的发起者 id
686 */
687 conversation.emit(MEMBER_INFO_UPDATED, payload);
688 return;
689 }
690 case OpType.updated: {
691 const attributes = decode(JSON.parse(attr.data));
692 conversation._updateServerAttributes(attributes);
693 const payload = {
694 attributes,
695 updatedBy: initBy,
696 };
697 /**
698 * 该对话信息被更新
699 * @event IMClient#CONVERSATION_INFO_UPDATED
700 * @param {Object} payload
701 * @param {Object} payload.attributes 被更新的属性
702 * @param {String} payload.updatedBy 该操作的发起者 id
703 * @param {ConversationBase} conversation
704 */
705 this.emit(CONVERSATION_INFO_UPDATED, payload, conversation);
706 /**
707 * 有对话信息被更新
708 * @event ConversationBase#INFO_UPDATED
709 * @param {Object} payload
710 * @param {Object} payload.attributes 被更新的属性
711 * @param {String} payload.updatedBy 该操作的发起者 id
712 */
713 conversation.emit(INFO_UPDATED, payload);
714 return;
715 }
716 default:
717 this.emit(UNHANDLED_MESSAGE, message);
718 throw new Error('Unrecognized conversation command');
719 }
720 }
721
722 _dispatchDirectMessage(originalMessage) {
723 const {
724 directMessage,
725 directMessage: {
726 id,
727 cid,
728 fromPeerId,
729 timestamp,
730 transient,
731 patchTimestamp,
732 mentionPids,
733 mentionAll,
734 binaryMsg,
735 msg,
736 },
737 } = originalMessage;
738 const content = binaryMsg ? binaryMsg.toArrayBuffer() : msg;
739 return Promise.all([
740 this.getConversation(directMessage.cid),
741 this._messageParser.parse(content),
742 ]).then(([conversation, message]) => {
743 // deleted conversation
744 if (!conversation) return undefined;
745 const messageProps = {
746 id,
747 cid,
748 timestamp,
749 updatedAt: patchTimestamp,
750 from: fromPeerId,
751 mentionList: mentionPids,
752 mentionedAll: mentionAll,
753 };
754 Object.assign(message, messageProps);
755 message._updateMentioned(this.id);
756 message._setStatus(MessageStatus.SENT);
757 // filter outgoing message sent from another device
758 if (message.from !== this.id) {
759 if (!(transient || conversation.transient)) {
760 this._sendAck(message);
761 }
762 }
763 return this._dispatchParsedMessage(message, conversation);
764 });
765 }
766
767 _dispatchParsedMessage(message, conversation) {
768 // beforeMessageDispatch hook
769 return applyDispatcher(this._plugins.beforeMessageDispatch, [
770 message,
771 conversation,
772 ]).then(shouldDispatch => {
773 if (shouldDispatch === false) return;
774 conversation.lastMessage = message; // eslint-disable-line no-param-reassign
775 conversation.lastMessageAt = message.timestamp; // eslint-disable-line no-param-reassign
776 // filter outgoing message sent from another device
777 if (message.from !== this.id) {
778 conversation.unreadMessagesCount += 1; // eslint-disable-line no-param-reassign
779 if (message.mentioned) conversation._setUnreadMessagesMentioned(true);
780 }
781 /**
782 * 当前用户收到消息
783 * @event IMClient#MESSAGE
784 * @param {Message} message
785 * @param {ConversationBase} conversation 收到消息的对话
786 */
787 this.emit(MESSAGE, message, conversation);
788 /**
789 * 当前对话收到消息
790 * @event ConversationBase#MESSAGE
791 * @param {Message} message
792 */
793 conversation.emit(MESSAGE, message);
794 });
795 }
796
797 _sendAck(message) {
798 this._debug('send ack for %O', message);
799 const { cid } = message;
800 if (!cid) {
801 throw new Error('missing cid');
802 }
803 if (!this._ackMessageBuffer[cid]) {
804 this._ackMessageBuffer[cid] = [];
805 }
806 this._ackMessageBuffer[cid].push(message);
807 return this._doSendAck();
808 }
809
810 // jsdoc-ignore-start
811 @throttle(1000)
812 // jsdoc-ignore-end
813 _doSendAck() {
814 // if not connected, just skip everything
815 if (!this._connection.is('connected')) return;
816 this._debug('do send ack %O', this._ackMessageBuffer);
817 Promise.all(
818 Object.keys(this._ackMessageBuffer).map(cid => {
819 const convAckMessages = this._ackMessageBuffer[cid];
820 const timestamps = convAckMessages.map(message => message.timestamp);
821 const command = new GenericCommand({
822 cmd: 'ack',
823 peerId: this.id,
824 ackMessage: new AckCommand({
825 cid,
826 fromts: Math.min.apply(null, timestamps),
827 tots: Math.max.apply(null, timestamps),
828 }),
829 });
830 delete this._ackMessageBuffer[cid];
831 return this._send(command, false).catch(error => {
832 this._debug('send ack failed: %O', error);
833 this._ackMessageBuffer[cid] = convAckMessages;
834 });
835 })
836 );
837 }
838
839 _omitPeerId(value) {
840 internal(this).peerIdOmittable = value;
841 }
842
843 _send(cmd, ...args) {
844 const command = cmd;
845 if (!internal(this).peerIdOmittable && this.id) {
846 command.peerId = this.id;
847 }
848 return this._connection.send(command, ...args);
849 }
850
851 async _open(appId, tag, deviceId, isReconnect = false) {
852 this._debug('open session');
853 const {
854 lastUnreadNotifTime,
855 lastPatchTime,
856 lastNotificationTime,
857 } = internal(this);
858 const command = new GenericCommand({
859 cmd: 'session',
860 op: 'open',
861 appId,
862 peerId: this.id,
863 sessionMessage: new SessionCommand({
864 ua: `js/${VERSION}`,
865 r: isReconnect,
866 lastUnreadNotifTime,
867 lastPatchTime,
868 configBitmap,
869 }),
870 });
871 if (!isReconnect) {
872 Object.assign(
873 command.sessionMessage,
874 trim({
875 tag,
876 deviceId,
877 })
878 );
879 if (this.options.signatureFactory) {
880 const signatureResult = await runSignatureFactory(
881 this.options.signatureFactory,
882 [this._identity]
883 );
884 Object.assign(
885 command.sessionMessage,
886 keyRemap(
887 {
888 signature: 's',
889 timestamp: 't',
890 nonce: 'n',
891 },
892 signatureResult
893 )
894 );
895 }
896 } else {
897 const sessionToken = await this._sessionManager.getSessionToken({
898 autoRefresh: false,
899 });
900 if (sessionToken && sessionToken !== Expirable.EXPIRED) {
901 Object.assign(command.sessionMessage, {
902 st: sessionToken,
903 });
904 }
905 }
906 let resCommand;
907 try {
908 resCommand = await this._send(command);
909 } catch (error) {
910 if (error.code === ErrorCode.SESSION_TOKEN_EXPIRED) {
911 if (!this._sessionManager) {
912 // let it fail if sessoinToken not cached but command rejected as token expired
913 // to prevent session openning flood
914 throw new Error('Unexpected session expiration');
915 }
916 debug('Session token expired, reopening');
917 this._sessionManager.revoke();
918 return this._open(appId, tag, deviceId, isReconnect);
919 }
920 throw error;
921 }
922 const {
923 peerId,
924 sessionMessage,
925 sessionMessage: { st: token, stTtl: tokenTTL, code },
926 serverTs,
927 } = resCommand;
928 if (code) {
929 throw createError(sessionMessage);
930 }
931 if (peerId) {
932 this.id = peerId;
933 if (!this._identity) this._identity = peerId;
934 if (token) {
935 this._sessionManager =
936 this._sessionManager || this._createSessionManager();
937 this._sessionManager.setSessionToken(token, tokenTTL);
938 }
939 const serverTime = getTime(decodeDate(serverTs));
940 if (serverTs) {
941 internal(this).lastPatchTime = serverTime;
942 }
943 if (lastNotificationTime) {
944 // Do not await for it as this is failable
945 this._syncNotifications(lastNotificationTime).catch(error =>
946 console.warn('Syncing notifications failed:', error)
947 );
948 } else {
949 // Set timestamp to now for next reconnection
950 internal(this).lastNotificationTime = serverTime;
951 }
952 } else {
953 console.warn('Unexpected session opened without peerId.');
954 }
955 return undefined;
956 }
957
958 async _syncNotifications(timestamp) {
959 const { hasMore, notifications } = await this._fetchNotifications(
960 timestamp
961 );
962 notifications.forEach(notification => {
963 const { cmd, op, serverTs, notificationType, ...payload } = notification;
964 this._dispatchCommand({
965 cmd: CommandType[cmd],
966 op: OpType[op],
967 serverTs,
968 notificationType,
969 [`${cmd}Message`]: payload,
970 });
971 });
972 if (hasMore) {
973 return this._syncNotifications(internal(this).lastNotificationTime);
974 }
975 return undefined;
976 }
977
978 async _fetchNotifications(timestamp) {
979 return this._requestWithSessionToken({
980 method: 'GET',
981 path: '/rtm/notifications',
982 query: {
983 start_ts: timestamp,
984 notification_type: 'permanent',
985 },
986 });
987 }
988
989 _createSessionManager() {
990 debug('create SessionManager');
991 return new SessionManager({
992 onBeforeGetSessionToken: this._connection.checkConnectionAvailability.bind(
993 this._connection
994 ),
995 refresh: (manager, expiredSessionToken) =>
996 manager.setSessionTokenAsync(
997 Promise.resolve(
998 new GenericCommand({
999 cmd: 'session',
1000 op: 'refresh',
1001 sessionMessage: new SessionCommand({
1002 ua: `js/${VERSION}`,
1003 st: expiredSessionToken,
1004 }),
1005 })
1006 )
1007 .then(async command => {
1008 if (this.options.signatureFactory) {
1009 const signatureResult = await runSignatureFactory(
1010 this.options.signatureFactory,
1011 [this._identity]
1012 );
1013 Object.assign(
1014 command.sessionMessage,
1015 keyRemap(
1016 {
1017 signature: 's',
1018 timestamp: 't',
1019 nonce: 'n',
1020 },
1021 signatureResult
1022 )
1023 );
1024 }
1025 return command;
1026 })
1027 .then(this._send.bind(this))
1028 .then(({ sessionMessage: { st: token, stTtl: ttl } }) => [
1029 token,
1030 ttl,
1031 ])
1032 ),
1033 });
1034 }
1035
1036 async _requestWithSessionToken({ headers, query, ...params }) {
1037 const sessionToken = await this._sessionManager.getSessionToken();
1038 return this._request({
1039 headers: {
1040 'X-LC-IM-Session-Token': sessionToken,
1041 ...headers,
1042 },
1043 query: {
1044 client_id: this.id,
1045 ...query,
1046 },
1047 ...params,
1048 });
1049 }
1050
1051 /**
1052 * 关闭客户端
1053 * @return {Promise}
1054 */
1055 async close() {
1056 this._debug('close session');
1057 const _ee = internal(this)._eventemitter;
1058 _ee.emit('beforeclose');
1059 if (this._connection.is('connected')) {
1060 const command = new GenericCommand({
1061 cmd: 'session',
1062 op: 'close',
1063 });
1064 await this._send(command);
1065 }
1066 _ee.emit('close');
1067 this.emit(CLOSE, {
1068 code: 0,
1069 });
1070 }
1071
1072 /**
1073 * 获取 client 列表中在线的 client,每次查询最多 20 个 clientId,超出部分会被忽略
1074 * @param {String[]} clientIds 要查询的 client ids
1075 * @return {Primse.<String[]>} 在线的 client ids
1076 */
1077 async ping(clientIds) {
1078 this._debug('ping');
1079 if (!(clientIds instanceof Array)) {
1080 throw new TypeError(`clientIds ${clientIds} is not an Array`);
1081 }
1082 if (!clientIds.length) {
1083 return Promise.resolve([]);
1084 }
1085 const command = new GenericCommand({
1086 cmd: 'session',
1087 op: 'query',
1088 sessionMessage: new SessionCommand({
1089 sessionPeerIds: clientIds,
1090 }),
1091 });
1092 const resCommand = await this._send(command);
1093 return resCommand.sessionMessage.onlineSessionPeerIds;
1094 }
1095
1096 /**
1097 * 获取某个特定的对话
1098 * @param {String} id 对话 id,对应 _Conversation 表中的 objectId
1099 * @param {Boolean} [noCache=false] 强制不从缓存中获取
1100 * @return {Promise.<ConversationBase>} 如果 id 对应的对话不存在则返回 null
1101 */
1102 async getConversation(id, noCache = false) {
1103 if (typeof id !== 'string') {
1104 throw new TypeError(`${id} is not a String`);
1105 }
1106 if (!noCache) {
1107 const cachedConversation = this._conversationCache.get(id);
1108 if (cachedConversation) {
1109 return cachedConversation;
1110 }
1111 }
1112 if (isTemporaryConversatrionId(id)) {
1113 return (await this._getTemporaryConversations([id]))[0] || null;
1114 }
1115 return this.getQuery()
1116 .equalTo('objectId', id)
1117 .find()
1118 .then(conversations => conversations[0] || null);
1119 }
1120
1121 /**
1122 * 通过 id 批量获取某个特定的对话
1123 * @since 3.4.0
1124 * @param {String[]} ids 对话 id 列表,对应 _Conversation 表中的 objectId
1125 * @param {Boolean} [noCache=false] 强制不从缓存中获取
1126 * @return {Promise.<ConversationBase[]>} 如果 id 对应的对话不存在则返回 null
1127 */
1128 async getConversations(ids, noCache = false) {
1129 const remoteConversationIds = noCache
1130 ? ids
1131 : ids.filter(id => this._conversationCache.get(id) === null);
1132 if (remoteConversationIds.length) {
1133 const remoteTemporaryConversationIds = remove(
1134 remoteConversationIds,
1135 isTemporaryConversatrionId
1136 );
1137 const query = [];
1138 if (remoteConversationIds.length) {
1139 query.push(
1140 this.getQuery()
1141 .containedIn('objectId', remoteConversationIds)
1142 .limit(999)
1143 .find()
1144 );
1145 }
1146 if (remoteTemporaryConversationIds.length) {
1147 const remoteTemporaryConversationsPromise = remoteTemporaryConversationIds.map(
1148 this._getTemporaryConversations.bind(this)
1149 );
1150 query.push(...remoteTemporaryConversationsPromise);
1151 }
1152 await Promise.all(query);
1153 }
1154 return ids.map(id => this._conversationCache.get(id));
1155 }
1156
1157 async _getTemporaryConversations(ids) {
1158 const command = new GenericCommand({
1159 cmd: 'conv',
1160 op: 'query',
1161 convMessage: new ConvCommand({
1162 tempConvIds: ids,
1163 }),
1164 });
1165 const resCommand = await this._send(command);
1166 return this._handleQueryResults(resCommand);
1167 }
1168
1169 /**
1170 * 构造一个 ConversationQuery 来查询对话
1171 * @return {ConversationQuery.<PersistentConversation>}
1172 */
1173 getQuery() {
1174 return new ConversationQuery(this);
1175 }
1176
1177 /**
1178 * 构造一个 ConversationQuery 来查询聊天室
1179 * @return {ConversationQuery.<ChatRoom>}
1180 */
1181 getChatRoomQuery() {
1182 return this.getQuery().equalTo('tr', true);
1183 }
1184
1185 /**
1186 * 构造一个 ConversationQuery 来查询服务号
1187 * @return {ConversationQuery.<ServiceConversation>}
1188 */
1189 getServiceConversationQuery() {
1190 return this.getQuery().equalTo('sys', true);
1191 }
1192
1193 async _executeQuery(query) {
1194 const queryJSON = query.toJSON();
1195 queryJSON.where = new JsonObjectMessage({
1196 data: JSON.stringify(encode(queryJSON.where)),
1197 });
1198 const command = new GenericCommand({
1199 cmd: 'conv',
1200 op: 'query',
1201 convMessage: new ConvCommand(queryJSON),
1202 });
1203 const resCommand = await this._send(command);
1204 return this._handleQueryResults(resCommand);
1205 }
1206
1207 async _handleQueryResults(resCommand) {
1208 let conversations;
1209 try {
1210 conversations = decode(JSON.parse(resCommand.convMessage.results.data));
1211 } catch (error) {
1212 const commandString = JSON.stringify(trim(resCommand));
1213 throw new Error(
1214 `Parse query result failed: ${error.message}. Command: ${commandString}`
1215 );
1216 }
1217 conversations = await Promise.all(
1218 conversations.map(this._parseConversationFromRawData.bind(this))
1219 );
1220 return conversations.map(this._upsertConversationToCache.bind(this));
1221 }
1222
1223 _upsertConversationToCache(fetchedConversation) {
1224 let conversation = this._conversationCache.get(fetchedConversation.id);
1225 if (!conversation) {
1226 conversation = fetchedConversation;
1227 this._debug('no match, set cache');
1228 this._conversationCache.set(fetchedConversation.id, fetchedConversation);
1229 } else {
1230 this._debug('update cached conversation');
1231 [
1232 'creator',
1233 'createdAt',
1234 'updatedAt',
1235 'lastMessageAt',
1236 'lastMessage',
1237 'mutedMembers',
1238 'members',
1239 '_attributes',
1240 'transient',
1241 'muted',
1242 ].forEach(key => {
1243 const value = fetchedConversation[key];
1244 if (value !== undefined) conversation[key] = value;
1245 });
1246 if (conversation._reset) conversation._reset();
1247 }
1248 return conversation;
1249 }
1250
1251 /**
1252 * 反序列化消息,与 {@link Message#toFullJSON} 相对。
1253 * @param {Object}
1254 * @return {AVMessage} 解析后的消息
1255 * @since 4.0.0
1256 */
1257 async parseMessage({ data, bin = false, ...properties }) {
1258 const content = bin ? decodeBase64(data) : data;
1259 const message = await this._messageParser.parse(content);
1260 Object.assign(message, properties);
1261 message._updateMentioned(this.id);
1262 return message;
1263 }
1264
1265 /**
1266 * 反序列化对话,与 {@link Conversation#toFullJSON} 相对。
1267 * @param {Object}
1268 * @return {ConversationBase} 解析后的对话
1269 * @since 4.0.0
1270 */
1271 async parseConversation({
1272 id,
1273 lastMessageAt,
1274 lastMessage,
1275 lastDeliveredAt,
1276 lastReadAt,
1277 unreadMessagesCount,
1278 members,
1279 mentioned,
1280 ...properties
1281 }) {
1282 const conversationData = {
1283 id,
1284 lastMessageAt,
1285 lastMessage,
1286 lastDeliveredAt,
1287 lastReadAt,
1288 unreadMessagesCount,
1289 members,
1290 mentioned,
1291 };
1292 if (lastMessage) {
1293 conversationData.lastMessage = await this.parseMessage(lastMessage);
1294 conversationData.lastMessage._setStatus(MessageStatus.SENT);
1295 }
1296 const { transient, system, expiredAt } = properties;
1297 if (transient) return new ChatRoom(conversationData, properties, this);
1298 if (system)
1299 return new ServiceConversation(conversationData, properties, this);
1300 if (expiredAt || isTemporaryConversatrionId(id)) {
1301 return new TemporaryConversation(conversationData, { expiredAt }, this);
1302 }
1303 return new Conversation(conversationData, properties, this);
1304 }
1305
1306 async _parseConversationFromRawData(rawData) {
1307 const data = keyRemap(
1308 {
1309 objectId: 'id',
1310 lm: 'lastMessageAt',
1311 m: 'members',
1312 tr: 'transient',
1313 sys: 'system',
1314 c: 'creator',
1315 mu: 'mutedMembers',
1316 },
1317 rawData
1318 );
1319 if (data.msg) {
1320 data.lastMessage = {
1321 data: data.msg,
1322 bin: data.bin,
1323 from: data.msg_from,
1324 id: data.msg_mid,
1325 timestamp: data.msg_timestamp,
1326 updatedAt: data.patch_timestamp,
1327 };
1328 delete data.lastMessageFrom;
1329 delete data.lastMessageId;
1330 delete data.lastMessageTimestamp;
1331 delete data.lastMessagePatchTimestamp;
1332 }
1333 const { ttl } = data;
1334 if (ttl) data.expiredAt = Date.now() + ttl * 1000;
1335 return this.parseConversation(data);
1336 }
1337
1338 /**
1339 * 创建一个对话
1340 * @param {Object} options 除了下列字段外的其他字段将被视为对话的自定义属性
1341 * @param {String[]} options.members 对话的初始成员列表,默认包含当前 client
1342 * @param {String} [options.name] 对话的名字
1343 * @param {Boolean} [options.unique=true] 唯一对话,当其为 true 时,如果当前已经有相同成员的对话存在则返回该对话,否则会创建新的对话
1344 * @return {Promise.<Conversation>}
1345 */
1346 async createConversation({
1347 members: m,
1348 name,
1349 transient,
1350 unique = true,
1351 _tempConv: tempConv,
1352 _tempConvTTL: tempConvTTL,
1353 ...properties
1354 } = {}) {
1355 if (!(transient || Array.isArray(m))) {
1356 throw new TypeError(`conversation members ${m} is not an array`);
1357 }
1358 let members = new Set(m);
1359 members.add(this.id);
1360 members = Array.from(members).sort();
1361 let attr = properties || {};
1362 if (name) {
1363 if (typeof name !== 'string') {
1364 throw new TypeError(`conversation name ${name} is not a string`);
1365 }
1366 attr.name = name;
1367 }
1368 attr = new JsonObjectMessage({
1369 data: JSON.stringify(encode(attr)),
1370 });
1371
1372 const startCommandJson = {
1373 m: members,
1374 attr,
1375 transient,
1376 unique,
1377 tempConv,
1378 tempConvTTL,
1379 };
1380
1381 const command = new GenericCommand({
1382 cmd: 'conv',
1383 op: 'start',
1384 convMessage: new ConvCommand(startCommandJson),
1385 });
1386
1387 if (this.options.conversationSignatureFactory) {
1388 const params = [null, this._identity, members, 'create'];
1389 const signatureResult = await runSignatureFactory(
1390 this.options.conversationSignatureFactory,
1391 params
1392 );
1393 Object.assign(
1394 command.convMessage,
1395 keyRemap(
1396 {
1397 signature: 's',
1398 timestamp: 't',
1399 nonce: 'n',
1400 },
1401 signatureResult
1402 )
1403 );
1404 }
1405
1406 const {
1407 convMessage: { cid, cdate, tempConvTTL: ttl },
1408 } = await this._send(command);
1409 const data = {
1410 name,
1411 transient,
1412 unique,
1413 id: cid,
1414 createdAt: cdate,
1415 updatedAt: cdate,
1416 lastMessageAt: null,
1417 creator: this.id,
1418 members: transient ? [] : members,
1419 ...properties,
1420 };
1421 if (ttl) data.expiredAt = Date.now() + ttl * 1000;
1422 const conversation = await this.parseConversation(data);
1423 return this._upsertConversationToCache(conversation);
1424 }
1425
1426 /**
1427 * 创建一个聊天室
1428 * @since 4.0.0
1429 * @param {Object} options 除了下列字段外的其他字段将被视为对话的自定义属性
1430 * @param {String} [options.name] 对话的名字
1431 * @return {Promise.<ChatRoom>}
1432 */
1433 async createChatRoom(param) {
1434 return this.createConversation({
1435 ...param,
1436 transient: true,
1437 members: null,
1438 unique: false,
1439 _tempConv: false,
1440 });
1441 }
1442
1443 /**
1444 * 创建一个临时对话
1445 * @since 4.0.0
1446 * @param {Object} options
1447 * @param {String[]} options.members 对话的初始成员列表,默认包含当前 client
1448 * @param {String} [options.ttl] 对话存在时间,单位为秒,最大值与默认值均为 86400(一天),过期后该对话不再可用。
1449 * @return {Promise.<TemporaryConversation>}
1450 */
1451 async createTemporaryConversation({ ttl: _tempConvTTL, ...param }) {
1452 return this.createConversation({
1453 ...param,
1454 transient: false,
1455 unique: false,
1456 _tempConv: true,
1457 _tempConvTTL,
1458 });
1459 }
1460
1461 // jsdoc-ignore-start
1462 @throttle(1000)
1463 // jsdoc-ignore-end
1464 _doSendRead() {
1465 // if not connected, just skip everything
1466 if (!this._connection.is('connected')) return;
1467 const buffer = internal(this).readConversationsBuffer;
1468 const conversations = Array.from(buffer);
1469 if (!conversations.length) return;
1470 const ids = conversations.map(conversation => {
1471 if (!(conversation instanceof ConversationBase)) {
1472 throw new TypeError(`${conversation} is not a Conversation`);
1473 }
1474 return conversation.id;
1475 });
1476 this._debug(`mark [${ids}] as read`);
1477 buffer.clear();
1478 this._sendReadCommand(conversations).catch(error => {
1479 this._debug('send read failed: %O', error);
1480 conversations.forEach(buffer.add.bind(buffer));
1481 });
1482 }
1483
1484 _sendReadCommand(conversations) {
1485 return this._send(
1486 new GenericCommand({
1487 cmd: 'read',
1488 readMessage: new ReadCommand({
1489 convs: conversations.map(
1490 conversation =>
1491 new ReadTuple({
1492 cid: conversation.id,
1493 mid:
1494 conversation.lastMessage &&
1495 conversation.lastMessage.from !== this.id
1496 ? conversation.lastMessage.id
1497 : undefined,
1498 timestamp: (conversation.lastMessageAt || new Date()).getTime(),
1499 })
1500 ),
1501 }),
1502 }),
1503 false
1504 );
1505 }
1506}
1507
1508/**
1509 * 修改、撤回消息的原因
1510 * @typedef PatchReason
1511 * @type {Object}
1512 * @property {number} code 负数为内置 code,正数为开发者在 hook 中自定义的 code。比如因为敏感词过滤被修改的 code 为 -4408。
1513 * @property {string} [detail] 具体的原因说明。
1514 */