1 | 'use strict'
|
2 |
|
3 | const ChatServiceError = require('./ChatServiceError')
|
4 | const Promise = require('bluebird')
|
5 | const Redis = require('ioredis')
|
6 | const Room = require('./Room')
|
7 | const User = require('./User')
|
8 | const _ = require('lodash')
|
9 | const promiseRetry = require('promise-retry')
|
10 | const uid = require('uid-safe')
|
11 | const { mixin } = require('es6-mixin')
|
12 |
|
// First segment of every Redis key name built by the makeKeyName methods in
// this module.
let namespace = 'chatservice'
|
14 |
|
/**
 * Replaces the contents of a Redis set.
 *
 * The key is deleted first, so afterwards the set holds exactly `values`
 * (or does not exist when there is nothing to store).
 *
 * @param {Object} redis Client exposing promise-returning `del` and `sadd`.
 * @param {string} set Key of the set to (re)initialise.
 * @param {Array} [values] Members to store. A missing or empty list only
 *   clears the key.
 * @returns {Promise} Resolves once the key has been cleared and, when there
 *   are members, populated (with the `sadd` reply in that case).
 */
function initSet (redis, set, values) {
  return redis.del(set).then(() => {
    // SADD requires at least one member; sending it an empty list makes
    // Redis answer with an arity error, so an empty list is handled exactly
    // like a missing one.
    if (!values || values.length === 0) {
      return Promise.resolve()
    }
    return redis.sadd(set, values)
  })
}
|
24 |
|
25 |
|
// State init/remove operations.
//
// Mixed into RoomStateRedis and DirectMessagingStateRedis. Two marker keys
// are maintained next to the state itself: 'exists' is claimed before the
// state is written, 'isInit' is set once the state has been written.
class StateOperations {

  /**
   * @param {string} name Name of the entity, reported in the "exists" error.
   * @param {string} exitsErrorName Error code given to ChatServiceError when
   *   the entity already exists.
   * @param {Object} redis Client exposing promise-returning `setnx`/`del`.
   * @param {function(string): string} makeKeyName Builds a full key name.
   * @param {function(Object=): Promise} stateReset Writes the state (or
   *   resets it when called without arguments).
   */
  constructor (name, exitsErrorName, redis, makeKeyName, stateReset) {
    this.name = name
    this.exitsErrorName = exitsErrorName
    this.redis = redis
    this.makeKeyName = makeKeyName
    this.stateReset = stateReset
  }

  /**
   * Creates the state unless the 'exists' marker is already taken.
   *
   * @param {Object} [state] Initial values handed to `stateReset`.
   * @returns {Promise} Resolves with the reply of the final `setnx` on the
   *   'isInit' marker. Rejects with a ChatServiceError when the marker is
   *   already taken, or with the `stateReset` error when writing fails.
   */
  initState (state) {
    const existsKey = this.makeKeyName('exists')
    return this.redis.setnx(existsKey, true).then(isnew => {
      if (!isnew) {
        let error = new ChatServiceError(this.exitsErrorName, this.name)
        return Promise.reject(error)
      }
      return Promise.resolve()
        .then(() => this.stateReset(state))
        .catch(error => {
          // The marker was claimed by this very call. Leaving it behind
          // after a failed reset would block every later initState while
          // 'isInit' is never set, so it is released before the original
          // error is propagated (even if the cleanup itself fails).
          const rethrow = () => Promise.reject(error)
          return this.redis.del(existsKey).then(rethrow, rethrow)
        })
    }).then(() => this.redis.setnx(this.makeKeyName('isInit'), true))
  }

  /**
   * Resets the state and then deletes both marker keys.
   *
   * @returns {Promise} Resolves with the `del` reply.
   */
  removeState () {
    return this.stateReset().then(() => {
      return this.redis.del(
        this.makeKeyName('exists'), this.makeKeyName('isInit'))
    })
  }

  /**
   * Deletes only the 'isInit' marker, leaving 'exists' in place.
   *
   * @returns {Promise} Resolves with the `del` reply.
   */
  startRemoving () {
    return this.redis.del(this.makeKeyName('isInit'))
  }

}
|
61 |
|
62 |
|
// Lock helpers built on a value-tagged Redis key; mixed into UserStateRedis.
class LockOperations {

  /**
   * @param {Object} redis Client exposing `set` and the custom `unlock`
   *   command.
   */
  constructor (redis) {
    this.redis = redis
  }

  /**
   * Sets `key` to `val` with the NX and PX flags, retrying through
   * promiseRetry while the key cannot be set or the command fails.
   *
   * @param {string} key Lock key.
   * @param {string} val Value identifying the lock holder.
   * @param {number} ttl Expiration passed with the PX flag.
   * @returns {Promise} Resolves with `null` once the key has been set;
   *   the rejection, if any, comes from promiseRetry.
   */
  lock (key, val, ttl) {
    const retryOptions =
          {minTimeout: 100, retries: 10, factor: 1.5, randomize: true}
    const attempt = retry => {
      const acquiring = this.redis.set(key, val, 'NX', 'PX', ttl)
      return acquiring
        // A falsy reply means the key was not set: ask for another attempt.
        .then(reply => reply ? null : retry(new ChatServiceError('timeout')))
        // Command failures are retried as well.
        .catch(retry)
    }
    return promiseRetry(retryOptions, attempt)
  }

