1 | const redis = require('redis');
|
2 | const { promisifyAll } = require('bluebird');
|
3 | const _ = require('lodash');
|
4 |
|
// Promisify both redis prototypes with bluebird; the rest of this module
// relies on the resulting `*Async` command variants (setAsync, getAsync, ...).
for (const { prototype } of [redis.RedisClient, redis.Multi]) {
  promisifyAll(prototype);
}
|
7 |
|
8 | module.exports = configurationParam => {
|
// Connection options: the caller's options completed with this module's defaults.
const configuration = generateConfiguration(configurationParam);

// Keyspace "set" events are published on a channel that embeds the database
// index (`__keyevent@<db>__:set`). Derive it from the configured `db` so the
// events we subscribe to come from the same database that `client` reads;
// falls back to database 0 when no `db` option is given.
const CHANNEL_KEY_UPDATE = `__keyevent@${configuration.db || 0}__:set`;

// `client` runs regular commands; `listener` is a second connection used for
// subscriptions (see the `listener.subscribe` / `listener.on('message')` calls).
const client = redis.createClient(configuration);
const listener = redis.createClient(configuration);

// Registered callbacks, grouped by the kind of notification they want.
const databaseListeners = [];
const channelListeners = [];
const keyListeners = [];

// Channels / keys for which a subscription was already requested.
const subscribedChannels = [];
const subscribedKeys = [];
|
19 |
|
// Keep-alive timers already started, keyed by client, so that repeated
// 'ready' events (e.g. after a reconnect) do not stack additional intervals.
const keepAliveTimers = new WeakMap();

/**
 * Handles a client's 'ready' event: names the connection and makes sure a
 * 60-second keep-alive ping is running for it.
 *
 * @param {object} client - redis client that just became ready.
 */
const clientOnReady = client => {
  // The factory may be called without options; do not crash the event handler.
  const name = (configurationParam && configurationParam.name) || process.env.PLATFORM;

  if (name) {
    try {
      client.internal_send_command({ command: 'CLIENT', args: ['SETNAME', name] });
    } catch (error) {
      // Naming the connection is best-effort; report the cause and carry on.
      console.error('redis client set name error', error && error.message);
    }
  }

  // Start the keep-alive only once per client.
  if (!keepAliveTimers.has(client)) {
    keepAliveTimers.set(client, setInterval(() => client.ping(), 60000));
  }
};
|
33 |
|
/**
 * Handles a client's 'error' event by logging the error message.
 *
 * @param {object} client - client that emitted the error (not used).
 * @param {Error} error - emitted error.
 */
const clientOnError = (client, error) => {
  const { message } = error;
  console.error('redis error', message);
};
|
37 |
|
// Attach the same 'ready' and 'error' handlers to the command client and to
// the subscription client, in that order.
for (const connection of [client, listener]) {
  connection.on('ready', () => clientOnReady(connection));
  connection.on('error', error => clientOnError(connection, error));
}
|
43 |
|
/**
 * Serializes a value to JSON, omitting circular references instead of throwing.
 *
 * Only true cycles (an object that contains itself somewhere below) are
 * dropped. An object that is merely referenced more than once is serialized
 * at every occurrence.
 *
 * @param {*} value - value to serialize.
 * @returns {string|undefined} JSON text, or undefined when JSON.stringify
 *   yields undefined (e.g. for `undefined` input).
 */
function toJSON(value) {
  // Chain of objects currently being serialized, from the root down to the
  // holder of the property being visited.
  const ancestors = [];

  return JSON.stringify(value, function(key, current) {
    if (typeof current !== 'object' || current === null) {
      return current;
    }

    // `this` is the object holding `current`; unwind the chain back to it so
    // that finished siblings are not mistaken for ancestors.
    while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
      ancestors.pop();
    }

    if (ancestors.includes(current)) {
      // Circular reference: omit it.
      return undefined;
    }

    ancestors.push(current);
    return current;
  });
}
|
61 |
|
/**
 * Builds the redis client options from the caller's options plus defaults.
 *
 * The caller's object is left untouched; a new object is returned.
 *
 * @param {object} [conf] - options supplied by the caller.
 * @returns {object} options with `retry_strategy` defaulting to a function
 *   that returns 1000.
 */
function generateConfiguration(conf) {
  // Defaults are applied onto a fresh target so `conf` is never mutated.
  return _.defaults({}, conf, {
    retry_strategy: () => 1000,
  });
}
|
67 |
|
/**
 * Returns a promise that resolves after `ms` milliseconds.
 *
 * @param {number} ms - time to wait, in milliseconds.
 * @returns {Promise<void>}
 */
