UNPKG

11.7 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.SentinelIterator = void 0;
4const net_1 = require("net");
5const utils_1 = require("../../utils");
6const tls_1 = require("tls");
7const SentinelIterator_1 = require("./SentinelIterator");
8exports.SentinelIterator = SentinelIterator_1.default;
9const AbstractConnector_1 = require("../AbstractConnector");
10const Redis_1 = require("../../Redis");
11const FailoverDetector_1 = require("./FailoverDetector");
12const debug = (0, utils_1.Debug)("SentinelConnector");
13class SentinelConnector extends AbstractConnector_1.default {
14 constructor(options) {
15 super(options.disconnectTimeout);
16 this.options = options;
17 this.emitter = null;
18 this.failoverDetector = null;
19 if (!this.options.sentinels.length) {
20 throw new Error("Requires at least one sentinel to connect to.");
21 }
22 if (!this.options.name) {
23 throw new Error("Requires the name of master.");
24 }
25 this.sentinelIterator = new SentinelIterator_1.default(this.options.sentinels);
26 }
27 check(info) {
28 const roleMatches = !info.role || this.options.role === info.role;
29 if (!roleMatches) {
30 debug("role invalid, expected %s, but got %s", this.options.role, info.role);
31 // Start from the next item.
32 // Note that `reset` will move the cursor to the previous element,
33 // so we advance two steps here.
34 this.sentinelIterator.next();
35 this.sentinelIterator.next();
36 this.sentinelIterator.reset(true);
37 }
38 return roleMatches;
39 }
40 disconnect() {
41 super.disconnect();
42 if (this.failoverDetector) {
43 this.failoverDetector.cleanup();
44 }
45 }
46 connect(eventEmitter) {
47 this.connecting = true;
48 this.retryAttempts = 0;
49 let lastError;
50 const connectToNext = async () => {
51 const endpoint = this.sentinelIterator.next();
52 if (endpoint.done) {
53 this.sentinelIterator.reset(false);
54 const retryDelay = typeof this.options.sentinelRetryStrategy === "function"
55 ? this.options.sentinelRetryStrategy(++this.retryAttempts)
56 : null;
57 let errorMsg = typeof retryDelay !== "number"
58 ? "All sentinels are unreachable and retry is disabled."
59 : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`;
60 if (lastError) {
61 errorMsg += ` Last error: ${lastError.message}`;
62 }
63 debug(errorMsg);
64 const error = new Error(errorMsg);
65 if (typeof retryDelay === "number") {
66 eventEmitter("error", error);
67 await new Promise((resolve) => setTimeout(resolve, retryDelay));
68 return connectToNext();
69 }
70 else {
71 throw error;
72 }
73 }
74 let resolved = null;
75 let err = null;
76 try {
77 resolved = await this.resolve(endpoint.value);
78 }
79 catch (error) {
80 err = error;
81 }
82 if (!this.connecting) {
83 throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
84 }
85 const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
86 if (resolved) {
87 debug("resolved: %s:%s from sentinel %s", resolved.host, resolved.port, endpointAddress);
88 if (this.options.enableTLSForSentinelMode && this.options.tls) {
89 Object.assign(resolved, this.options.tls);
90 this.stream = (0, tls_1.connect)(resolved);
91 this.stream.once("secureConnect", this.initFailoverDetector.bind(this));
92 }
93 else {
94 this.stream = (0, net_1.createConnection)(resolved);
95 this.stream.once("connect", this.initFailoverDetector.bind(this));
96 }
97 this.stream.once("error", (err) => {
98 this.firstError = err;
99 });
100 return this.stream;
101 }
102 else {
103 const errorMsg = err
104 ? "failed to connect to sentinel " +
105 endpointAddress +
106 " because " +
107 err.message
108 : "connected to sentinel " +
109 endpointAddress +
110 " successfully, but got an invalid reply: " +
111 resolved;
112 debug(errorMsg);
113 eventEmitter("sentinelError", new Error(errorMsg));
114 if (err) {
115 lastError = err;
116 }
117 return connectToNext();
118 }
119 };
120 return connectToNext();
121 }
122 async updateSentinels(client) {
123 if (!this.options.updateSentinels) {
124 return;
125 }
126 const result = await client.sentinel("sentinels", this.options.name);
127 if (!Array.isArray(result)) {
128 return;
129 }
130 result
131 .map(utils_1.packObject)
132 .forEach((sentinel) => {
133 const flags = sentinel.flags ? sentinel.flags.split(",") : [];
134 if (flags.indexOf("disconnected") === -1 &&
135 sentinel.ip &&
136 sentinel.port) {
137 const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel));
138 if (this.sentinelIterator.add(endpoint)) {
139 debug("adding sentinel %s:%s", endpoint.host, endpoint.port);
140 }
141 }
142 });
143 debug("Updated internal sentinels: %s", this.sentinelIterator);
144 }
145 async resolveMaster(client) {
146 const result = await client.sentinel("get-master-addr-by-name", this.options.name);
147 await this.updateSentinels(client);
148 return this.sentinelNatResolve(Array.isArray(result)
149 ? { host: result[0], port: Number(result[1]) }
150 : null);
151 }
152 async resolveSlave(client) {
153 const result = await client.sentinel("slaves", this.options.name);
154 if (!Array.isArray(result)) {
155 return null;
156 }
157 const availableSlaves = result
158 .map(utils_1.packObject)
159 .filter((slave) => slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/));
160 return this.sentinelNatResolve(selectPreferredSentinel(availableSlaves, this.options.preferredSlaves));
161 }
162 sentinelNatResolve(item) {
163 if (!item || !this.options.natMap)
164 return item;
165 return this.options.natMap[`${item.host}:${item.port}`] || item;
166 }
167 connectToSentinel(endpoint, options) {
168 const redis = new Redis_1.default({
169 port: endpoint.port || 26379,
170 host: endpoint.host,
171 username: this.options.sentinelUsername || null,
172 password: this.options.sentinelPassword || null,
173 family: endpoint.family ||
174 // @ts-expect-error
175 ("path" in this.options && this.options.path
176 ? undefined
177 : // @ts-expect-error
178 this.options.family),
179 tls: this.options.sentinelTLS,
180 retryStrategy: null,
181 enableReadyCheck: false,
182 connectTimeout: this.options.connectTimeout,
183 commandTimeout: this.options.sentinelCommandTimeout,
184 ...options,
185 });
186 // @ts-expect-error
187 return redis;
188 }
189 async resolve(endpoint) {
190 const client = this.connectToSentinel(endpoint);
191 // ignore the errors since resolve* methods will handle them
192 client.on("error", noop);
193 try {
194 if (this.options.role === "slave") {
195 return await this.resolveSlave(client);
196 }
197 else {
198 return await this.resolveMaster(client);
199 }
200 }
201 finally {
202 client.disconnect();
203 }
204 }
205 async initFailoverDetector() {
206 var _a;
207 if (!this.options.failoverDetector) {
208 return;
209 }
210 // Move the current sentinel to the first position
211 this.sentinelIterator.reset(true);
212 const sentinels = [];
213 // In case of a large amount of sentinels, limit the number of concurrent connections
214 while (sentinels.length < this.options.sentinelMaxConnections) {
215 const { done, value } = this.sentinelIterator.next();
216 if (done) {
217 break;
218 }
219 const client = this.connectToSentinel(value, {
220 lazyConnect: true,
221 retryStrategy: this.options.sentinelReconnectStrategy,
222 });
223 client.on("reconnecting", () => {
224 var _a;
225 // Tests listen to this event
226 (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("sentinelReconnecting");
227 });
228 sentinels.push({ address: value, client });
229 }
230 this.sentinelIterator.reset(false);
231 if (this.failoverDetector) {
232 // Clean up previous detector
233 this.failoverDetector.cleanup();
234 }
235 this.failoverDetector = new FailoverDetector_1.FailoverDetector(this, sentinels);
236 await this.failoverDetector.subscribe();
237 // Tests listen to this event
238 (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("failoverSubscribed");
239 }
240}
241exports.default = SentinelConnector;
242function selectPreferredSentinel(availableSlaves, preferredSlaves) {
243 if (availableSlaves.length === 0) {
244 return null;
245 }
246 let selectedSlave;
247 if (typeof preferredSlaves === "function") {
248 selectedSlave = preferredSlaves(availableSlaves);
249 }
250 else if (preferredSlaves !== null && typeof preferredSlaves === "object") {
251 const preferredSlavesArray = Array.isArray(preferredSlaves)
252 ? preferredSlaves
253 : [preferredSlaves];
254 // sort by priority
255 preferredSlavesArray.sort((a, b) => {
256 // default the priority to 1
257 if (!a.prio) {
258 a.prio = 1;
259 }
260 if (!b.prio) {
261 b.prio = 1;
262 }
263 // lowest priority first
264 if (a.prio < b.prio) {
265 return -1;
266 }
267 if (a.prio > b.prio) {
268 return 1;
269 }
270 return 0;
271 });
272 // loop over preferred slaves and return the first match
273 for (let p = 0; p < preferredSlavesArray.length; p++) {
274 for (let a = 0; a < availableSlaves.length; a++) {
275 const slave = availableSlaves[a];
276 if (slave.ip === preferredSlavesArray[p].ip) {
277 if (slave.port === preferredSlavesArray[p].port) {
278 selectedSlave = slave;
279 break;
280 }
281 }
282 }
283 if (selectedSlave) {
284 break;
285 }
286 }
287 }
288 // if none of the preferred slaves are available, a random available slave is returned
289 if (!selectedSlave) {
290 selectedSlave = (0, utils_1.sample)(availableSlaves);
291 }
292 return addressResponseToAddress(selectedSlave);
293}
294function addressResponseToAddress(input) {
295 return { host: input.ip, port: Number(input.port) };
296}
297function noop() { }