UNPKG

7.31 kB · JavaScript · View Raw
1'use strict'
2
3const Promise = require('bluebird')
4const _ = require('lodash')
5const eventToPromise = require('event-to-promise')
6const { asyncLimit } = require('./utils')
7
// Shorthand for bluebird's `Promise.coroutine` (`Promise` is the bluebird
// module required above); used below to wrap generator functions.
8const co = Promise.coroutine
9
/**
 * Associations for the user class: methods that connect a user's sockets
 * with rooms and transport channels.
 *
 * Instance fields are copied from `props` by the constructor. The methods
 * read `transport`, `echoChannel`, `clusterBus`, `busAckTimeout`,
 * `userState`, `state`, `userName` and `lockTTL`, and call
 * `consistencyFailure(error, info)`, which is not defined in this class and
 * must be available on the instance.
 */
class UserAssociations {
  /**
   * @param {Object} props Values copied onto the instance with `_.defaults`.
   */
  constructor (props) {
    _.defaults(this, props)
  }

  /**
   * Emits `roomUserJoined` (roomName, userName) to the room channel.
   */
  userJoinRoomReport (userName, roomName) {
    const { transport } = this
    transport.emitToChannel(roomName, 'roomUserJoined', roomName, userName)
  }

  /**
   * Emits `roomUserLeft` (roomName, userName) to the room channel, but only
   * when `enableUserlistUpdates` is truthy.
   */
  userLeftRoomReport (userName, roomName, enableUserlistUpdates) {
    if (!enableUserlistUpdates) {
      return
    }
    this.transport.emitToChannel(roomName, 'roomUserLeft', roomName, userName)
  }

  /**
   * Emits `roomAccessRemoved` (roomName) to the echo channel and then
   * delegates to `userLeftRoomReport`.
   */
  userRemovedReport (userName, roomName, enableUserlistUpdates) {
    this.transport.emitToChannel(
      this.echoChannel, 'roomAccessRemoved', roomName)
    this.userLeftRoomReport(userName, roomName, enableUserlistUpdates)
  }

  /**
   * Publishes `roomJoinedEcho` (roomName, id, njoined) on the echo channel.
   * Uses `transport.emitToChannel` when `isLocalCall` is truthy, otherwise
   * `transport.sendToChannel` with the socket id as its first argument.
   */
  socketJoinEcho (id, roomName, njoined, isLocalCall) {
    const echo = [this.echoChannel, 'roomJoinedEcho', roomName, id, njoined]
    if (!isLocalCall) {
      this.transport.sendToChannel(id, ...echo)
      return
    }
    this.transport.emitToChannel(...echo)
  }

  /**
   * Publishes `roomLeftEcho` (roomName, id, njoined) on the echo channel.
   * Uses `transport.emitToChannel` when `isLocalCall` is truthy, otherwise
   * `transport.sendToChannel` with the socket id as its first argument.
   */
  socketLeftEcho (id, roomName, njoined, isLocalCall) {
    const echo = [this.echoChannel, 'roomLeftEcho', roomName, id, njoined]
    if (!isLocalCall) {
      this.transport.sendToChannel(id, ...echo)
      return
    }
    this.transport.emitToChannel(...echo)
  }

  /**
   * Sends `socketConnectEcho` (id, nconnected) on the echo channel via
   * `transport.sendToChannel`.
   */
  socketConnectEcho (id, nconnected) {
    const { transport, echoChannel } = this
    transport.sendToChannel(
      id, echoChannel, 'socketConnectEcho', id, nconnected)
  }

  /**
   * Sends `socketDisconnectEcho` (id, nconnected) on the echo channel via
   * `transport.sendToChannel`.
   */
  socketDisconnectEcho (id, nconnected) {
    const { transport, echoChannel } = this
    transport.sendToChannel(
      id, echoChannel, 'socketDisconnectEcho', id, nconnected)
  }

  /**
   * Removes socket `id` from a transport channel. A rejection is passed to
   * `consistencyFailure` with `opType: 'transportChannel'`, and whatever that
   * call returns becomes the result.
   *
   * @returns {Promise}
   */
  leaveChannel (id, channel) {
    const reportFailure = error => this.consistencyFailure(
      error, { roomName: channel, id, opType: 'transportChannel' })
    return this.transport.leaveChannel(id, channel).catch(reportFailure)
  }

  /**
   * Runs `leaveChannel` for every channel in `channels`, with concurrency
   * limited to `asyncLimit`.
   *
   * @returns {Promise<Array>}
   */
  socketLeaveChannels (id, channels) {
    const options = { concurrency: asyncLimit }
    return Promise.map(
      channels, channelName => this.leaveChannel(id, channelName), options)
  }