  /**
   * Runs the custom `unlock` command for `key` and `val`.
   *
   * @returns {Promise} Resolves with the command reply.
   */
  unlock (key, val) {
    const releasing = this.redis.unlock(key, val)
    return releasing
  }

}
|
89 |
|
90 |
|
// Lua scripts registered as custom commands on the Redis client (the
// RedisState constructor passes each entry to redis.defineCommand).
// `numberOfKeys` is the count of leading call arguments that the script
// reads through KEYS; the remaining arguments arrive in ARGV.
let luaCommands = {
  // unlock(key, val): deletes KEYS[1] only when its current value equals
  // ARGV[1]. Returns the DEL reply, or 0 when the value does not match.
  unlock: {
    numberOfKeys: 1,
    lua: `
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end`
  },

  // messageAdd(lastMessageId, historyMaxSize, messagesIds,
  //            messagesTimestamps, messagesHistory, msg, ts):
  // increments the id counter, pushes id / timestamp / message at the head
  // of the three lists, and pops one entry from the tail of each list when
  // the history list has grown past the stored maximum. Returns {id}.
  messageAdd: {
    numberOfKeys: 5,
    lua: `
local msg = ARGV[1]
local ts = ARGV[2]

local lastMessageId = KEYS[1]
local historyMaxSize = KEYS[2]
local messagesIds = KEYS[3]
local messagesTimestamps = KEYS[4]
local messagesHistory = KEYS[5]

local id = tonumber(redis.call('INCR', lastMessageId))
local maxsz = tonumber(redis.call('GET', historyMaxSize))

redis.call('LPUSH', messagesIds, id)
redis.call('LPUSH', messagesTimestamps, ts)
redis.call('LPUSH', messagesHistory, msg)

local sz = tonumber(redis.call('LLEN', messagesHistory))

if sz > maxsz then
redis.call('RPOP', messagesIds)
redis.call('RPOP', messagesTimestamps)
redis.call('RPOP', messagesHistory)
end

return {id}`
  },

  // messagesGet(lastMessageId, historyMaxSize, messagesIds,
  //             messagesTimestamps, messagesHistory, id, maxlen):
  // `lastid - id` (with id capped at lastid) is used as the exclusive end
  // index into the lists, and at most `maxlen` entries before that index
  // are returned as {msgs, tss, ids}; an empty table is returned when the
  // range is empty. NOTE(review): `maxsz` is read but not used afterwards.
  messagesGet: {
    numberOfKeys: 5,
    lua: `
local id = ARGV[1]
local maxlen = ARGV[2]

local lastMessageId = KEYS[1]
local historyMaxSize = KEYS[2]
local messagesIds = KEYS[3]
local messagesTimestamps = KEYS[4]
local messagesHistory = KEYS[5]

local lastid = tonumber(redis.call('GET', lastMessageId))
local maxsz = tonumber(redis.call('GET', historyMaxSize))
local id = math.min(id, lastid)
local endp = lastid - id
local len = math.min(maxlen, endp)
local start = math.max(0, endp - len)

if start >= endp then
return {}
end

endp = endp - 1
local msgs = redis.call('LRANGE', messagesHistory, start, endp)
local tss = redis.call('LRANGE', messagesTimestamps, start, endp)
local ids = redis.call('LRANGE', messagesIds, start, endp)

return {msgs, tss, ids}`
  },

  // getSocketsToRooms(sockets, prefix): for every field of the hash KEYS[1]
  // reads the set `prefix .. field` and returns a one-element table holding
  // a JSON object { field: members }. The JSON is `null` when the hash has
  // no fields.
  getSocketsToRooms: {
    numberOfKeys: 1,
    lua: `
local result = {}
local sockets = KEYS[1]
local prefix = ARGV[1]
local ids = redis.call('HKEYS', sockets)

if table.getn(ids) == 0 then
local jsonResult = cjson.encode(cjson.null)
return {jsonResult}
end

for i, id in pairs(ids) do
local joined = redis.call('SMEMBERS', prefix .. id)
result[id] = joined
end

local jsonResult = cjson.encode(result)
return {jsonResult}`
  },

  // removeAllSocketsFromRoom(room, prefix, roomName): deletes the set
  // KEYS[1] and removes `roomName` from the set `prefix .. member` of each
  // former member. Returns a one-element table with the JSON list of those
  // members, or JSON `null` when the set was empty.
  removeAllSocketsFromRoom: {
    numberOfKeys: 1,
    lua: `
local room = KEYS[1]
local prefix = ARGV[1]
local roomName = ARGV[2]
local ids = redis.call('SMEMBERS', room)

if table.getn(ids) == 0 then
local jsonResult = cjson.encode(cjson.null)
return {jsonResult}
end

redis.call('DEL', room)

for i, id in pairs(ids) do
redis.call('SREM', prefix .. id, roomName)
end

local jsonResult = cjson.encode(ids)
return {jsonResult}`
  },

  // removeSocket(id, sockets, prefix, socketid): deletes the set KEYS[1]
  // (after reading its members as room names), removes `socketid` from the
  // hash KEYS[2] and from each set `prefix .. room` that contains it.
  // Returns a one-element table with the JSON triple
  // [removedRooms, joinedSockets, nconnected], where joinedSockets holds
  // the remaining cardinality of each affected set and nconnected is the
  // hash length after removal; the first two entries are JSON `null` when
  // nothing was removed.
  removeSocket: {
    numberOfKeys: 2,
    lua: `
local id = KEYS[1]
local sockets = KEYS[2]
local prefix = ARGV[1]
local socketid = ARGV[2]

local rooms = redis.call('SMEMBERS', id)
redis.call('DEL', id)

redis.call('HDEL', sockets, socketid)
local nconnected = redis.call('HLEN', sockets)

local removedRooms = {}
local joinedSockets = {}

for i, room in pairs(rooms) do
local ismember = redis.call('SISMEMBER', prefix .. room, socketid)
if ismember == 1 then
redis.call('SREM', prefix .. room, socketid)
local njoined = redis.call('SCARD', prefix .. room)
table.insert(removedRooms, room)
table.insert(joinedSockets, njoined)
end
end

if table.getn(removedRooms) == 0 or table.getn(rooms) == 0 then
local jsonResult = cjson.encode({cjson.null, cjson.null, nconnected})
return {jsonResult}
end

local jsonResult = cjson.encode({removedRooms, joinedSockets, nconnected})
return {jsonResult}`
  }

}
|
245 |
|
246 |
|
// Access-list operations shared by the Redis state classes.
//
// Subclasses are expected to provide `this.redis`, `this.prefix`,
// `this.name` and a `hasList(listName)` method; none of them is defined
// here.
class ListsStateRedis {

