UNPKG

5 kBJavaScriptView Raw
1'use strict'
2
3const ChatServiceError = require('./ChatServiceError')
4const Promise = require('bluebird')
5const RedisAdapter = require('socket.io-redis')
6const SocketIOClusterBus = require('./SocketIOClusterBus')
7const SocketServer = require('socket.io')
8const _ = require('lodash')
9const { run } = require('./utils')
10
11// Socket.io transport.
12class SocketIOTransport {
13
14 constructor (server, options, adapterConstructor, adapterOptions) {
15 this.server = server
16 this.options = options
17 this.adapterConstructor = adapterConstructor
18 this.adapterOptions = adapterOptions
19 this.port = this.server.port
20 this.io = this.options.io
21 this.middleware = options.middleware
22 this.namespace = this.options.namespace || '/chat-service'
23 let Adapter = (() => {
24 switch (true) {
25 case this.adapterConstructor === 'memory':
26 return null
27 case this.adapterConstructor === 'redis':
28 return RedisAdapter
29 case _.isFunction(this.adapterConstructor):
30 return this.adapterConstructor
31 default:
32 let c = this.adapterConstructor
33 throw new Error(`Invalid transport adapter: ${c}`)
34 }
35 })()
36 if (!this.io) {
37 this.ioOptions = this.options.ioOptions
38 this.http = this.options.http
39 if (this.http) {
40 this.dontCloseIO = true
41 this.io = new SocketServer(this.options.http, this.ioOptions)
42 } else {
43 this.io = new SocketServer(this.port, this.ioOptions)
44 }
45 if (Adapter) {
46 this.adapter = new Adapter(...this.adapterOptions)
47 this.io.adapter(this.adapter)
48 }
49 } else {
50 this.dontCloseIO = true
51 }
52 this.nsp = this.io.of(this.namespace)
53 this.server.io = this.io
54 this.server.nsp = this.nsp
55 this.clusterBus = new SocketIOClusterBus(this.server, this.nsp.adapter)
56 this.closed = false
57 }
58
59 rejectLogin (socket, error) {
60 error = this.server.convertError(error)
61 socket.emit('loginRejected', error)
62 socket.disconnect()
63 }
64
65 confirmLogin (socket, userName, authData) {
66 authData.id = socket.id
67 socket.emit('loginConfirmed', userName, authData)
68 }
69
70 ensureUserName (socket, userName) {
71 return Promise.try(() => {
72 if (!userName) {
73 let { query } = socket.handshake
74 userName = query && query.user
75 if (!userName) {
76 return Promise.reject(new ChatServiceError('noLogin'))
77 }
78 }
79 return Promise.resolve(userName)
80 })
81 }
82
83 setEvents () {
84 if (this.middleware) {
85 let middleware = _.castArray(this.middleware)
86 for (let fn of middleware) {
87 this.nsp.use(fn)
88 }
89 }
90 this.nsp.on('connection', socket => {
91 return run(this, function * () {
92 let id = socket.id
93 let [userName, authData = {}] = yield this.server.onConnect(id)
94 userName = yield this.ensureUserName(socket, userName)
95 yield this.server.registerClient(userName, id)
96 this.confirmLogin(socket, userName, authData)
97 }).catch(error => this.rejectLogin(socket, error))
98 })
99 }
100
101 close () {
102 this.closed = true
103 this.nsp.removeAllListeners('connection')
104 this.clusterBus.removeAllListeners()
105 return Promise.try(() => {
106 if (!this.dontCloseIO) {
107 this.io.close()
108 } else if (this.http) {
109 this.io.engine.close()
110 } else {
111 for (let [, socket] of _.toPairs(this.nsp.connected)) {
112 socket.disconnect()
113 }
114 }
115 return Promise.resolve()
116 })
117 }
118
119 bindHandler (id, name, fn) {
120 let socket = this.getSocket(id)
121 if (socket) {
122 socket.on(name, fn)
123 }
124 }
125
126 getServer () {
127 return this.io
128 }
129
130 getSocket (id) {
131 return this.nsp.connected[id]
132 }
133
134 emitToChannel (channel, eventName, ...eventData) {
135 this.nsp.to(channel).emit(eventName, ...eventData)
136 }
137
138 sendToChannel (id, channel, eventName, ...eventData) {
139 let socket = this.getSocket(id)
140 if (!socket) {
141 this.emitToChannel(channel, eventName, ...eventData)
142 } else {
143 socket.to(channel).emit(eventName, ...eventData)
144 }
145 }
146
147 getHandshakeData (id) {
148 let res = { isConnected: false, query: {}, headers: {} }
149 let socket = this.getSocket(id)
150 if (!socket) { return res }
151 res.isConnected = true
152 res.query = socket.handshake.query
153 res.headers = socket.handshake.headers
154 return res
155 }
156
157 joinChannel (id, channel) {
158 let socket = this.getSocket(id)
159 if (!socket) {
160 return Promise.reject(new ChatServiceError('invalidSocket', id))
161 } else {
162 return Promise.fromCallback(fn => socket.join(channel, fn))
163 }
164 }
165
166 leaveChannel (id, channel) {
167 let socket = this.getSocket(id)
168 if (!socket) { return Promise.resolve() }
169 return Promise.fromCallback(fn => socket.leave(channel, fn))
170 }
171
172 disconnectSocket (id) {
173 let socket = this.getSocket(id)
174 if (socket) {
175 socket.disconnect()
176 }
177 return Promise.resolve()
178 }
179
180}
181
182module.exports = SocketIOTransport