  /**
   * Emits `roomLeaveSocket` (id, channel) on the cluster bus and waits for
   * the event named by `bus.makeSocketRoomLeftName(id, channel)`. The wait is
   * capped by `busAckTimeout`; any failure, including the timeout, is
   * swallowed by `catchReturn()`.
   *
   * @returns {Promise}
   */
  leaveChannelMessage (id, channel) {
    const bus = this.clusterBus
    const acknowledged = Promise.try(() => {
      bus.emit('roomLeaveSocket', id, channel)
      const ackEvent = bus.makeSocketRoomLeftName(id, channel)
      return eventToPromise(bus, ackEvent)
    })
    return acknowledged.timeout(this.busAckTimeout).catchReturn()
  }

  /**
   * Runs `leaveChannelMessage` for every socket id in `ids`, with concurrency
   * limited to `asyncLimit`.
   *
   * @returns {Promise<Array>}
   */
  channelLeaveSockets (channel, ids) {
    const options = { concurrency: asyncLimit }
    return Promise.map(
      ids, socketId => this.leaveChannelMessage(socketId, channel), options)
  }

  /**
   * Undoes a failed room join and then rethrows `error`.
   *
   * Removes the socket from the room in user state. If that removal itself
   * fails, the failure is reported and the join count is taken to be 1, so
   * the room is not left. The room is left only when the count is falsy.
   *
   * @returns {Promise} Rejects with `error` once the rollback steps succeed.
   */
  rollbackRoomJoin (error, roomName, id) {
    const assumeStillJoined = failure => {
      this.consistencyFailure(failure, { roomName, opType: 'userRooms' })
      return [1]
    }
    const leaveWhenLast = njoined =>
      njoined ? Promise.resolve() : this.leaveRoom(roomName)
    return this.userState.removeSocketFromRoom(id, roomName)
      .catch(assumeStillJoined)
      .spread(leaveWhenLast)
      .thenThrow(error)
  }

  /**
   * Looks up the room and calls `room.leave(userName)`. A failing `leave` is
   * reported with `opType: 'roomUserlist'`; any other failure (for example in
   * the lookup) is swallowed by `catchReturn()`.
   *
   * @returns {Promise}
   */
  leaveRoom (roomName) {
    const reportFailure = error => this.consistencyFailure(
      error, { roomName, opType: 'roomUserlist' })
    const leave = room => room.leave(this.userName).catch(reportFailure)
    return Promise
      .try(() => this.state.getRoom(roomName))
      .then(leave)
      .catchReturn()
  }

  /**
   * Resolves with `room.getNotificationsInfo(null, true)` for the room, or
   * with an empty object when the lookup or the query fails.
   *
   * @returns {Promise<Object>}
   */
  getNotifySettings (roomName) {
    const lookup = Promise.try(() => this.state.getRoom(roomName))
    return lookup
      .then(room => room.getNotificationsInfo(null, true))
      .catchReturn({})
  }

  /**
   * Joins socket `id` to a room while holding the lock obtained from
   * `userState.lockToRoom`.
   *
   * After `room.join`, the socket is added to user state and to the transport
   * channel. `roomUserJoined` is reported only for the first joined socket
   * (`njoined === 1`) and only when userlist updates are enabled; the join
   * echo is always sent. Failures from `addSocketToRoom` onwards go through
   * `rollbackRoomJoin`.
   *
   * @returns {Promise<number>} The `njoined` value from `addSocketToRoom`.
   */
  joinSocketToRoom (id, roomName, isLocalCall) {
    const lock = this.userState.lockToRoom(roomName, this.lockTTL)
    const task = co(function * () {
      const room = yield this.state.getRoom(roomName)
      yield room.join(this.userName)
      const enableUserlistUpdates = yield room.roomState.userlistUpdatesGet()
      const announce = njoined => {
        if (njoined === 1 && enableUserlistUpdates) {
          this.userJoinRoomReport(this.userName, roomName)
        }
        return this.socketJoinEcho(id, roomName, njoined, isLocalCall)
      }
      return this.userState.addSocketToRoom(id, roomName)
        .then(njoined => this.transport.joinChannel(id, roomName)
          .then(() => announce(njoined))
          .return(njoined))
        .catch(failure => this.rollbackRoomJoin(failure, roomName, id))
    }).bind(this)
    return Promise.using(lock, task)
  }