  /**
   * Builds the full Redis key for one piece of this entity's state.
   *
   * @param {string} keyName Last segment of the key.
   * @returns {string} `<namespace>:<prefix>:{<name>}:<keyName>`.
   */
  makeKeyName (keyName) {
    return `${namespace}:${this.prefix}:{${this.name}}:${keyName}`
  }

  /**
   * Validates a list name and, when a limit is given, the resulting size.
   *
   * @param {string} listName List to check.
   * @param {number} [num] Number of elements about to be added.
   * @param {number} [limit] Maximum allowed size. When omitted no size
   *   check is made.
   * @returns {Promise} Resolves when the list may be used. Rejects with a
   *   'noList' ChatServiceError for an unknown list and with
   *   'listLimitExceeded' when current size + `num` is above `limit`.
   */
  checkList (listName, num, limit) {
    if (!this.hasList(listName)) {
      let error = new ChatServiceError('noList', listName)
      return Promise.reject(error)
    }
    // 'userlist' is never size-limited; without a limit there is nothing
    // to compare against, so the SCARD round trip is skipped.
    if (listName === 'userlist' || limit === undefined) {
      return Promise.resolve()
    }
    // The size must be read from the namespaced key the list is actually
    // stored under, not from the bare list name.
    return this.redis.scard(this.makeKeyName(listName)).then(sz => {
      if (sz + num > limit) {
        let error = new ChatServiceError('listLimitExceeded', listName)
        return Promise.reject(error)
      }
      return Promise.resolve()
    })
  }

  /**
   * Adds elements to a list after checking the size limit.
   *
   * @param {string} listName Target list.
   * @param {Array} elems Elements to add.
   * @param {number} [limit] Maximum allowed list size.
   * @returns {Promise} Resolves with the `sadd` reply.
   */
  addToList (listName, elems, limit) {
    let num = elems.length
    return this.checkList(listName, num, limit)
      .then(() => this.redis.sadd(this.makeKeyName(listName), elems))
  }

  /**
   * Removes elements from a list.
   *
   * @returns {Promise} Resolves with the `srem` reply.
   */
  removeFromList (listName, elems) {
    return this.checkList(listName)
      .then(() => this.redis.srem(this.makeKeyName(listName), elems))
  }

  /**
   * Reads all members of a list.
   *
   * @returns {Promise<Array>} Resolves with the `smembers` reply.
   */
  getList (listName) {
    return this.checkList(listName)
      .then(() => this.redis.smembers(this.makeKeyName(listName)))
  }

  /**
   * Tests list membership.
   *
   * @returns {Promise<boolean>} Whether `elem` is a member of the list.
   */
  hasInList (listName, elem) {
    return this.checkList(listName)
      .then(() => this.redis.sismember(this.makeKeyName(listName), elem))
      .then(data => Promise.resolve(Boolean(data)))
  }

  /**
   * Stores the whitelist-only flag.
   *
   * @param {*} mode Any truthy value enables the mode.
   * @returns {Promise} Resolves with the `set` reply.
   */
  whitelistOnlySet (mode) {
    // The flag is stored as `true` or an empty string so that the value
    // read back converts to the right boolean in whitelistOnlyGet.
    let whitelistOnly = mode ? true : ''
    return this.redis.set(this.makeKeyName('whitelistMode'), whitelistOnly)
  }

