'use strict'
const uuid = require('uuid')
const ActionHero = require('./../index.js')
const api = ActionHero.api
/**
* Redis helpers and connections.
*
* @namespace api.redis
* @property {Object} clients - Holds the redis clients. Contains 3 redis connections: 'client', 'subscriber' and 'tasks'. Configured via `api.config.redis`.
* @property {Object} clients.client - The main redis connection. Use this if you need direct access to redis.
* @property {Object} clients.subscriber - A Redis connection that only listens for redis pub/sub events.
* @property {Object} clients.tasks - A Redis connection for use only in the task system.
* @property {Object} subscriptionHandlers - Callbacks for redis pub/sub
* @property {Object} rpcCallbacks - RPC callbacks for responses to other clients
* @property {Object} status - Redis connection statuses
* @extends ActionHero.Initializer
*/
module.exports = class Redis extends ActionHero.Initializer {
constructor () {
super()
this.name = 'redis'
this.loadPriority = 200
this.startPriority = 101
this.stopPriority = 99999
}
async initialize () {
if (api.config.redis.enabled === false) { return }
api.redis = {}
api.redis.clients = {}
api.redis.subscriptionHandlers = {}
api.redis.rpcCallbacks = {}
api.redis.status = {
subscribed: false
}
api.redis.publish = async (payload) => {
const channel = api.config.general.channel
return api.redis.clients.client.publish(channel, JSON.stringify(payload))
}
api.redis.subscriptionHandlers['do'] = async (message) => {
if (!message.connectionId || (api.connections && api.connections.connections[message.connectionId])) {
let cmdParts = message.method.split('.')
let cmd = cmdParts.shift()
if (cmd !== 'api') { throw new Error('cannot operate on a method outside of the api object') }
let method = api.utils.dotProp.get(api, cmdParts.join('.'))
let args = message.args
if (args === null) { args = [] }
if (!Array.isArray(args)) { args = [args] }
if (method) {
let response = await method.apply(null, args)
await api.redis.respondCluster(message.requestId, response)
} else {
api.log('RPC method `' + cmdParts.join('.') + '` not found', 'warning')
}
}
}
api.redis.subscriptionHandlers.doResponse = function (message) {
if (api.redis.rpcCallbacks[message.requestId]) {
let {resolve, timer} = api.redis.rpcCallbacks[message.requestId]
clearTimeout(timer)
resolve(message.response)
delete api.redis.rpcCallbacks[message.requestId]
}
}
/**
* Invoke a command on all servers in this cluster.
*
* @async
* @param {string} method The method to call on the remote server.
* @param {Array} args The arguments to pass to `method`
* @param {string} connectionId (optional) Should this method only apply to a server which `connectionId` is connected to?
* @param {Boolean} waitForRespons (optional) Should we await a response from a remote server in the cluster?
* @return {Promise} The return value from the remote server.
*/
api.redis.doCluster = async (method, args, connectionId, waitForRespons) => {
if (waitForRespons === undefined || waitForRespons === null) { waitForRespons = false }
const requestId = uuid.v4()
const payload = {
messageType: 'do',
serverId: api.id,
serverToken: api.config.general.serverToken,
requestId: requestId,
method: method,
connectionId: connectionId,
args: args // [1,2,3]
}
await api.redis.publish(payload)
if (waitForRespons) {
let response = await new Promise((resolve, reject) => {
let timer = setTimeout(() => reject(new Error('RPC Timeout')), api.config.general.rpcTimeout)
api.redis.rpcCallbacks[requestId] = {timer, resolve, reject}
})
return response
}
}
api.redis.respondCluster = async (requestId, response) => {
const payload = {
messageType: 'doResponse',
serverId: api.id,
serverToken: api.config.general.serverToken,
requestId: requestId,
response: response // args to pass back, including error
}
await api.redis.publish(payload)
}
const connectionNames = ['client', 'subscriber', 'tasks']
for (var i in connectionNames) {
let r = connectionNames[i]
if (api.config.redis[r].buildNew === true) {
const args = api.config.redis[r].args
api.redis.clients[r] = new api.config.redis[r].konstructor(args[0], args[1], args[2]) // eslint-disable-line
api.redis.clients[r].on('error', (error) => { api.log(`Redis connection \`${r}\` error`, 'error', error) })
api.redis.clients[r].on('connect', () => { api.log(`Redis connection \`${r}\` connected`, 'debug') })
} else {
api.redis.clients[r] = api.config.redis[r].konstructor.apply(null, api.config.redis[r].args)
api.redis.clients[r].on('error', (error) => { api.log(`Redis connection \`${r}\` error`, 'error', error) })
api.log(`Redis connection \`${r}\` connected`, 'debug')
}
await api.redis.clients[r].get('_test')
}
if (!api.redis.status.subscribed) {
await api.redis.clients.subscriber.subscribe(api.config.general.channel)
api.redis.status.subscribed = true
const messageHandler = async (messageChannel, message) => {
try { message = JSON.parse(message) } catch (e) { message = {} }
if (messageChannel === api.config.general.channel && message.serverToken === api.config.general.serverToken) {
if (api.redis.subscriptionHandlers[message.messageType]) {
await api.redis.subscriptionHandlers[message.messageType](message)
}
}
}
api.redis.clients.subscriber.on('message', messageHandler)
}
}
async start () {
if (api.config.redis.enabled === false) {
api.log('redis is disabled', 'notice')
} else {
await api.redis.doCluster('api.log', [`actionhero member ${api.id} has joined the cluster`])
}
}
async stop () {
if (api.config.redis.enabled === false) { return }
api.redis.doCluster('api.log', [`actionhero member ${api.id} has left the cluster`])
await api.redis.clients.subscriber.unsubscribe()
api.redis.status.subscribed = false;
['client', 'subscriber', 'tasks'].forEach((r) => {
let client = api.redis.clients[r]
if (typeof client.quit === 'function') {
client.quit()
} else if (typeof client.end === 'function') {
client.end()
} else if (typeof client.disconnect === 'function') {
client.disconnect()
}
})
}
}