  /**
   * Removes socket `id` from a room while holding the lock obtained from
   * `userState.lockToRoom`.
   *
   * The room is left once no joined sockets remain (`njoined === 0`). The
   * leave echo and the `roomUserLeft` report are sent only when
   * `removeSocketFromRoom` reports a change.
   *
   * @returns {Promise<number>} The `njoined` value from
   *   `removeSocketFromRoom`.
   */
  leaveSocketFromRoom (id, roomName, isLocalCall) {
    const lock = this.userState.lockToRoom(roomName, this.lockTTL)
    const task = co(function * () {
      const [njoined, hasChanged] =
            yield this.userState.removeSocketFromRoom(id, roomName)
      yield this.leaveChannel(id, roomName)
      if (njoined === 0) {
        yield this.leaveRoom(roomName)
      }
      if (hasChanged) {
        const settings = yield this.getNotifySettings(roomName)
        this.socketLeftEcho(id, roomName, njoined, isLocalCall)
        this.userLeftRoomReport(
          this.userName, roomName, settings.enableUserlistUpdates)
      }
      return Promise.resolve(njoined)
    }).bind(this)
    return Promise.using(lock, task)
  }

  /**
   * Removes socket `id` from user state and cleans up after it.
   *
   * The socket leaves the transport channels of the removed rooms, then each
   * room gets a leave echo. A room whose `joinedSockets` entry is falsy is
   * also left and reported via `userLeftRoomReport`. Finally the disconnect
   * echo is sent and `state.removeSocket(id)` is called.
   *
   * @returns {Promise}
   */
  removeUserSocket (id) {
    const cleanUp = (roomsRemoved, joinedSockets, nconnected) => {
      const rooms = roomsRemoved || []
      const counts = joinedSockets || []
      const remaining = nconnected || 0
      const settleRoom = (roomName, idx) => {
        const njoined = counts[idx]
        this.socketLeftEcho(id, roomName, njoined)
        if (njoined) { return Promise.resolve() }
        return this.leaveRoom(roomName)
          .then(() => this.getNotifySettings(roomName))
          .then(settings => this.userLeftRoomReport(
            this.userName, roomName, settings.enableUserlistUpdates))
      }
      return this.socketLeaveChannels(id, rooms)
        .then(() => Promise.map(
          rooms, settleRoom, { concurrency: asyncLimit }))
        .then(() => this.socketDisconnectEcho(id, remaining))
    }
    return this.userState.removeSocket(id)
      .spread(cleanUp)
      .then(() => this.state.removeSocket(id))
  }

  /**
   * Runs `removeUserSocket` and passes any failure to `consistencyFailure`
   * with `opType: 'userSockets'`.
   *
   * @returns {Promise}
   */
  removeSocketFromServer (id) {
    const reportFailure = error => this.consistencyFailure(
      error, { id, opType: 'userSockets' })
    return this.removeUserSocket(id).catch(reportFailure)
  }

  /**
   * Calls `userState.removeAllSocketsFromRoom` and passes any failure to
   * `consistencyFailure` with `opType: 'roomUserlist'`.
   *
   * @returns {Promise}
   */
  removeUserSocketsFromRoom (roomName) {
    const reportFailure = error => this.consistencyFailure(
      error, { roomName, opType: 'roomUserlist' })
    return this.userState.removeAllSocketsFromRoom(roomName)
      .catch(reportFailure)
  }

  /**
   * Removes all of this user's sockets from a room while holding the lock
   * obtained from `userState.lockToRoom`.
   *
   * Each removed socket is asked over the cluster bus to leave the channel.
   * `userRemovedReport` is sent only when at least one socket was removed.
   * The room is then left.
   *
   * @returns {Promise}
   */
  removeFromRoom (roomName) {
    const lock = this.userState.lockToRoom(roomName, this.lockTTL)
    const task = co(function * () {
      const removedSockets =
            (yield this.removeUserSocketsFromRoom(roomName)) || []
      yield this.channelLeaveSockets(roomName, removedSockets)
      if (removedSockets.length) {
        const settings = yield this.getNotifySettings(roomName)
        this.userRemovedReport(
          this.userName, roomName, settings.enableUserlistUpdates)
      }
      return this.leaveRoom(roomName)
    }).bind(this)
    return Promise.using(lock, task)
  }

  /**
   * Calls `removeFromRoom(roomName)` on every user named in `userNames`
   * (a falsy value counts as an empty list), with concurrency limited to
   * `asyncLimit`. Failures for individual users are swallowed.
   *
   * @returns {Promise<Array>}
   */
  removeRoomUsers (roomName, userNames) {
    const names = userNames || []
    const evict = name => this.state.getUser(name)
      .then(user => user.removeFromRoom(roomName))
      .catchReturn()
    return Promise.map(names, evict, { concurrency: asyncLimit })
  }
}
221
222module.exports = UserAssociations