  /**
   * Reads the whitelist-only flag.
   *
   * @returns {Promise<boolean>}
   */
  whitelistOnlyGet () {
    return this.redis.get(this.makeKeyName('whitelistMode'))
      .then(data => Promise.resolve(Boolean(data)))
  }

}
|
304 |
|
305 |
|
// Redis-backed state of a single chat room: access lists, owner and
// settings, message history and per-user "last seen" timestamps.
class RoomStateRedis extends ListsStateRedis {

  /**
   * @param {Object} server Provides the shared `redis` client and the
   *   defaults read in this class (`historyMaxGetMessages`,
   *   `historyMaxSize`, `enableAccessListsUpdates`,
   *   `enableUserlistUpdates`).
   * @param {string} roomName Room name; becomes part of every key name.
   */
  constructor (server, roomName) {
    super()
    this.server = server
    this.roomName = roomName
    this.name = this.roomName
    this.historyMaxGetMessages = this.server.historyMaxGetMessages
    this.redis = this.server.redis
    this.exitsErrorName = 'roomExists'
    this.prefix = 'rooms'
    mixin(this, StateOperations, this.name, this.exitsErrorName, this.redis,
          this.makeKeyName.bind(this), this.stateReset.bind(this))
  }

  /**
   * Overwrites the whole room state.
   *
   * Lists are replaced by the given values, the user list, the history and
   * the "seen" hash are cleared, and the message id counter restarts at 0.
   *
   * @param {Object} [state] May contain `whitelist`, `blacklist`,
   *   `adminlist`, `whitelistOnly`, `owner`, `historyMaxSize`,
   *   `enableAccessListsUpdates` and `enableUserlistUpdates`. The last two
   *   default to the server-wide settings.
   * @returns {Promise} Resolves with no value once every write is done.
   */
  stateReset (state) {
    state = state || {}
    let { whitelist, blacklist, adminlist,
          whitelistOnly, owner, historyMaxSize,
          enableAccessListsUpdates = this.server.enableAccessListsUpdates,
          enableUserlistUpdates = this.server.enableUserlistUpdates
        } = state
    if (!owner) { owner = '' }
    return Promise.all([
      initSet(this.redis, this.makeKeyName('whitelist'), whitelist),
      initSet(this.redis, this.makeKeyName('blacklist'), blacklist),
      initSet(this.redis, this.makeKeyName('adminlist'), adminlist),
      initSet(this.redis, this.makeKeyName('userlist'), null),
      this.redis.del(this.makeKeyName('messagesHistory')),
      this.redis.del(this.makeKeyName('messagesTimestamps')),
      this.redis.del(this.makeKeyName('messagesIds')),
      this.redis.del(this.makeKeyName('usersseen')),
      this.redis.set(this.makeKeyName('lastMessageId'), 0),
      this.redis.set(this.makeKeyName('owner'), owner),
      this.whitelistOnlySet(whitelistOnly),
      this.accessListsUpdatesSet(enableAccessListsUpdates),
      this.userlistUpdatesSet(enableUserlistUpdates),
      this.historyMaxSizeSet(historyMaxSize)
    ]).return()
  }

  /** @returns {boolean} Whether `listName` is one of the room lists. */
  hasList (listName) {
    return listName === 'adminlist' || listName === 'whitelist' ||
      listName === 'blacklist' || listName === 'userlist'
  }

  /** @returns {Promise<string>} Stored owner name ('' when unset). */
  ownerGet () {
    return this.redis.get(this.makeKeyName('owner'))
  }

  /** @returns {Promise} Resolves with the `set` reply. */
  ownerSet (owner) {
    return this.redis.set(this.makeKeyName('owner'), owner)
  }

  /** Stores the flag as `true` or '' (see accessListsUpdatesGet). */
  accessListsUpdatesSet (enableAccessListsUpdates) {
    enableAccessListsUpdates = enableAccessListsUpdates ? true : ''
    return this.redis.set(this.makeKeyName('enableAccessListsUpdates'),
                          enableAccessListsUpdates)
  }

  /** @returns {Promise<boolean>} */
  accessListsUpdatesGet () {
    return this.redis.get(this.makeKeyName('enableAccessListsUpdates'))
      .then(data => Promise.resolve(Boolean(data)))
  }

  /** Stores the flag as `true` or '' (see userlistUpdatesGet). */
  userlistUpdatesSet (enableUserlistUpdates) {
    enableUserlistUpdates = enableUserlistUpdates ? true : ''
    return this.redis.set(this.makeKeyName('enableUserlistUpdates'),
                          enableUserlistUpdates)
  }

  /** @returns {Promise<boolean>} */
  userlistUpdatesGet () {
    return this.redis.get(this.makeKeyName('enableUserlistUpdates'))
      .then(data => Promise.resolve(Boolean(data)))
  }

