UNPKG

4.97 kBJavaScriptView Raw
1const redis = require('redis');
2const { promisifyAll } = require('bluebird');
3const _ = require('lodash');
4
// Give the redis client and multi prototypes promise-returning `*Async`
// variants of their methods (bluebird's promisifyAll), e.g. `getAsync`,
// `setAsync` and `publishAsync` used further down in this module.
promisifyAll(redis.RedisClient.prototype);
promisifyAll(redis.Multi.prototype);
7
8module.exports = configurationParam => {
// Channel on which the 'message' handler expects the name of a key that was
// SET in database 0.
// NOTE(review): presumably requires keyspace notifications to be enabled on
// the Redis server (`notify-keyspace-events`) - confirm for each deployment.
const CHANNEL_KEY_UPDATE = '__keyevent@0__:set';

// Two connections are created from the same configuration: `client` issues
// regular commands, `listener` is used for subscriptions only.
const configuration = generateConfiguration(configurationParam);
const client = redis.createClient(configuration);
const listener = redis.createClient(configuration);

// Registered callbacks, each stored as { channel, key?, processMessage }.
const databaseListeners = [];
const channelListeners = [];
const keyListeners = [];

// Channels / keys for which a subscribe call has already been issued.
const subscribedChannels = [];
const subscribedKeys = [];
19
// Keep-alive timer per connection, so that a later 'ready' event on the same
// connection replaces its timer instead of stacking another one.
const pingTimers = new WeakMap();

/**
 * Runs each time a connection reports 'ready': names the connection on the
 * server and (re)starts a 60 second keep-alive ping.
 *
 * @param {object} client - Connection that became ready.
 */
const clientOnReady = client => {
  // The factory may be called without a configuration object.
  const name = (configurationParam || {}).name || process.env.PLATFORM;

  if (name) {
    try {
      client.internal_send_command({ command: 'CLIENT', args: ['SETNAME', name] });
    } catch (error) {
      // Naming the connection is best-effort; never fail startup over it.
      console.error('redis client set name error', error && error.message);
    }
  }

  // 'ready' can fire more than once for the same connection; without this
  // every repeat would leave one more ping timer running forever.
  if (pingTimers.has(client)) {
    clearInterval(pingTimers.get(client));
  }
  pingTimers.set(client, setInterval(() => client.ping(), 60000));
};
33
/**
 * Logs an error reported by a connection.
 *
 * @param {object} client - Connection that reported the error (unused).
 * @param {Error} error - Reported error; its message is what gets logged.
 */
const clientOnError = (client, error) => {
  // Fall back to the raw value so a non-Error payload cannot make the
  // error handler itself throw.
  const detail = error && error.message ? error.message : error;
  console.error('redis error', detail);
};
37
// Attach the same 'ready' and 'error' handling to both connections.
[client, listener].forEach(connection => {
  connection.on('ready', () => clientOnReady(connection));
  connection.on('error', error => clientOnError(connection, error));
});
43
/**
 * Serializes a value to JSON, dropping only the references that would make
 * the structure circular.
 *
 * An object that merely appears several times (without containing itself) is
 * serialized at every position; only a reference back to one of its own
 * ancestors is discarded.
 *
 * @param {*} value - Value to serialize.
 * @returns {string|undefined} JSON text, or undefined when `value` itself is
 *   not serializable (for example `undefined` or a function).
 */
function toJSON(value) {
  // Objects on the path from the root to the value currently being visited.
  const ancestors = [];

  return JSON.stringify(value, function replacer(key, current) {
    if (typeof current !== 'object' || current === null) {
      return current;
    }

    // `this` is the object that holds `key`. Unwind the path until that
    // holder is on top, which discards already-finished sibling branches.
    while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
      ancestors.pop();
    }

    if (ancestors.includes(current)) {
      // Circular reference found, discard key
      return undefined;
    }

    ancestors.push(current);
    return current;
  });
}
61
/**
 * Builds the connection configuration by filling in defaults for options the
 * caller left undefined.
 *
 * The defaults are applied to a fresh object so the caller's configuration
 * object is never modified.
 *
 * @param {object} [conf] - Caller-supplied connection options.
 * @returns {object} New options object including the defaults.
 */