const delay = ms =>
  new Promise(resolve => {
    setTimeout(resolve, ms);
  });
|
69 |
|
/**
 * Reports whether the command client is connected, giving it one second to
 * come up if it is not connected yet.
 *
 * @returns {Promise<boolean>} true right away when connected; otherwise the
 *   client's `connected` flag as read after a 1000 ms wait.
 */
const isConnected = async () => {
  const alreadyUp = Boolean(client.connected);

  if (!alreadyUp) {
    // Single grace period, then report whatever the state is.
    await delay(1000);
    return client.connected;
  }

  return true;
};
|
78 |
|
/**
 * Stores a value under `key`.
 *
 * @param {string} key - key to write.
 * @param {*} value - value to store.
 * @param {boolean} [json=true] - serialize `value` with toJSON before storing.
 * @returns {Promise<*>} resolves with the reply of the SET command once the
 *   write has completed, or with undefined when the client is not connected
 *   (the write is skipped). Rejects if the command fails.
 */
const setValue = async (key, value, json = true) => {
  if (!(await isConnected())) {
    return undefined;
  }

  // Return the command's promise so callers can wait for the write and see
  // failures instead of leaving a floating promise behind.
  return client.setAsync(key, json ? toJSON(value) : value);
};
|
86 |
|
/**
 * Stores a value under `key` with an expiration (`SET key value EX time`).
 *
 * @param {string} key - key to write.
 * @param {*} value - value to store.
 * @param {boolean} [json=true] - serialize `value` with toJSON before storing.
 * @param {number} expirationTime - value passed after the `EX` option.
 * @returns {Promise<*>} resolves with the reply of the SET command once the
 *   write has completed, or with undefined when the client is not connected
 *   (the write is skipped). Rejects if the command fails.
 */
const setValueWithExpirationTime = async (key, value, json = true, expirationTime) => {
  if (!(await isConnected())) {
    return undefined;
  }

  // Return the command's promise so callers can wait for the write and see
  // failures instead of leaving a floating promise behind.
  return client.setAsync(key, json ? toJSON(value) : value, 'EX', expirationTime);
};
|
93 |
|
/**
 * Reads the value stored under `key`.
 *
 * @param {string} key - key to read.
 * @param {boolean} [json=true] - parse the stored text as JSON.
 * @returns {Promise<*>} the stored value (parsed when `json` is true), or
 *   null when the read fails or the stored text is not valid JSON.
 */
const getValue = async (key, json = true) => {
  let raw;

  try {
    raw = await client.getAsync(key);
  } catch (error) {
    // Best-effort read: a failed command is reported as "no value".
    return null;
  }

  if (!json) {
    return raw;
  }

  try {
    return JSON.parse(raw);
  } catch (error) {
    // Unparseable content is reported as "no value" as well.
    return null;
  }
};
|
/**
 * Shallow-merges `value` into the object currently stored under `key` and
 * writes the result back.
 *
 * The current value is always read through getValue with its default
 * (JSON-decoding) mode; `json` only affects how the merged object is written.
 * The read and the write are two separate commands.
 *
 * @param {string} key - key to update.
 * @param {object} value - properties to merge over the stored object.
 * @param {boolean} [json=true] - forwarded to setValue.
 * @returns {Promise<*>} whatever setValue resolves with.
 */
const mergeValue = async (key, value, json = true) => {
  const current = await getValue(key);
  // Properties of `value` win over the stored ones.
  return setValue(key, Object.assign({}, current, value), json);
};
|
111 |
|
/**
 * Sets a time-to-live on `key` by issuing EXPIRE on the command client.
 * The command is sent without waiting for its reply.
 *
 * @param {string} key - key to expire.
 * @param {number} time - value passed to EXPIRE as the timeout.
 */
const expire = function expire(key, time) {
  client.expire(key, time);
};
|
115 |
|
/**
 * Removes `key` by giving it a time-to-live of 1 through `expire`, rather
 * than deleting it outright.
 * NOTE(review): the key stays readable until that timeout elapses — confirm
 * this is intended instead of an immediate delete.
 *
 * @param {string} key - key to remove.
 */
const remove = function remove(key) {
  const timeToLive = 1;
  expire(key, timeToLive);
};
|
119 |
|
/**
 * Publishes `message` on `channel` through the command client.
 *
 * @param {string} channel - channel to publish on.
 * @param {string} message - payload to publish.
 * @returns {Promise<*>} the promise returned by `publishAsync`.
 */