  /**
   * Stores the history size limit and trims the stored history to it.
   *
   * @param {number} [historyMaxSize] Non-negative number; anything else
   *   falls back to `server.historyMaxSize`.
   * @returns {Promise} Resolves with the MULTI/EXEC replies.
   */
  historyMaxSizeSet (historyMaxSize) {
    let limit = historyMaxSize
    if (!(_.isNumber(historyMaxSize) && historyMaxSize >= 0)) {
      limit = this.server.historyMaxSize
    }
    if (limit === 0) {
      // A zero limit is handled by deleting the three lists outright.
      return this.redis.multi()
        .set(this.makeKeyName('historyMaxSize'), limit)
        .del(this.makeKeyName('messagesHistory'))
        .del(this.makeKeyName('messagesTimestamps'))
        .del(this.makeKeyName('messagesIds'))
        .exec()
    } else {
      let last = limit - 1
      return this.redis.multi()
        .set(this.makeKeyName('historyMaxSize'), limit)
        .ltrim(this.makeKeyName('messagesHistory'), 0, last)
        .ltrim(this.makeKeyName('messagesTimestamps'), 0, last)
        .ltrim(this.makeKeyName('messagesIds'), 0, last)
        .exec()
    }
  }

  /**
   * Reads the history counters in one transaction.
   *
   * @returns {Promise<Object>} `{ historySize, historyMaxSize,
   *   historyMaxGetMessages, lastMessageId }`.
   */
  historyInfo () {
    return this.redis.multi()
      .get(this.makeKeyName('historyMaxSize'))
      .llen(this.makeKeyName('messagesHistory'))
      .get(this.makeKeyName('lastMessageId'))
      .exec()
      .spread(([, historyMaxSize], [, historySize], [, lastMessageId]) => {
        historySize = parseInt(historySize, 10)
        // parseFloat is kept for the limit (it also accepts 'Infinity').
        historyMaxSize = parseFloat(historyMaxSize)
        lastMessageId = parseInt(lastMessageId, 10)
        let info = { historySize,
                     historyMaxSize,
                     historyMaxGetMessages: this.historyMaxGetMessages,
                     lastMessageId }
        return Promise.resolve(info)
      })
  }

  /**
   * @returns {Promise<Array>} Members of the user list that are in neither
   *   the whitelist nor the admin list (SDIFF).
   */
  getCommonUsers () {
    return this.redis.sdiff(this.makeKeyName('userlist'),
                            this.makeKeyName('whitelist'),
                            this.makeKeyName('adminlist'))
  }

  /**
   * Appends a message to the history through the `messageAdd` command.
   *
   * @param {Object} msg Message; it is serialised with JSON.stringify and
   *   then mutated in place to receive `id` and `timestamp`.
   * @returns {Promise<Object>} Resolves with the same `msg` object.
   */
  messageAdd (msg) {
    let timestamp = _.now()
    let smsg = JSON.stringify(msg)
    return this.redis.messageAdd(
      this.makeKeyName('lastMessageId'), this.makeKeyName('historyMaxSize'),
      this.makeKeyName('messagesIds'), this.makeKeyName('messagesTimestamps'),
      this.makeKeyName('messagesHistory'), smsg, timestamp)
      .spread(id => {
        msg.id = id
        msg.timestamp = timestamp
        return Promise.resolve(msg)
      })
  }

  /**
   * Turns three parallel reply lists into message objects.
   *
   * @param {Array<string>} msgs JSON-serialised messages.
   * @param {Array<string>} tss Timestamps, same order as `msgs`.
   * @param {Array<string>} ids Message ids, same order as `msgs`.
   * @returns {Promise<Array<Object>>} Parsed messages with numeric
   *   `timestamp` and `id`; an empty array when `msgs` is missing.
   */
  convertMessages (msgs, tss, ids) {
    let data = []
    if (!msgs) {
      return Promise.resolve(data)
    }
    for (let idx = 0; idx < msgs.length; idx++) {
      let msg = msgs[idx]
      let obj = JSON.parse(msg, (key, val) => {
        // JSON.stringify turns a Buffer into {type: 'Buffer', data: [...]}.
        // Only that exact shape is revived: the stored text comes from
        // message senders, and the old `new Buffer(val.data)` allocated
        // uninitialised memory when `data` was a number.
        if (val && val.type === 'Buffer' && Array.isArray(val.data)) {
          return Buffer.from(val.data)
        } else {
          return val
        }
      })
      obj.timestamp = parseInt(tss[idx], 10)
      obj.id = parseInt(ids[idx], 10)
      data[idx] = obj
    }
    return Promise.resolve(data)
  }

  /**
   * Reads the first `historyMaxGetMessages` entries of the history lists.
   *
   * @returns {Promise<Array<Object>>}
   */
  messagesGetRecent () {
    if (this.historyMaxGetMessages <= 0) { return Promise.resolve([]) }
    let limit = this.historyMaxGetMessages - 1
    return this.redis.multi()
      .lrange(this.makeKeyName('messagesHistory'), 0, limit)
      .lrange(this.makeKeyName('messagesTimestamps'), 0, limit)
      .lrange(this.makeKeyName('messagesIds'), 0, limit)
      .exec()
      .spread(([, msgs], [, tss], [, ids]) => {
        return this.convertMessages(msgs, tss, ids)
      })
  }

  /**
   * Reads history relative to a message id through the `messagesGet`
   * command.
   *
   * @param {number} id Reference message id (negative values become 0).
   * @param {number} [maxMessages] Upper bound on returned messages;
   *   defaults to `historyMaxGetMessages`.
   * @returns {Promise<Array<Object>>}
   */
  messagesGet (id, maxMessages = this.historyMaxGetMessages) {
    if (maxMessages <= 0) { return Promise.resolve([]) }
    id = _.max([0, id])
    return this.redis.messagesGet(
      this.makeKeyName('lastMessageId'), this.makeKeyName('historyMaxSize'),
      this.makeKeyName('messagesIds'), this.makeKeyName('messagesTimestamps'),
      this.makeKeyName('messagesHistory'), id, maxMessages)
      .spread((msgs, tss, ids) => {
        return this.convertMessages(msgs, tss, ids)
      })
  }