function generateConfiguration(conf) {
  return _.defaults({}, conf, {
    // Default retry strategy: always answer 1000.
    retry_strategy: () => 1000,
  });
}
67
/**
 * Resolves (with no value) after the given number of milliseconds.
 *
 * @param {number} ms - Time to wait in milliseconds.
 * @returns {Promise<void>}
 */
const delay = ms =>
  new Promise(resolve => {
    setTimeout(resolve, ms);
  });
69
/**
 * Reports whether the command connection is connected, giving it a single
 * one-second grace period when it is not connected yet.
 *
 * @returns {Promise<boolean>} The connection's `connected` flag, read
 *   immediately when already true, otherwise read again after one second.
 */
const isConnected = async () => {
  if (!client.connected) {
    await delay(1000); // wait for redis connect
  }

  return client.connected;
};
78
/**
 * Stores a value under a key.
 *
 * The write is awaited, so the returned promise settles only after Redis has
 * answered. A failed write is logged rather than left as an unhandled
 * rejection. When the connection is not available the write is skipped.
 *
 * @param {string} key - Key to write.
 * @param {*} value - Value to store.
 * @param {boolean} [json=true] - Serialize `value` with toJSON before storing.
 * @returns {Promise<void>}
 */
const setValue = async (key, value, json = true) => {
  if (!(await isConnected())) {
    return;
  }

  try {
    await client.setAsync(key, json ? toJSON(value) : value);
  } catch (error) {
    console.error('redis set error', error && error.message);
  }
};
86
/**
 * Stores a value under a key together with an expiration time, passed to the
 * SET command as its `EX` option.
 *
 * The write is awaited, so the returned promise settles only after Redis has
 * answered. A failed write is logged rather than left as an unhandled
 * rejection. When the connection is not available the write is skipped.
 *
 * @param {string} key - Key to write.
 * @param {*} value - Value to store.
 * @param {boolean} [json=true] - Serialize `value` with toJSON before storing.
 * @param {number} expirationTime - Value for the `EX` option.
 * @returns {Promise<void>}
 */
const setValueWithExpirationTime = async (key, value, json = true, expirationTime) => {
  if (!(await isConnected())) {
    return;
  }

  try {
    await client.setAsync(key, json ? toJSON(value) : value, 'EX', expirationTime);
  } catch (error) {
    console.error('redis set error', error && error.message);
  }
};
93
/**
 * Reads the value stored under a key.
 *
 * Reading is deliberately best-effort: a failed read and a stored value that
 * is not valid JSON (when `json` is true) both yield null instead of
 * throwing.
 *
 * @param {string} key - Key to read.
 * @param {boolean} [json=true] - Parse the stored string as JSON.
 * @returns {Promise<*>} Parsed (or raw) value, or null on any failure.
 */
const getValue = async (key, json = true) => {
  try {
    const value = await client.getAsync(key);
    return json ? JSON.parse(value) : value;
  } catch (error) {
    return null;
  }
};
/**
 * Shallow-merges `value` into the object currently stored under `key` and
 * writes the result back.
 *
 * The current value is always read as JSON. The read and the write are two
 * separate commands, so another write to the same key in between is
 * overwritten.
 *
 * @param {string} key - Key to update.
 * @param {object} value - Properties to merge over the stored object.
 * @param {boolean} [json=true] - Forwarded to setValue for the write.
 * @returns {Promise<*>} Whatever setValue resolves to.
 */
const mergeValue = async (key, value, json = true) => {
  const oldValue = await getValue(key);
  const mergedValue = Object.assign({}, oldValue, value);

  return setValue(key, mergedValue, json);
};
111
/**
 * Issues an `expire` command for a key on the command connection.
 * Fire-and-forget: nothing is returned or awaited.
 *
 * @param {string} key - Key to expire.
 * @param {number} time - Passed through as the command's time argument.
 */
const expire = (key, time) => {
  client.expire(key, time);
};
115
/**
 * Removes a key by expiring it with a time of 1 rather than deleting it
 * outright, so the key is not necessarily gone when this returns.
 * NOTE(review): presumably intentional (lets readers still see the last
 * value briefly) - confirm before replacing with an immediate delete.
 *
 * @param {string} key - Key to remove.
 */
