UNPKG

15.3 kBJavaScriptView Raw
1'use strict'
2
3const ChatServiceError = require('./ChatServiceError')
4const FastMap = require('collections/fast-map')
5const FastSet = require('collections/fast-set')
6const List = require('collections/list')
7const Promise = require('bluebird')
8const Room = require('./Room')
9const User = require('./User')
10const _ = require('lodash')
11const promiseRetry = require('promise-retry')
12const uid = require('uid-safe')
13const { mixin } = require('es6-mixin')
14
15function initState (state, values) {
16 state.clear()
17 if (!values) {
18 return Promise.resolve()
19 } else {
20 return state.addEach(values)
21 }
22}
23
24// Memory lock operations.
25// @mixin
26class LockOperations {
27
28 constructor (locks) {
29 this.locks = locks
30 }
31
32 lock (key, val, ttl) {
33 return promiseRetry(
34 {minTimeout: 100, retries: 10, factor: 1.5, randomize: true},
35 (retry, n) => {
36 if (this.locks.has(key)) {
37 let err = new ChatServiceError('timeout')
38 return retry(err)
39 } else {
40 this.locks.set(key, val)
41 return Promise.resolve()
42 }
43 }
44 )
45 }
46
47 unlock (key, val) {
48 let currentVal = this.locks.get(key)
49 if (currentVal === val) {
50 this.locks.delete(key)
51 }
52 return Promise.resolve()
53 }
54
55}
56
57// Implements state API lists management.
58class ListsStateMemory {
59
60 checkList (listName, num, limit) {
61 if (!this.hasList(listName)) {
62 let error = new ChatServiceError('noList', listName)
63 return Promise.reject(error)
64 }
65 if (listName === 'userlist') {
66 return Promise.resolve()
67 }
68 if (this[listName].length + num > limit) {
69 let error = new ChatServiceError('listLimitExceeded', listName)
70 return Promise.reject(error)
71 } else {
72 return Promise.resolve()
73 }
74 }
75
76 addToList (listName, elems, limit) {
77 let num = elems.length
78 return this.checkList(listName, num, limit).then(() => {
79 this[listName].addEach(elems)
80 return Promise.resolve()
81 })
82 }
83
84 removeFromList (listName, elems) {
85 return this.checkList(listName).then(() => {
86 this[listName].deleteEach(elems)
87 return Promise.resolve()
88 })
89 }
90
91 getList (listName) {
92 return this.checkList(listName).then(() => {
93 let data = this[listName].toArray()
94 return Promise.resolve(data)
95 })
96 }
97
98 hasInList (listName, elem) {
99 return this.checkList(listName).then(() => {
100 let data = this[listName].has(elem)
101 data = Boolean(data)
102 return Promise.resolve(data)
103 })
104 }
105
106 whitelistOnlySet (mode) {
107 this.whitelistOnly = Boolean(mode)
108 return Promise.resolve()
109 }
110
111 whitelistOnlyGet () {
112 return Promise.resolve(this.whitelistOnly)
113 }
114
115}
116
117// Implements room state API.
118class RoomStateMemory extends ListsStateMemory {
119
120 constructor (server, name) {
121 super()
122 this.server = server
123 this.name = name
124 this.historyMaxGetMessages = this.server.historyMaxGetMessages
125 this.historyMaxSize = this.server.historyMaxSize
126 this.whitelist = new FastSet()
127 this.blacklist = new FastSet()
128 this.adminlist = new FastSet()
129 this.userlist = new FastSet()
130 this.messagesHistory = new List()
131 this.messagesTimestamps = new List()
132 this.messagesIds = new List()
133 this.usersseen = new FastMap()
134 this.lastMessageId = 0
135 this.whitelistOnly = false
136 this.owner = null
137 }
138
139 initState (state) {
140 state = state || {}
141 let { whitelist, blacklist, adminlist,
142 whitelistOnly, owner, historyMaxSize,
143 enableAccessListsUpdates = this.server.enableAccessListsUpdates,
144 enableUserlistUpdates = this.server.enableUserlistUpdates
145 } = state
146 initState(this.whitelist, whitelist)
147 initState(this.blacklist, blacklist)
148 initState(this.adminlist, adminlist)
149 initState(this.messagesHistory)
150 initState(this.messagesTimestamps)
151 initState(this.messagesIds)
152 initState(this.usersseen)
153 this.whitelistOnly = Boolean(whitelistOnly)
154 this.enableAccessListsUpdates = Boolean(enableAccessListsUpdates)
155 this.enableUserlistUpdates = Boolean(enableUserlistUpdates)
156 this.owner = owner || null
157 return this.historyMaxSizeSet(historyMaxSize)
158 }
159
160 removeState () {
161 return Promise.resolve()
162 }
163
164 startRemoving () {
165 return Promise.resolve()
166 }
167
168 hasList (listName) {
169 return listName === 'adminlist' || listName === 'whitelist' ||
170 listName === 'blacklist' || listName === 'userlist'
171 }
172
173 ownerGet () {
174 return Promise.resolve(this.owner)
175 }
176
177 ownerSet (owner) {
178 this.owner = owner
179 return Promise.resolve()
180 }
181
182 accessListsUpdatesSet (enableAccessListsUpdates) {
183 this.enableAccessListsUpdates = Boolean(enableAccessListsUpdates)
184 return Promise.resolve()
185 }
186
187 accessListsUpdatesGet () {
188 return Promise.resolve(this.enableAccessListsUpdates)
189 }
190
191 userlistUpdatesSet (enableUserlistUpdates) {
192 this.enableUserlistUpdates = Boolean(enableUserlistUpdates)
193 return Promise.resolve()
194 }
195
196 userlistUpdatesGet () {
197 return Promise.resolve(this.enableUserlistUpdates)
198 }
199
200 historyMaxSizeSet (historyMaxSize) {
201 if (_.isNumber(historyMaxSize) && historyMaxSize >= 0) {
202 this.historyMaxSize = historyMaxSize
203 }
204 let limit = this.historyMaxSize
205 this.messagesHistory = new List(this.messagesHistory.slice(0, limit))
206 this.messagesTimestamps = new List(this.messagesTimestamps.slice(0, limit))
207 this.messagesIds = new List(this.messagesIds.slice(0, limit))
208 return Promise.resolve()
209 }
210
211 historyInfo () {
212 let historySize = this.messagesHistory.length
213 let info = {
214 historySize,
215 historyMaxSize: this.historyMaxSize,
216 historyMaxGetMessages: this.historyMaxGetMessages,
217 lastMessageId: this.lastMessageId
218 }
219 return Promise.resolve(info)
220 }
221
222 getCommonUsers () {
223 let nonWL = this.userlist.difference(this.whitelist)
224 let nonAdmin = nonWL.difference(this.adminlist)
225 let data = nonAdmin.toArray()
226 return Promise.resolve(data)
227 }
228
229 messageAdd (msg) {
230 let timestamp = _.now()
231 this.lastMessageId++
232 let makeResult = () => {
233 msg.timestamp = timestamp
234 msg.id = this.lastMessageId
235 return Promise.resolve(msg)
236 }
237 if (this.historyMaxSize <= 0) {
238 return makeResult()
239 }
240 this.messagesHistory.unshift(msg)
241 this.messagesTimestamps.unshift(timestamp)
242 this.messagesIds.unshift(this.lastMessageId)
243 if (this.messagesHistory.length > this.historyMaxSize) {
244 this.messagesHistory.pop()
245 this.messagesTimestamps.pop()
246 this.messagesIds.pop()
247 }
248 return makeResult()
249 }
250
251 messagesGetRecent () {
252 let msgs = this.messagesHistory.slice(0, this.historyMaxGetMessages)
253 let tss = this.messagesTimestamps.slice(0, this.historyMaxGetMessages)
254 let ids = this.messagesIds.slice(0, this.historyMaxGetMessages)
255 let data = []
256 for (let idx = 0; idx < msgs.length; idx++) {
257 let msg = msgs[idx]
258 let obj = _.cloneDeep(msg)
259 obj.timestamp = tss[idx]
260 obj.id = ids[idx]
261 data[idx] = obj
262 }
263 return Promise.resolve(data)
264 }
265
266 messagesGet (id, maxMessages = this.historyMaxGetMessages) {
267 if (maxMessages <= 0) { return Promise.resolve([]) }
268 id = _.max([0, id])
269 let lastid = this.lastMessageId
270 id = _.min([ id, lastid ])
271 let end = lastid - id
272 let len = _.min([ maxMessages, end ])
273 let start = _.max([ 0, end - len ])
274 let msgs = this.messagesHistory.slice(start, end)
275 let tss = this.messagesTimestamps.slice(start, end)
276 let ids = this.messagesIds.slice(start, end)
277 let data = []
278 for (let idx = 0; idx < msgs.length; idx++) {
279 let msg = msgs[idx]
280 let obj = _.cloneDeep(msg)
281 msg.timestamp = tss[idx]
282 msg.id = ids[idx]
283 data[idx] = obj
284 }
285 return Promise.resolve(msgs)
286 }
287
288 userSeenGet (userName) {
289 let joined = Boolean(this.userlist.get(userName))
290 let timestamp = this.usersseen.get(userName) || null
291 return Promise.resolve({joined, timestamp})
292 }
293
294 userSeenUpdate (userName) {
295 let timestamp = _.now()
296 this.usersseen.set(userName, timestamp)
297 return Promise.resolve()
298 }
299
300}
301
302// Implements direct messaging state API.
303class DirectMessagingStateMemory extends ListsStateMemory {
304
305 constructor (server, userName) {
306 super()
307 this.server = server
308 this.userName = userName
309 this.whitelistOnly = false
310 this.whitelist = new FastSet()
311 this.blacklist = new FastSet()
312 }
313
314 initState ({ whitelist, blacklist, whitelistOnly }) {
315 initState(this.whitelist, whitelist)
316 initState(this.blacklist, blacklist)
317 this.whitelistOnly = Boolean(whitelistOnly)
318 return Promise.resolve()
319 }
320
321 removeState () {
322 return Promise.resolve()
323 }
324
325 hasList (listName) {
326 return listName === 'whitelist' || listName === 'blacklist'
327 }
328
329}
330
331// Implements user state API.
332class UserStateMemory {
333
334 constructor (server, userName) {
335 this.server = server
336 this.userName = userName
337 this.socketsToRooms = new FastMap()
338 this.socketsToInstances = new FastMap()
339 this.roomsToSockets = new FastMap()
340 this.locks = new FastMap()
341 mixin(this, LockOperations, this.locks)
342 }
343
344 addSocket (id, uid) {
345 let roomsset = new FastSet()
346 this.socketsToRooms.set(id, roomsset)
347 this.socketsToInstances.set(id, uid)
348 let nconnected = this.socketsToRooms.length
349 return Promise.resolve(nconnected)
350 }
351
352 getAllSockets () {
353 let sockets = this.socketsToRooms.keysArray()
354 return Promise.resolve(sockets)
355 }
356
357 getSocketsToInstance () {
358 let data = this.socketsToInstances.toObject()
359 return Promise.resolve(data)
360 }
361
362 getRoomToSockets (roomName) {
363 let socketsset = this.roomsToSockets.get(roomName)
364 let data = (socketsset && socketsset.toObject()) || {}
365 return Promise.resolve(data)
366 }
367
368 getSocketsToRooms () {
369 let result = {}
370 let sockets = this.socketsToRooms.keysArray()
371 for (let id of sockets) {
372 let socketsset = this.socketsToRooms.get(id)
373 result[id] = (socketsset && socketsset.toArray()) || []
374 }
375 return Promise.resolve(result)
376 }
377
378 addSocketToRoom (id, roomName) {
379 let roomsset = this.socketsToRooms.get(id)
380 let socketsset = this.roomsToSockets.get(roomName)
381 if (!socketsset) {
382 socketsset = new FastSet()
383 this.roomsToSockets.set(roomName, socketsset)
384 }
385 roomsset.add(roomName)
386 socketsset.add(id)
387 let njoined = socketsset.length
388 return Promise.resolve(njoined)
389 }
390
391 removeSocketFromRoom (id, roomName) {
392 let roomsset = this.socketsToRooms.get(id)
393 let socketsset = this.roomsToSockets.get(roomName)
394 let wasjoined = (socketsset && socketsset.length) || 0
395 if (roomsset) {
396 roomsset.delete(roomName)
397 }
398 if (socketsset) {
399 socketsset.delete(id)
400 }
401 let njoined = 0
402 if (wasjoined > 0) {
403 njoined = socketsset.length
404 }
405 let hasChanged = njoined !== wasjoined
406 return Promise.resolve([njoined, hasChanged])
407 }
408
409 removeAllSocketsFromRoom (roomName) {
410 let sockets = this.socketsToRooms.keysArray()
411 let socketsset = this.roomsToSockets.get(roomName)
412 let removedSockets = (socketsset && socketsset.toArray()) || []
413 for (let id of removedSockets) {
414 let roomsset = this.socketsToRooms.get(id)
415 roomsset.delete(roomName)
416 }
417 if (socketsset) {
418 socketsset = socketsset.difference(sockets)
419 this.roomsToSockets.set(roomName, socketsset)
420 }
421 return Promise.resolve(removedSockets)
422 }
423
424 removeSocket (id) {
425 let roomsset = this.socketsToRooms.get(id)
426 let removedRooms = (roomsset && roomsset.toArray()) || []
427 let joinedSockets = []
428 for (let idx = 0; idx < removedRooms.length; idx++) {
429 let roomName = removedRooms[idx]
430 let socketsset = this.roomsToSockets.get(roomName)
431 socketsset.delete(id)
432 let njoined = socketsset.length
433 joinedSockets[idx] = njoined
434 }
435 this.socketsToRooms.delete(id)
436 this.socketsToInstances.delete(id)
437 let nconnected = this.socketsToRooms.length
438 return Promise.resolve([ removedRooms, joinedSockets, nconnected ])
439 }
440
441 lockToRoom (roomName, ttl) {
442 return uid(18).then(val => {
443 let start = _.now()
444 return this.lock(roomName, val, ttl).then(() => {
445 return Promise.resolve().disposer(() => {
446 if (start + ttl < _.now()) {
447 this.server.emit(
448 'lockTimeExceeded', val, {userName: this.userName, roomName})
449 }
450 return this.unlock(roomName, val)
451 })
452 })
453 })
454 }
455
456}
457
458// Implements global state API.
459class MemoryState {
460
461 constructor (server, options) {
462 this.server = server
463 this.options = options
464 this.closed = false
465 this.users = new FastMap()
466 this.rooms = new FastMap()
467 this.sockets = new FastMap()
468 this.RoomState = RoomStateMemory
469 this.UserState = UserStateMemory
470 this.DirectMessagingState = DirectMessagingStateMemory
471 this.instanceUID = this.server.instanceUID
472 this.heartbeatStamp = null
473 this.lockTTL = this.options.lockTTL || 5000
474 }
475
476 close () {
477 this.closed = true
478 return Promise.resolve()
479 }
480
481 getRoom (name, isPredicate = false) {
482 let room = this.rooms.get(name)
483 if (room) { return Promise.resolve(room) }
484 if (isPredicate) {
485 return Promise.resolve(null)
486 } else {
487 let error = new ChatServiceError('noRoom', name)
488 return Promise.reject(error)
489 }
490 }
491
492 addRoom (name, state) {
493 let room = new Room(this.server, name)
494 if (!this.rooms.get(name)) {
495 this.rooms.set(name, room)
496 } else {
497 let error = new ChatServiceError('roomExists', name)
498 return Promise.reject(error)
499 }
500 return room.initState(state).return(room)
501 }
502
503 removeRoom (name) {
504 this.rooms.delete(name)
505 return Promise.resolve()
506 }
507
508 addSocket (id, userName) {
509 this.sockets.set(id, userName)
510 return Promise.resolve()
511 }
512
513 removeSocket (id) {
514 this.sockets.delete(id)
515 return Promise.resolve()
516 }
517
518 getInstanceSockets (uid = this.instanceUID) {
519 return Promise.resolve(this.sockets.toObject())
520 }
521
522 updateHeartbeat () {
523 this.heartbeatStamp = _.now()
524 return Promise.resolve()
525 }
526
527 getInstanceHeartbeat (uid = this.instanceUID) {
528 if (uid !== this.instanceUID) { return Promise.resolve(null) }
529 return Promise.resolve(this.heartbeatStamp)
530 }
531
532 getOrAddUser (name, state) {
533 let user = this.users.get(name)
534 if (user) { return Promise.resolve(user) }
535 return this.addUser(name, state)
536 }
537
538 getUser (name, isPredicate = false) {
539 let user = this.users.get(name)
540 if (user) { return Promise.resolve(user) }
541 if (isPredicate) {
542 return Promise.resolve(null)
543 } else {
544 let error = new ChatServiceError('noUser', name)
545 return Promise.reject(error)
546 }
547 }
548
549 addUser (name, state) {
550 let user = this.users.get(name)
551 if (user) {
552 let error = new ChatServiceError('userExists', name)
553 return Promise.reject(error)
554 }
555 user = new User(this.server, name)
556 this.users.set(name, user)
557 if (state) {
558 return user.initState(state).return(user)
559 } else {
560 return Promise.resolve(user)
561 }
562 }
563
564 removeUser (name) {
565 this.users.delete(name)
566 return Promise.resolve()
567 }
568}
569
570module.exports = MemoryState