  /**
   * @param {string} userName
   * @returns {Promise<Object>} `{ joined, timestamp }`: whether the user is
   *   in the user list and the stored "seen" timestamp (null when absent).
   */
  userSeenGet (userName) {
    return this.redis.multi()
      .hget(this.makeKeyName('usersseen'), userName)
      .sismember(this.makeKeyName('userlist'), userName)
      .exec()
      .spread(([, ts], [, isjoined]) => {
        let joined = Boolean(isjoined)
        let timestamp = ts ? parseInt(ts, 10) : null
        return {joined, timestamp}
      })
  }

  /** Stores the current time as the user's "seen" timestamp. */
  userSeenUpdate (userName) {
    let timestamp = _.now()
    return this.redis.hset(this.makeKeyName('usersseen'), userName, timestamp)
  }

}
|
507 |
|
508 |
|
// Redis-backed direct messaging permissions of a single user: a whitelist,
// a blacklist and the whitelist-only flag.
class DirectMessagingStateRedis extends ListsStateRedis {

  /**
   * @param {Object} server Provides the shared `redis` client.
   * @param {string} userName User name; becomes part of every key name.
   */
  constructor (server, userName) {
    super()
    Object.assign(this, {
      server,
      userName,
      name: userName,
      prefix: 'users',
      exitsErrorName: 'userExists',
      redis: server.redis
    })
    const keyMaker = this.makeKeyName.bind(this)
    const resetter = this.stateReset.bind(this)
    mixin(this, StateOperations, this.name, this.exitsErrorName, this.redis,
          keyMaker, resetter)
  }

  /** @returns {boolean} Whether `listName` is one of the user's lists. */
  hasList (listName) {
    return ['whitelist', 'blacklist'].includes(listName)
  }

  /**
   * Overwrites both lists and the whitelist-only flag.
   *
   * @param {Object} [state] May contain `whitelist`, `blacklist` and
   *   `whitelistOnly`.
   * @returns {Promise} Resolves with no value once every write is done.
   */
  stateReset (state) {
    const { whitelist, blacklist, whitelistOnly } = state || {}
    // The flag is stored as `true` or an empty string.
    const modeValue = whitelistOnly ? true : ''
    const writes = [
      initSet(this.redis, this.makeKeyName('whitelist'), whitelist),
      initSet(this.redis, this.makeKeyName('blacklist'), blacklist),
      this.redis.set(this.makeKeyName('whitelistMode'), modeValue)
    ]
    return Promise.all(writes).return()
  }

}
|
539 |
|
540 |
|
// Redis-backed connection state of a single user: the hash of connected
// sockets, the socket <-> room membership sets and per-room locks.
class UserStateRedis {

  /**
   * @param {Object} server Provides the shared `redis` client and `emit`.
   * @param {string} userName User name; becomes part of every key name.
   */
  constructor (server, userName) {
    Object.assign(this, {
      server,
      userName,
      name: userName,
      prefix: 'users',
      redis: server.redis
    })
    mixin(this, LockOperations, this.redis)
  }

  /** @returns {string} `<namespace>:<prefix>:{<name>}:<keyName>`. */
  makeKeyName (keyName) {
    const scope = `${namespace}:${this.prefix}`
    return `${scope}:{${this.name}}:${keyName}`
  }

  /** Key of the set of rooms joined by socket `id` (bare prefix if omitted). */
  makeSocketToRooms (id = '') {
    const suffix = `socketsToRooms:${id}`
    return this.makeKeyName(suffix)
  }

  /** Key of the set of sockets joined to `room` (bare prefix if omitted). */
  makeRoomToSockets (room = '') {
    const suffix = `roomsToSockets:${room}`
    return this.makeKeyName(suffix)
  }

  /** Key of the lock used for `room`. */
  makeRoomLock (room) {
    const suffix = `roomLock:${room}`
    return this.makeKeyName(suffix)
  }

  /**
   * Records a socket in the sockets hash.
   *
   * @param {string} id Socket id (hash field).
   * @param {string} uid Value stored for the socket.
   * @returns {Promise<number>} Hash length after the insertion.
   */
  addSocket (id, uid) {
    const socketsKey = this.makeKeyName('sockets')
    return this.redis.multi()
      .hset(socketsKey, id, uid)
      .hlen(socketsKey)
      .exec()
      .spread((hsetReply, hlenReply) => Promise.resolve(hlenReply[1]))
  }

  /** @returns {Promise<Array<string>>} Fields of the sockets hash. */
  getAllSockets () {
    const socketsKey = this.makeKeyName('sockets')
    return this.redis.hkeys(socketsKey)
  }

  /** @returns {Promise<Object>} The whole sockets hash. */
  getSocketsToInstance () {
    const socketsKey = this.makeKeyName('sockets')
    return this.redis.hgetall(socketsKey)
  }

