1 | 'use strict'
|
2 |
|
3 | const Promise = require('bluebird')
|
4 | const _ = require('lodash')
|
5 | const eventToPromise = require('event-to-promise')
|
6 | const { asyncLimit } = require('./utils')
|
7 |
|
8 | const co = Promise.coroutine
|
9 |
|
10 |
|
11 | class UserAssociations {
|
12 |
|
13 | constructor (props) {
|
14 | _.defaults(this, props)
|
15 | }
|
16 |
|
17 | userJoinRoomReport (userName, roomName) {
|
18 | this.transport.emitToChannel(roomName, 'roomUserJoined', roomName, userName)
|
19 | }
|
20 |
|
21 | userLeftRoomReport (userName, roomName, enableUserlistUpdates) {
|
22 | if (enableUserlistUpdates) {
|
23 | this.transport.emitToChannel(roomName, 'roomUserLeft', roomName, userName)
|
24 | }
|
25 | }
|
26 |
|
27 | userRemovedReport (userName, roomName, enableUserlistUpdates) {
|
28 | let cn = this.echoChannel
|
29 | this.transport.emitToChannel(cn, 'roomAccessRemoved', roomName)
|
30 | this.userLeftRoomReport(userName, roomName, enableUserlistUpdates)
|
31 | }
|
32 |
|
33 | socketJoinEcho (id, roomName, njoined, isLocalCall) {
|
34 | if (isLocalCall) {
|
35 | this.transport.emitToChannel(
|
36 | this.echoChannel, 'roomJoinedEcho', roomName, id, njoined)
|
37 | } else {
|
38 | this.transport.sendToChannel(
|
39 | id, this.echoChannel, 'roomJoinedEcho', roomName, id, njoined)
|
40 | }
|
41 | }
|
42 |
|
43 | socketLeftEcho (id, roomName, njoined, isLocalCall) {
|
44 | if (isLocalCall) {
|
45 | this.transport.emitToChannel(
|
46 | this.echoChannel, 'roomLeftEcho', roomName, id, njoined)
|
47 | } else {
|
48 | this.transport.sendToChannel(
|
49 | id, this.echoChannel, 'roomLeftEcho', roomName, id, njoined)
|
50 | }
|
51 | }
|
52 |
|
53 | socketConnectEcho (id, nconnected) {
|
54 | this.transport.sendToChannel(
|
55 | id, this.echoChannel, 'socketConnectEcho', id, nconnected)
|
56 | }
|
57 |
|
58 | socketDisconnectEcho (id, nconnected) {
|
59 | this.transport.sendToChannel(
|
60 | id, this.echoChannel, 'socketDisconnectEcho', id, nconnected)
|
61 | }
|
62 |
|
63 | leaveChannel (id, channel) {
|
64 | return this.transport.leaveChannel(id, channel).catch(e => {
|
65 | let info = { roomName: channel, id, opType: 'transportChannel' }
|
66 | return this.consistencyFailure(e, info)
|
67 | })
|
68 | }
|
69 |
|
70 | socketLeaveChannels (id, channels) {
|
71 | return Promise.map(
|
72 | channels,
|
73 | channel => this.leaveChannel(id, channel),
|
74 | { concurrency: asyncLimit })
|
75 | }
|
76 |
|
77 | leaveChannelMessage (id, channel) {
|
78 | let bus = this.clusterBus
|
79 | return Promise.try(() => {
|
80 | bus.emit('roomLeaveSocket', id, channel)
|
81 | return eventToPromise(bus, bus.makeSocketRoomLeftName(id, channel))
|
82 | }).timeout(this.busAckTimeout).catchReturn()
|
83 | }
|
84 |
|
85 | channelLeaveSockets (channel, ids) {
|
86 | return Promise.map(
|
87 | ids,
|
88 | id => this.leaveChannelMessage(id, channel),
|
89 | { concurrency: asyncLimit })
|
90 | }
|
91 |
|
92 | rollbackRoomJoin (error, roomName, id) {
|
93 | return this.userState.removeSocketFromRoom(id, roomName).catch(e => {
|
94 | this.consistencyFailure(e, { roomName, opType: 'userRooms' })
|
95 | return [1]
|
96 | }).spread(njoined => {
|
97 | if (!njoined) {
|
98 | return this.leaveRoom(roomName)
|
99 | } else {
|
100 | return Promise.resolve()
|
101 | }
|
102 | }).thenThrow(error)
|
103 | }
|
104 |
|
105 | leaveRoom (roomName) {
|
106 | return Promise
|
107 | .try(() => this.state.getRoom(roomName))
|
108 | .then(room => room.leave(this.userName)
|
109 | .catch(error => this.consistencyFailure(
|
110 | error, {roomName, opType: 'roomUserlist'})))
|
111 | .catchReturn()
|
112 | }
|
113 |
|
114 | getNotifySettings (roomName) {
|
115 | return Promise
|
116 | .try(() => this.state.getRoom(roomName))
|
117 | .then(room => room.getNotificationsInfo(null, true))
|
118 | .catchReturn({})
|
119 | }
|
120 |
|
121 | joinSocketToRoom (id, roomName, isLocalCall) {
|
122 | let lock = this.userState.lockToRoom(roomName, this.lockTTL)
|
123 | return Promise.using(lock, co(function * () {
|
124 | let room = yield this.state.getRoom(roomName)
|
125 | yield room.join(this.userName)
|
126 | let enableUserlistUpdates = yield room.roomState.userlistUpdatesGet()
|
127 | return this.userState.addSocketToRoom(id, roomName).then(njoined => {
|
128 | return this.transport.joinChannel(id, roomName).then(() => {
|
129 | if (njoined === 1 && enableUserlistUpdates) {
|
130 | this.userJoinRoomReport(this.userName, roomName)
|
131 | }
|
132 | return this.socketJoinEcho(id, roomName, njoined, isLocalCall)
|
133 | }).return(njoined)
|
134 | }).catch(e => this.rollbackRoomJoin(e, roomName, id))
|
135 | }).bind(this))
|
136 | }
|
137 |
|
138 | leaveSocketFromRoom (id, roomName, isLocalCall) {
|
139 | let lock = this.userState.lockToRoom(roomName, this.lockTTL)
|
140 | return Promise.using(lock, co(function * () {
|
141 | let [njoined, hasChanged] =
|
142 | yield this.userState.removeSocketFromRoom(id, roomName)
|
143 | yield this.leaveChannel(id, roomName)
|
144 | if (njoined === 0) {
|
145 | yield this.leaveRoom(roomName)
|
146 | }
|
147 | if (hasChanged) {
|
148 | let { enableUserlistUpdates } = yield this.getNotifySettings(roomName)
|
149 | this.socketLeftEcho(id, roomName, njoined, isLocalCall)
|
150 | this.userLeftRoomReport(this.userName, roomName, enableUserlistUpdates)
|
151 | }
|
152 | return Promise.resolve(njoined)
|
153 | }).bind(this))
|
154 | }
|
155 |
|
156 | removeUserSocket (id) {
|
157 | return this.userState.removeSocket(id)
|
158 | .spread((roomsRemoved, joinedSockets, nconnected) => {
|
159 | roomsRemoved = roomsRemoved || []
|
160 | joinedSockets = joinedSockets || []
|
161 | nconnected = nconnected || 0
|
162 | return this.socketLeaveChannels(id, roomsRemoved).then(() => {
|
163 | return Promise.map(
|
164 | roomsRemoved,
|
165 | (roomName, idx) => {
|
166 | let njoined = joinedSockets[idx]
|
167 | this.socketLeftEcho(id, roomName, njoined)
|
168 | if (njoined) { return Promise.resolve() }
|
169 | return this.leaveRoom(roomName)
|
170 | .then(() => this.getNotifySettings(roomName))
|
171 | .then(({enableUserlistUpdates}) => this.userLeftRoomReport(
|
172 | this.userName, roomName, enableUserlistUpdates))
|
173 | },
|
174 | { concurrency: asyncLimit })
|
175 | .then(() => this.socketDisconnectEcho(id, nconnected))
|
176 | })
|
177 | }).then(() => this.state.removeSocket(id))
|
178 | }
|
179 |
|
180 | removeSocketFromServer (id) {
|
181 | return this.removeUserSocket(id).catch(e => {
|
182 | let info = { id, opType: 'userSockets' }
|
183 | return this.consistencyFailure(e, info)
|
184 | })
|
185 | }
|
186 |
|
187 | removeUserSocketsFromRoom (roomName) {
|
188 | return this.userState.removeAllSocketsFromRoom(roomName).catch(e => {
|
189 | let info = { roomName, opType: 'roomUserlist' }
|
190 | return this.consistencyFailure(e, info)
|
191 | })
|
192 | }
|
193 |
|
194 | removeFromRoom (roomName) {
|
195 | let lock = this.userState.lockToRoom(roomName, this.lockTTL)
|
196 | return Promise.using(lock, co(function * () {
|
197 | let removedSockets = yield this.removeUserSocketsFromRoom(roomName)
|
198 | removedSockets = removedSockets || []
|
199 | yield this.channelLeaveSockets(roomName, removedSockets)
|
200 | if (removedSockets.length) {
|
201 | let { enableUserlistUpdates } = yield this.getNotifySettings(roomName)
|
202 | this.userRemovedReport(this.userName, roomName, enableUserlistUpdates)
|
203 | }
|
204 | return this.leaveRoom(roomName)
|
205 | }).bind(this))
|
206 | }
|
207 |
|
208 | removeRoomUsers (roomName, userNames) {
|
209 | userNames = userNames || []
|
210 | return Promise.map(
|
211 | userNames,
|
212 | userName => {
|
213 | return this.state.getUser(userName)
|
214 | .then(user => user.removeFromRoom(roomName))
|
215 | .catchReturn()
|
216 | },
|
217 | { concurrency: asyncLimit })
|
218 | }
|
219 |
|
220 | }
|
221 |
|
222 | module.exports = UserAssociations
|