const publish = (channel, message) => {
  return client.publishAsync(channel, message);
};
|
/**
 * Tells whether a subscription was already requested for `channel`.
 *
 * Uses a membership test rather than returning the found element, so falsy
 * channel names (such as the empty string) are reported correctly.
 *
 * @param {string} channel - channel name to look up.
 * @returns {boolean} true when `channel` is in `subscribedChannels`.
 */
const alreadySubscribedChannel = channel => subscribedChannels.includes(channel);
|
/**
 * Tells whether a key subscription was already requested for `key`.
 *
 * Uses a membership test rather than returning the found element, so falsy
 * keys (such as the empty string) are reported correctly.
 *
 * @param {string} key - key to look up.
 * @returns {boolean} true when `key` is in `subscribedKeys`.
 */
const alreadySubscribedKey = key => subscribedKeys.includes(key);
|
124 |
|
/**
 * Registers a callback for every key-update event.
 *
 * Subscribes the listener connection to the key-update channel and records
 * the callback in `databaseListeners`.
 *
 * @param {Function} processMessage - invoked by the message handler as
 *   (value, key, channel).
 */
const registerDatabaseListener = processMessage => {
  listener.subscribe(CHANNEL_KEY_UPDATE);

  const entry = { channel: CHANNEL_KEY_UPDATE, processMessage };
  databaseListeners.push(entry);
};
|
129 |
|
/**
 * Registers a callback for messages published on `channel`.
 *
 * The listener connection subscribes to the channel only the first time it
 * is seen; the callback is recorded on every call.
 *
 * @param {string} channel - pub/sub channel name.
 * @param {Function} processMessage - invoked by the message handler for
 *   messages received on `channel`.
 */
const registerChannelListener = (channel, processMessage) => {
  const isNewChannel = !alreadySubscribedChannel(channel);

  if (isNewChannel) {
    subscribedChannels.push(channel);
    listener.subscribe(channel);
  }

  const entry = { channel, processMessage };
  channelListeners.push(entry);
};
|
137 |
|
/**
 * Registers a callback for updates of one specific key.
 *
 * Key updates all arrive on the key-update channel and are matched to `key`
 * by the message handler, so only that channel is subscribed. `subscribe`
 * treats every argument as a channel name, so passing `key` as well would
 * additionally subscribe to a pub/sub channel named after the key.
 *
 * @param {string} key - key to watch.
 * @param {Function} processMessage - invoked by the message handler as
 *   (value, key, channel) when `key` is updated.
 */
const registerKeyListener = (key, processMessage) => {
  if (!alreadySubscribedKey(key)) {
    subscribedKeys.push(key);
    listener.subscribe(CHANNEL_KEY_UPDATE);
  }

  keyListeners.push({ channel: CHANNEL_KEY_UPDATE, key, processMessage });
};
|
145 |
|
// Dispatches every message received by the subscription connection.
//
// - On the key-update channel the message is the name of the updated key:
//   callbacks registered for that key plus all database callbacks are
//   invoked with (current value of the key, key, channel).
// - On any other channel, callbacks registered for that channel are invoked
//   with (message, message, channel).
//
// Each callback is isolated: an exception or rejected promise from one
// callback is logged and does not prevent the others from running, nor does
// it surface as an unhandled rejection of this handler.
listener.on('message', async (channel, message) => {
  const messageKey = message;
  let payload = message;
  let registeredListeners;

  if (channel === CHANNEL_KEY_UPDATE) {
    // Snapshot the callbacks before awaiting the read.
    registeredListeners = keyListeners
      .filter(keyListener => keyListener.key === messageKey)
      .concat(databaseListeners);

    payload = await getValue(messageKey);
  } else {
    registeredListeners = channelListeners.filter(entry => entry.channel === channel);
  }

  await Promise.all(
    registeredListeners.map(async ({ channel: listenerChannel, key, processMessage }) => {
      try {
        // Callbacks are all started in registration order; `await` only
        // captures asynchronous failures.
        await processMessage(payload, key || messageKey, listenerChannel);
      } catch (error) {
        console.error('redis listener error', error instanceof Error ? error.message : error);
      }
    }),
  );
});
|
163 |
|
164 | return {
|
165 | setValue,
|
166 | setValueWithExpirationTime,
|
167 | getValue,
|
168 | remove,
|
169 | expire,
|
170 | mergeValue,
|
171 | publish,
|
172 | registerChannelListener,
|
173 | registerKeyListener,
|
174 | registerDatabaseListener,
|
175 | };
|
176 | };
|