  /** @returns {Promise<Array<string>>} Sockets joined to `roomName`. */
  getRoomToSockets (roomName) {
    const roomKey = this.makeRoomToSockets(roomName)
    return this.redis.smembers(roomKey)
  }

  /**
   * Reads, through the `getSocketsToRooms` command, the rooms joined by
   * each socket.
   *
   * @returns {Promise<Object>} Map of socket id to an array of room names;
   *   `{}` when the command reports no sockets.
   */
  getSocketsToRooms () {
    const socketsKey = this.makeKeyName('sockets')
    const setsPrefix = this.makeSocketToRooms()
    return this.redis.getSocketsToRooms(socketsKey, setsPrefix)
      .spread(result => {
        const data = JSON.parse(result) || {}
        // Empty values are normalised to empty arrays.
        _.keys(data).forEach(socketId => {
          if (_.isEmpty(data[socketId])) { data[socketId] = [] }
        })
        return Promise.resolve(data)
      })
  }

  /**
   * Joins a socket to a room in both membership sets.
   *
   * @returns {Promise<number>} Number of sockets joined to the room.
   */
  addSocketToRoom (id, roomName) {
    const socketKey = this.makeSocketToRooms(id)
    const roomKey = this.makeRoomToSockets(roomName)
    return this.redis.multi()
      .sadd(socketKey, roomName)
      .sadd(roomKey, id)
      .scard(roomKey)
      .exec()
      .then(replies => Promise.resolve(replies[2][1]))
  }

  /**
   * Removes a socket from a room in both membership sets.
   *
   * @returns {Promise<Array>} `[njoined, hasChanged]`: sockets still joined
   *   to the room and whether that count differs from the one before.
   */
  removeSocketFromRoom (id, roomName) {
    const socketKey = this.makeSocketToRooms(id)
    const roomKey = this.makeRoomToSockets(roomName)
    return this.redis.multi()
      .scard(roomKey)
      .srem(socketKey, roomName)
      .srem(roomKey, id)
      .scard(roomKey)
      .exec()
      .then(replies => {
        const before = replies[0][1]
        const after = replies[3][1]
        return Promise.resolve([after, after !== before])
      })
  }

  /**
   * Runs the `removeAllSocketsFromRoom` command for `roomName`.
   *
   * @returns {Promise} Resolves with the parsed JSON reply.
   */
  removeAllSocketsFromRoom (roomName) {
    const roomKey = this.makeRoomToSockets(roomName)
    const setsPrefix = this.makeSocketToRooms()
    return this.redis.removeAllSocketsFromRoom(roomKey, setsPrefix, roomName)
      .spread(result => Promise.resolve(JSON.parse(result)))
  }

  /**
   * Runs the `removeSocket` command for socket `id`.
   *
   * @returns {Promise} Resolves with the parsed JSON reply.
   */
  removeSocket (id) {
    const socketKey = this.makeSocketToRooms(id)
    const socketsKey = this.makeKeyName('sockets')
    const setsPrefix = this.makeRoomToSockets()
    return this.redis.removeSocket(socketKey, socketsKey, setsPrefix, id)
      .spread(result => Promise.resolve(JSON.parse(result)))
  }

  /**
   * Takes the room lock under a freshly generated value.
   *
   * @param {string} roomName Room to lock.
   * @param {number} ttl Lock expiration handed to `lock`.
   * @returns {Promise} Resolves with a disposer whose cleanup emits
   *   'lockTimeExceeded' on the server when more than `ttl` has elapsed
   *   since the lock attempt started, and then releases the lock.
   */
  lockToRoom (roomName, ttl) {
    return uid(18).then(val => {
      const start = _.now()
      const release = () => {
        if (start + ttl < _.now()) {
          this.server.emit(
            'lockTimeExceeded', val, {userName: this.userName, roomName})
        }
        return this.unlock(this.makeRoomLock(roomName), val)
      }
      return this.lock(this.makeRoomLock(roomName), val, ttl)
        .then(() => Promise.resolve().disposer(release))
    })
  }

}
|
651 |
|
652 |
|
// Redis state backend: owns the Redis connection, registers the custom Lua
// commands and acts as the factory/lookup for rooms and users.
class RedisState {

  /**
   * @param {Object} server Chat service instance; its `instanceUID` is
   *   read and its `redis` property is set to the created client.
   * @param {Object} [options] Backend options: `useCluster`,
   *   `redisOptions` (constructor arguments for the client) and `lockTTL`.
   *   May be omitted, in which case all defaults apply.
   */
  constructor (server, options) {
    this.server = server
    // A missing options object used to crash on the first property read.
    this.options = options || {}
    this.closed = false
    if (this.options.useCluster) {
      // In cluster mode redisOptions must already be an array of
      // Redis.Cluster constructor arguments; it is spread as is.
      this.redis = new Redis.Cluster(...this.options.redisOptions)
    } else {
      let redisOptions = _.castArray(this.options.redisOptions)
      this.redis = new Redis(...redisOptions)
    }
    this.RoomState = RoomStateRedis
    this.UserState = UserStateRedis
    this.DirectMessagingState = DirectMessagingStateRedis
    this.lockTTL = this.options.lockTTL || 10000
    this.instanceUID = this.server.instanceUID
    this.server.redis = this.redis
    for (let [cmd, def] of _.toPairs(luaCommands)) {
      this.redis.defineCommand(cmd, {
        numberOfKeys: def.numberOfKeys,
        lua: def.lua
      })
    }
  }