const remove = key => {
  expire(key, 1);
};
119
/**
 * Publishes a message on a channel through the command connection.
 *
 * @param {string} channel - Channel to publish on.
 * @param {string} message - Message payload.
 * @returns {Promise<*>} Promise returned by the client's `publishAsync`.
 */
const publish = (channel, message) => client.publishAsync(channel, message);
/**
 * Tells whether a subscribe call was already issued for a channel.
 *
 * Uses `includes` so the answer is a real boolean; returning the matched
 * element would make a falsy channel name (such as '') look unsubscribed.
 *
 * @param {string} channel - Channel name to look up.
 * @returns {boolean}
 */
const alreadySubscribedChannel = channel => subscribedChannels.includes(channel);
/**
 * Tells whether a key listener subscription was already issued for a key.
 *
 * Uses `includes` so the answer is a real boolean; returning the matched
 * element would make a falsy key (such as '') look unsubscribed.
 *
 * @param {string} key - Key to look up.
 * @returns {boolean}
 */
const alreadySubscribedKey = key => subscribedKeys.includes(key);
124
/**
 * Registers a callback that the 'message' handler invokes for every message
 * received on the key-update channel, whatever key it names.
 *
 * @param {Function} processMessage - Called as
 *   processMessage(value, key, channel).
 */
const registerDatabaseListener = processMessage => {
  listener.subscribe(CHANNEL_KEY_UPDATE);
  databaseListeners.push({ channel: CHANNEL_KEY_UPDATE, processMessage });
};
129
/**
 * Registers a callback for messages published on a channel, subscribing the
 * listener connection the first time the channel is seen.
 *
 * @param {string} channel - Channel to listen on.
 * @param {Function} processMessage - Called as
 *   processMessage(message, message, channel).
 */
const registerChannelListener = (channel, processMessage) => {
  if (!alreadySubscribedChannel(channel)) {
    subscribedChannels.push(channel);
    listener.subscribe(channel);
  }

  channelListeners.push({ channel, processMessage });
};
137
/**
 * Registers a callback that the 'message' handler invokes when the
 * key-update channel reports the given key.
 *
 * Only the key-update channel is subscribed. The key itself must not be
 * passed to `subscribe`, because that would additionally subscribe to a
 * pub/sub channel that merely shares the key's name.
 *
 * @param {string} key - Key to watch.
 * @param {Function} processMessage - Called as
 *   processMessage(value, key, channel).
 */
const registerKeyListener = (key, processMessage) => {
  if (!alreadySubscribedKey(key)) {
    subscribedKeys.push(key);
    listener.subscribe(CHANNEL_KEY_UPDATE);
  }

  keyListeners.push({ channel: CHANNEL_KEY_UPDATE, key, processMessage });
};
145
/**
 * Dispatches every message received by the listener connection.
 *
 * - On the key-update channel the message is a key name: key listeners for
 *   that key plus all database listeners are called with the key's current
 *   value (read with getValue, so null when it is missing or not JSON).
 * - On any other channel the listeners registered for that channel are
 *   called with the message itself.
 *
 * Callbacks are invoked as processMessage(payload, key, channel).
 */
listener.on('message', async (channel, message) => {
  const messageKey = message;
  const isKeyUpdate = channel === CHANNEL_KEY_UPDATE;

  const registeredListeners = isKeyUpdate
    ? keyListeners.filter(keyListener => keyListener.key === messageKey).concat(databaseListeners)
    : channelListeners.filter(channelListener => channelListener.channel === channel);

  // Nobody is interested: skip the read of the key's value entirely.
  if (registeredListeners.length === 0) {
    return;
  }

  const payload = isKeyUpdate ? await getValue(messageKey) : message;

  registeredListeners.forEach(({ channel: listenerChannel, key, processMessage }) => {
    try {
      processMessage(payload, key || messageKey, listenerChannel);
    } catch (error) {
      // One failing callback must not prevent the remaining ones from
      // running, nor reject this async handler with nobody to catch it.
      console.error('redis listener error', error && error.message);
    }
  });
});
163
164 return {
165 setValue,
166 setValueWithExpirationTime,
167 getValue,
168 remove,
169 expire,
170 mergeValue,
171 publish,
172 registerChannelListener,
173 registerKeyListener,
174 registerDatabaseListener,
175 };
176};