UNPKG

13.8 kBJavaScriptView Raw
1'use strict'
2
3/**
4 * Node style callback. All callbacks are optional, promises may be
5 * used instead. But only one API must be used per invocation.
6 *
7 * @callback callback
8 * @param {Error} error
9 * @param {...*} results
10 */
11
12/**
13 * Server side related documentation.
14 *
15 * @example <caption>npm package usage</caption>
16 * let ChatService = require('chat-service')
17 *
18 * @namespace chat-service
19 */
20
21const ArgumentsValidator = require('./ArgumentsValidator')
22const ChatServiceError = require('./ChatServiceError')
23const MemoryState = require('./MemoryState')
24const Promise = require('bluebird')
25const RecoveryAPI = require('./RecoveryAPI')
26const RedisState = require('./RedisState')
27const Room = require('./Room')
28const ServiceAPI = require('./ServiceAPI')
29const SocketIOClusterBus = require('./SocketIOClusterBus')
30const SocketIOTransport = require('./SocketIOTransport')
31const User = require('./User')
32const _ = require('lodash')
33const uid = require('uid-safe')
34const { EventEmitter } = require('events')
35const { checkNameSymbols, convertError, execHook, logError } =
36 require('./utils')
37const { mixin } = require('es6-mixin')
38
39const rpcRequestsNames = [
40 'directAddToList',
41 'directGetAccessList',
42 'directGetWhitelistMode',
43 'directMessage',
44 'directRemoveFromList',
45 'directSetWhitelistMode',
46 'listOwnSockets',
47 'roomAddToList',
48 'roomCreate',
49 'roomDelete',
50 'roomGetAccessList',
51 'roomGetOwner',
52 'roomGetWhitelistMode',
53 'roomHistoryGet',
54 'roomHistoryInfo',
55 'roomJoin',
56 'roomLeave',
57 'roomMessage',
58 'roomNotificationsInfo',
59 'roomRecentHistory',
60 'roomRemoveFromList',
61 'roomSetWhitelistMode',
62 'roomUserSeen',
63 'systemMessage'
64]
65
66/**
67 * Service class, is the package exported object.
68 *
69 * @extends EventEmitter
70 *
71 * @mixes chat-service.ServiceAPI
72 * @mixes chat-service.RecoveryAPI
73 *
74 * @fires chat-service.ChatService.ready
75 * @fires chat-service.ChatService.closed
76 * @fires chat-service.ChatService.storeConsistencyFailure
77 * @fires chat-service.ChatService.transportConsistencyFailure
78 * @fires chat-service.ChatService.lockTimeExceeded
79 *
80 * @example <caption>starting a server</caption>
81 * let ChatService = require('chat-service')
82 * let service = new ChatService(options, hooks)
83 *
84 * @example <caption>server-side: adding a room</caption>
85 * let owner = 'admin'
86 * let whitelistOnly = true
87 * let whitelist = [ 'user' ]
88 * let state = { owner, whitelistOnly, whitelist }
89 * chatService.addRoom('someRoom', state).then(fn)
90 *
91 * @example <caption>server-side: sending a room message</caption>
92 * let room = 'someRoom'
93 * let msg = { textMessage: 'some message' }
94 * let context = {
95 * userName: 'system',
96 * bypassPermissions: true
97 * }
98 * chatService.execUserCommand(context, 'roomMessage', room, msg)
99 * .then(fn)
100 *
101 * @example <caption>server-side: joining an user socket to a room</caption>
102 * let room = 'someRoom'
103 * let context = {
104 * userName: 'user',
105 * id: id // socket id
106 * }
107 * chatService.execUserCommand(context, 'roomJoin', room)
108 * .then(fn) // real sockets will get a notification
109 *
110 * @memberof chat-service
111 *
112 */
113class ChatService extends EventEmitter {
114
115 /**
116 * Crates an object and starts a new service instance. The {@link
117 * chat-service.ChatService#close} method __MUST__ be called before
118 * the node process exit.
119 *
120 * @param {chat-service.config.options} [options] Service
121 * configuration options.
122 *
123 * @param {chat-service.hooks.HooksInterface} [hooks] Service
124 * customisation hooks.
125 */
126 constructor (options = {}, hooks = {}) {
127 super()
128 this.options = options
129 this.hooks = hooks
130 this.initVariables()
131 this.setOptions()
132 this.setIntegraionOptions()
133 this.setComponents()
134 this.attachBusListeners()
135 mixin(this, ServiceAPI, this.state,
136 () => new User(this), this.clusterBus)
137 mixin(this, RecoveryAPI, this.state, this.transport,
138 this.execUserCommand.bind(this), this.instanceUID)
139 this.startServer()
140 }
141
142 /**
143 * ChatService errors constructor. This errors are intended to be
144 * returned to clients as a part of a normal service functioning
145 * (something like 403 errors). Can be also used to create custom
146 * errors subclasses.
147 *
148 * @name ChatServiceError
149 * @type Class
150 * @static
151 * @readonly
152 *
153 * @memberof chat-service.ChatService
154 *
155 * @see rpc.datatypes.ChatServiceError
156 */
157
158 /**
159 * Service instance UID.
160 *
161 * @name chat-service.ChatService#instanceUID
162 * @type string
163 * @readonly
164 */
165
166 /**
167 * Cluster communication via an adapter. Emits messages to all
168 * services nodes, including the sender node.
169 *
170 * @name chat-service.ChatService#clusterBus
171 * @type EventEmitter
172 * @readonly
173 */
174
175 /**
176 * Transport object.
177 *
178 * @name chat-service.ChatService#transport
179 * @type chat-service.TransportInterface
180 * @readonly
181 */
182
183 /**
184 * Service is ready, state and transport are up.
185 * @event ready
186 *
187 * @memberof chat-service.ChatService
188 */
189
190 /**
191 * Service is closed, state and transport are closed.
192 * @event closed
193 * @param {Error} [error] If was closed due to an error.
194 *
195 * @memberof chat-service.ChatService
196 */
197
198 /**
199 * State store failed to be updated to reflect an user's connection
200 * or presence state.
201 *
202 * @event storeConsistencyFailure
203 * @param {Error} error Error.
204 * @param {Object} operationInfo Operation details.
205 * @property {string} operationInfo.userName User name.
206 * @property {string} operationInfo.opType Operation type.
207 * @property {string} [operationInfo.roomName] Room name.
208 * @property {string} [operationInfo.id] Socket id.
209 *
210 * @see chat-service.RecoveryAPI
211 *
212 * @memberof chat-service.ChatService
213 */
214
215 /**
216 * Failed to teardown a transport connection.
217 *
218 * @event transportConsistencyFailure
219 *
220 * @param {Error} error Error.
221 * @param {Object} operationInfo Operation details.
222 * @property {string} operationInfo.userName User name.
223 * @property {string} operationInfo.opType Operation type.
224 * @property {string} [operationInfo.roomName] Room name.
225 * @property {string} [operationInfo.id] Socket id.
226 *
227 * @memberof chat-service.ChatService
228 */
229
230 /**
231 * Lock was hold longer than a lock ttl.
232 *
233 * @event lockTimeExceeded
234 *
235 * @param {string} id Lock id.
236 * @param {Object} lockInfo Lock resource details.
237 * @property {string} [lockInfo.userName] User name.
238 * @property {string} [lockInfo.roomName] Room name.
239 *
240 * @see chat-service.RecoveryAPI
241 *
242 * @memberof chat-service.ChatService
243 */
244
245 /**
246 * Exposes an internal arguments validation method, it is run
247 * automatically by all client request (command) handlers.
248 *
249 * @method chat-service.ChatService#checkArguments
250 *
251 * @param {string} name Command name.
252 * @param {...*} args Command arguments.
253 * @param {callback} [cb] Optional callback.
254 *
255 * @return {Promise<undefined>} Promise that resolves without any
256 * data if validation is successful, otherwise a promise is
257 * rejected.
258 */
259
260 initVariables () {
261 this.instanceUID = uid.sync(18)
262 this.runningCommands = 0
263 this.rpcRequestsNames = rpcRequestsNames
264 this.closed = false
265 // constants
266 this.ChatServiceError = ChatServiceError
267 this.SocketIOClusterBus = SocketIOClusterBus
268 this.User = User
269 this.Room = Room
270 }
271
272 setOptions () {
273 this.closeTimeout = this.options.closeTimeout || 15000
274 this.busAckTimeout = this.options.busAckTimeout || 5000
275 this.heartbeatRate = this.options.heartbeatRate || 10000
276 this.heartbeatTimeout = this.options.heartbeatTimeout || 30000
277 this.directListSizeLimit = this.options.directListSizeLimit || 1000
278 this.roomListSizeLimit = this.options.roomListSizeLimit || 10000
279 this.enableAccessListsUpdates =
280 this.options.enableAccessListsUpdates || false
281 this.enableDirectMessages = this.options.enableDirectMessages || false
282 this.enableRoomsManagement = this.options.enableRoomsManagement || false
283 this.enableUserlistUpdates = this.options.enableUserlistUpdates || false
284 this.historyMaxGetMessages = this.options.historyMaxGetMessages
285 if (!_.isNumber(this.historyMaxGetMessages) ||
286 this.historyMaxGetMessages < 0) {
287 this.historyMaxGetMessages = 100
288 }
289 this.historyMaxSize = this.options.historyMaxSize
290 if (!_.isNumber(this.historyMaxSize) ||
291 this.historyMaxSize < 0) {
292 this.historyMaxSize = 10000
293 }
294 this.port = this.options.port || 8000
295 this.directMessagesChecker = this.hooks.directMessagesChecker
296 this.roomMessagesChecker = this.hooks.roomMessagesChecker
297 this.useRawErrorObjects = this.options.useRawErrorObjects || false
298 }
299
300 setIntegraionOptions () {
301 this.adapterConstructor = this.options.adapter || 'memory'
302 this.adapterOptions = _.castArray(this.options.adapterOptions)
303
304 this.stateConstructor = this.options.state || 'memory'
305 this.stateOptions = this.options.stateOptions || {}
306
307 this.transportConstructor = this.options.transport || 'socket.io'
308 this.transportOptions = this.options.transportOptions || {}
309 }
310
311 setComponents () {
312 let State = (() => {
313 switch (true) {
314 case this.stateConstructor === 'memory':
315 return MemoryState
316 case this.stateConstructor === 'redis':
317 return RedisState
318 case _.isFunction(this.stateConstructor):
319 return this.stateConstructor
320 default:
321 throw new Error(`Invalid state: ${this.stateConstructor}`)
322 }
323 })()
324 let Transport = (() => {
325 switch (true) {
326 case this.transportConstructor === 'socket.io':
327 return SocketIOTransport
328 case _.isFunction(this.transportConstructor):
329 return this.transportConstructor
330 default:
331 throw new Error(`Invalid transport: ${this.transportConstructor}`)
332 }
333 })()
334 this.validator = new ArgumentsValidator(this)
335 this.checkArguments = this.validator.checkArguments.bind(this.validator)
336 this.state = new State(this, this.stateOptions)
337 this.transport = new Transport(
338 this, this.transportOptions,
339 this.adapterConstructor, this.adapterOptions)
340 this.clusterBus = this.transport.clusterBus
341 }
342
343 attachBusListeners () {
344 this.clusterBus.on('roomLeaveSocket', (id, roomName) => {
345 return this.transport.leaveChannel(id, roomName)
346 .then(() => this.clusterBus.emit('socketRoomLeft', id, roomName))
347 .catchReturn()
348 })
349 this.clusterBus.on('disconnectUserSockets', userName => {
350 return this.state.getUser(userName)
351 .then(user => user.disconnectInstanceSockets())
352 .catchReturn()
353 })
354 }
355
356 // for transport plugins integration
357 convertError (error) {
358 return convertError(error, this.useRawErrorObjects)
359 }
360
361 // for transport plugins integration
362 onConnect (id) {
363 if (this.hooks.onConnect) {
364 return Promise.try(() => {
365 return execHook(this.hooks.onConnect, this, id)
366 }).then(loginData => {
367 loginData = _.castArray(loginData)
368 return Promise.resolve(loginData)
369 }).catch(error => logError(error))
370 } else {
371 return Promise.resolve([])
372 }
373 }
374
375 // for transport plugins integration
376 registerClient (userName, id) {
377 return checkNameSymbols(userName)
378 .then(() => this.state.getOrAddUser(userName))
379 .then(user => user.registerSocket(id))
380 .catch(error => logError(error))
381 }
382
383 waitCommands () {
384 if (this.runningCommands > 0) {
385 return Promise.fromCallback(cb => {
386 return this.once('commandsFinished', cb)
387 })
388 } else {
389 return Promise.resolve()
390 }
391 }
392
393 closeTransport () {
394 return this.transport.close()
395 .then(() => this.waitCommands())
396 .timeout(this.closeTimeout)
397 }
398
399 startServer () {
400 return Promise.try(() => {
401 if (this.hooks.onStart) {
402 return this.clusterBus.listen()
403 .then(() => execHook(this.hooks.onStart, this))
404 .then(() => this.transport.setEvents())
405 } else {
406 // tests spec compatibility
407 this.transport.setEvents()
408 return this.clusterBus.listen()
409 }
410 }).then(() => {
411 this.state.updateHeartbeat()
412 let hbupdater = this.state.updateHeartbeat.bind(this.state)
413 this.hbtimer = setInterval(hbupdater, this.heartbeatRate)
414 return this.emit('ready')
415 }).catch(error => {
416 this.closed = true
417 return this.closeTransport()
418 .then(() => this.state.close())
419 .finally(() => this.emit('closed', error))
420 })
421 }
422
423 /**
424 * Closes server.
425 * @note __MUST__ be called before node process shutdown to correctly
426 * update the state.
427 * @param {callback} [cb] Optional callback.
428 * @return {Promise<undefined>} Promise that resolves without any data.
429 */
430 close (cb) {
431 if (this.closed) { return Promise.resolve() }
432 this.closed = true
433 clearInterval(this.hbtimer)
434 let closeError = null
435 return this.closeTransport().then(
436 () => execHook(this.hooks.onClose, this, null),
437 error => {
438 if (this.hooks.onClose) {
439 return execHook(this.hooks.onClose, this, error)
440 } else {
441 return Promise.reject(error)
442 }
443 }).catch(error => {
444 closeError = error
445 return Promise.reject(error)
446 }).finally(() => {
447 return this.state.close()
448 .finally(() => this.emit('closed', closeError))
449 }).asCallback(cb)
450 }
451}
452
453// for custom errors
454ChatService.ChatServiceError = ChatServiceError
455
456// for transport plugin implementations
457ChatService.SocketIOClusterBus = SocketIOClusterBus
458
459// for store plugin implementations
460ChatService.User = User
461ChatService.Room = Room
462
463module.exports = ChatService