  /** @returns {string} `<namespace>:<prefix>:{<name>}:<keyName>`. */
  makeKeyName (prefix, name, keyName) {
    return `${namespace}:${prefix}:{${name}}:${keyName}`
  }

  /**
   * @returns {Promise} Resolves with the raw value of the room's 'isInit'
   *   key (null when the key is absent); callers test it for truthiness.
   */
  hasRoom (name) {
    return this.redis.get(this.makeKeyName('rooms', name, 'isInit'))
  }

  /**
   * @returns {Promise} Resolves with the raw value of the user's 'isInit'
   *   key (null when the key is absent); callers test it for truthiness.
   */
  hasUser (name) {
    return this.redis.get(this.makeKeyName('users', name, 'isInit'))
  }

  /** Marks the backend closed and quits the Redis connection. */
  close () {
    this.closed = true
    return this.redis.quit().return()
  }

  /**
   * Looks a room up.
   *
   * @param {string} name Room name.
   * @param {boolean} [isPredicate=false] When true, a missing room yields
   *   `null` instead of a rejection.
   * @returns {Promise<Room|null>} Rejects with a 'noRoom' ChatServiceError
   *   when the room is missing and `isPredicate` is false.
   */
  getRoom (name, isPredicate = false) {
    let room = new Room(this.server, name)
    return this.hasRoom(name).then(exists => {
      if (!exists) {
        if (isPredicate) {
          return Promise.resolve(null)
        } else {
          let error = new ChatServiceError('noRoom', name)
          return Promise.reject(error)
        }
      }
      return Promise.resolve(room)
    })
  }

  /** Creates a room with the given state and resolves with it. */
  addRoom (name, state) {
    let room = new Room(this.server, name)
    return room.initState(state).return(room)
  }

  /** Resolves immediately; nothing is done here for this backend. */
  removeRoom (name) {
    return Promise.resolve()
  }

  /** Records socket `id` -> `userName` in this instance's sockets hash. */
  addSocket (id, userName) {
    return this.redis.hset(
      this.makeKeyName('instances', this.instanceUID, 'sockets'), id, userName)
  }

  /** Removes socket `id` from this instance's sockets hash. */
  removeSocket (id) {
    return this.redis.hdel(
      this.makeKeyName('instances', this.instanceUID, 'sockets'), id)
  }

  /**
   * @param {string} [uid] Instance id; defaults to this instance.
   * @returns {Promise<Object>} The instance's sockets hash.
   */
  getInstanceSockets (uid = this.instanceUID) {
    return this.redis.hgetall(this.makeKeyName('instances', uid, 'sockets'))
  }

  /**
   * Stores the current time as this instance's heartbeat. Failures are
   * deliberately swallowed (best effort).
   */
  updateHeartbeat () {
    return this.redis.set(
      this.makeKeyName('instances', this.instanceUID, 'heartbeat'), _.now())
      .catchReturn()
  }

  /**
   * @param {string} [uid] Instance id; defaults to this instance.
   * @returns {Promise<number|null>} Stored heartbeat, or null when absent.
   */
  getInstanceHeartbeat (uid = this.instanceUID) {
    return this.redis.get(this.makeKeyName('instances', uid, 'heartbeat'))
      .then(ts => ts ? parseInt(ts, 10) : null)
  }

  /**
   * Returns the user, initialising its state first when it is not marked
   * as initialised.
   *
   * A ChatServiceError raised on the way (presumably the "already exists"
   * error of a concurrent creation - confirm) is swallowed.
   *
   * @returns {Promise<User>}
   */
  getOrAddUser (name, state) {
    let user = new User(this.server, name)
    return this.hasUser(name).then(exists => {
      if (!exists) {
        return user.initState(state)
      } else {
        return Promise.resolve()
      }
    }).catch(ChatServiceError, e => user)
      .return(user)
  }

  /**
   * Looks a user up.
   *
   * @param {string} name User name.
   * @param {boolean} [isPredicate=false] When true, a missing user yields
   *   `null` instead of a rejection.
   * @returns {Promise<User|null>} Rejects with a 'noUser' ChatServiceError
   *   when the user is missing and `isPredicate` is false.
   */
  getUser (name, isPredicate = false) {
    let user = new User(this.server, name)
    return this.hasUser(name).then(exists => {
      if (!exists) {
        if (isPredicate) {
          return Promise.resolve(null)
        } else {
          let error = new ChatServiceError('noUser', name)
          return Promise.reject(error)
        }
      }
      return Promise.resolve(user)
    })
  }

  /** Creates a user with the given state and resolves with it. */
  addUser (name, state) {
    let user = new User(this.server, name)
    return user.initState(state).return(user)
  }

  /** Resolves immediately; nothing is done here for this backend. */
  removeUser (name) {
    return Promise.resolve()
  }

}
|
782 |
|
// The RedisState class is the only value exported by this module.
module.exports = RedisState
|