1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.SentinelIterator = void 0;
|
4 | const net_1 = require("net");
|
5 | const utils_1 = require("../../utils");
|
6 | const tls_1 = require("tls");
|
7 | const SentinelIterator_1 = require("./SentinelIterator");
|
8 | exports.SentinelIterator = SentinelIterator_1.default;
|
9 | const AbstractConnector_1 = require("../AbstractConnector");
|
10 | const Redis_1 = require("../../Redis");
|
11 | const FailoverDetector_1 = require("./FailoverDetector");
|
12 | const debug = (0, utils_1.Debug)("SentinelConnector");
|
13 | class SentinelConnector extends AbstractConnector_1.default {
|
14 | constructor(options) {
|
15 | super(options.disconnectTimeout);
|
16 | this.options = options;
|
17 | this.emitter = null;
|
18 | this.failoverDetector = null;
|
19 | if (!this.options.sentinels.length) {
|
20 | throw new Error("Requires at least one sentinel to connect to.");
|
21 | }
|
22 | if (!this.options.name) {
|
23 | throw new Error("Requires the name of master.");
|
24 | }
|
25 | this.sentinelIterator = new SentinelIterator_1.default(this.options.sentinels);
|
26 | }
|
27 | check(info) {
|
28 | const roleMatches = !info.role || this.options.role === info.role;
|
29 | if (!roleMatches) {
|
30 | debug("role invalid, expected %s, but got %s", this.options.role, info.role);
|
31 |
|
32 |
|
33 |
|
34 | this.sentinelIterator.next();
|
35 | this.sentinelIterator.next();
|
36 | this.sentinelIterator.reset(true);
|
37 | }
|
38 | return roleMatches;
|
39 | }
|
40 | disconnect() {
|
41 | super.disconnect();
|
42 | if (this.failoverDetector) {
|
43 | this.failoverDetector.cleanup();
|
44 | }
|
45 | }
|
46 | connect(eventEmitter) {
|
47 | this.connecting = true;
|
48 | this.retryAttempts = 0;
|
49 | let lastError;
|
50 | const connectToNext = async () => {
|
51 | const endpoint = this.sentinelIterator.next();
|
52 | if (endpoint.done) {
|
53 | this.sentinelIterator.reset(false);
|
54 | const retryDelay = typeof this.options.sentinelRetryStrategy === "function"
|
55 | ? this.options.sentinelRetryStrategy(++this.retryAttempts)
|
56 | : null;
|
57 | let errorMsg = typeof retryDelay !== "number"
|
58 | ? "All sentinels are unreachable and retry is disabled."
|
59 | : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`;
|
60 | if (lastError) {
|
61 | errorMsg += ` Last error: ${lastError.message}`;
|
62 | }
|
63 | debug(errorMsg);
|
64 | const error = new Error(errorMsg);
|
65 | if (typeof retryDelay === "number") {
|
66 | eventEmitter("error", error);
|
67 | await new Promise((resolve) => setTimeout(resolve, retryDelay));
|
68 | return connectToNext();
|
69 | }
|
70 | else {
|
71 | throw error;
|
72 | }
|
73 | }
|
74 | let resolved = null;
|
75 | let err = null;
|
76 | try {
|
77 | resolved = await this.resolve(endpoint.value);
|
78 | }
|
79 | catch (error) {
|
80 | err = error;
|
81 | }
|
82 | if (!this.connecting) {
|
83 | throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
|
84 | }
|
85 | const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
|
86 | if (resolved) {
|
87 | debug("resolved: %s:%s from sentinel %s", resolved.host, resolved.port, endpointAddress);
|
88 | if (this.options.enableTLSForSentinelMode && this.options.tls) {
|
89 | Object.assign(resolved, this.options.tls);
|
90 | this.stream = (0, tls_1.connect)(resolved);
|
91 | this.stream.once("secureConnect", this.initFailoverDetector.bind(this));
|
92 | }
|
93 | else {
|
94 | this.stream = (0, net_1.createConnection)(resolved);
|
95 | this.stream.once("connect", this.initFailoverDetector.bind(this));
|
96 | }
|
97 | this.stream.once("error", (err) => {
|
98 | this.firstError = err;
|
99 | });
|
100 | return this.stream;
|
101 | }
|
102 | else {
|
103 | const errorMsg = err
|
104 | ? "failed to connect to sentinel " +
|
105 | endpointAddress +
|
106 | " because " +
|
107 | err.message
|
108 | : "connected to sentinel " +
|
109 | endpointAddress +
|
110 | " successfully, but got an invalid reply: " +
|
111 | resolved;
|
112 | debug(errorMsg);
|
113 | eventEmitter("sentinelError", new Error(errorMsg));
|
114 | if (err) {
|
115 | lastError = err;
|
116 | }
|
117 | return connectToNext();
|
118 | }
|
119 | };
|
120 | return connectToNext();
|
121 | }
|
122 | async updateSentinels(client) {
|
123 | if (!this.options.updateSentinels) {
|
124 | return;
|
125 | }
|
126 | const result = await client.sentinel("sentinels", this.options.name);
|
127 | if (!Array.isArray(result)) {
|
128 | return;
|
129 | }
|
130 | result
|
131 | .map(utils_1.packObject)
|
132 | .forEach((sentinel) => {
|
133 | const flags = sentinel.flags ? sentinel.flags.split(",") : [];
|
134 | if (flags.indexOf("disconnected") === -1 &&
|
135 | sentinel.ip &&
|
136 | sentinel.port) {
|
137 | const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel));
|
138 | if (this.sentinelIterator.add(endpoint)) {
|
139 | debug("adding sentinel %s:%s", endpoint.host, endpoint.port);
|
140 | }
|
141 | }
|
142 | });
|
143 | debug("Updated internal sentinels: %s", this.sentinelIterator);
|
144 | }
|
145 | async resolveMaster(client) {
|
146 | const result = await client.sentinel("get-master-addr-by-name", this.options.name);
|
147 | await this.updateSentinels(client);
|
148 | return this.sentinelNatResolve(Array.isArray(result)
|
149 | ? { host: result[0], port: Number(result[1]) }
|
150 | : null);
|
151 | }
|
152 | async resolveSlave(client) {
|
153 | const result = await client.sentinel("slaves", this.options.name);
|
154 | if (!Array.isArray(result)) {
|
155 | return null;
|
156 | }
|
157 | const availableSlaves = result
|
158 | .map(utils_1.packObject)
|
159 | .filter((slave) => slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/));
|
160 | return this.sentinelNatResolve(selectPreferredSentinel(availableSlaves, this.options.preferredSlaves));
|
161 | }
|
162 | sentinelNatResolve(item) {
|
163 | if (!item || !this.options.natMap)
|
164 | return item;
|
165 | return this.options.natMap[`${item.host}:${item.port}`] || item;
|
166 | }
|
167 | connectToSentinel(endpoint, options) {
|
168 | const redis = new Redis_1.default({
|
169 | port: endpoint.port || 26379,
|
170 | host: endpoint.host,
|
171 | username: this.options.sentinelUsername || null,
|
172 | password: this.options.sentinelPassword || null,
|
173 | family: endpoint.family ||
|
174 |
|
175 | ("path" in this.options && this.options.path
|
176 | ? undefined
|
177 | :
|
178 | this.options.family),
|
179 | tls: this.options.sentinelTLS,
|
180 | retryStrategy: null,
|
181 | enableReadyCheck: false,
|
182 | connectTimeout: this.options.connectTimeout,
|
183 | commandTimeout: this.options.sentinelCommandTimeout,
|
184 | ...options,
|
185 | });
|
186 |
|
187 | return redis;
|
188 | }
|
189 | async resolve(endpoint) {
|
190 | const client = this.connectToSentinel(endpoint);
|
191 |
|
192 | client.on("error", noop);
|
193 | try {
|
194 | if (this.options.role === "slave") {
|
195 | return await this.resolveSlave(client);
|
196 | }
|
197 | else {
|
198 | return await this.resolveMaster(client);
|
199 | }
|
200 | }
|
201 | finally {
|
202 | client.disconnect();
|
203 | }
|
204 | }
|
205 | async initFailoverDetector() {
|
206 | var _a;
|
207 | if (!this.options.failoverDetector) {
|
208 | return;
|
209 | }
|
210 |
|
211 | this.sentinelIterator.reset(true);
|
212 | const sentinels = [];
|
213 |
|
214 | while (sentinels.length < this.options.sentinelMaxConnections) {
|
215 | const { done, value } = this.sentinelIterator.next();
|
216 | if (done) {
|
217 | break;
|
218 | }
|
219 | const client = this.connectToSentinel(value, {
|
220 | lazyConnect: true,
|
221 | retryStrategy: this.options.sentinelReconnectStrategy,
|
222 | });
|
223 | client.on("reconnecting", () => {
|
224 | var _a;
|
225 |
|
226 | (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("sentinelReconnecting");
|
227 | });
|
228 | sentinels.push({ address: value, client });
|
229 | }
|
230 | this.sentinelIterator.reset(false);
|
231 | if (this.failoverDetector) {
|
232 |
|
233 | this.failoverDetector.cleanup();
|
234 | }
|
235 | this.failoverDetector = new FailoverDetector_1.FailoverDetector(this, sentinels);
|
236 | await this.failoverDetector.subscribe();
|
237 |
|
238 | (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("failoverSubscribed");
|
239 | }
|
240 | }
|
241 | exports.default = SentinelConnector;
|
242 | function selectPreferredSentinel(availableSlaves, preferredSlaves) {
|
243 | if (availableSlaves.length === 0) {
|
244 | return null;
|
245 | }
|
246 | let selectedSlave;
|
247 | if (typeof preferredSlaves === "function") {
|
248 | selectedSlave = preferredSlaves(availableSlaves);
|
249 | }
|
250 | else if (preferredSlaves !== null && typeof preferredSlaves === "object") {
|
251 | const preferredSlavesArray = Array.isArray(preferredSlaves)
|
252 | ? preferredSlaves
|
253 | : [preferredSlaves];
|
254 |
|
255 | preferredSlavesArray.sort((a, b) => {
|
256 |
|
257 | if (!a.prio) {
|
258 | a.prio = 1;
|
259 | }
|
260 | if (!b.prio) {
|
261 | b.prio = 1;
|
262 | }
|
263 |
|
264 | if (a.prio < b.prio) {
|
265 | return -1;
|
266 | }
|
267 | if (a.prio > b.prio) {
|
268 | return 1;
|
269 | }
|
270 | return 0;
|
271 | });
|
272 |
|
273 | for (let p = 0; p < preferredSlavesArray.length; p++) {
|
274 | for (let a = 0; a < availableSlaves.length; a++) {
|
275 | const slave = availableSlaves[a];
|
276 | if (slave.ip === preferredSlavesArray[p].ip) {
|
277 | if (slave.port === preferredSlavesArray[p].port) {
|
278 | selectedSlave = slave;
|
279 | break;
|
280 | }
|
281 | }
|
282 | }
|
283 | if (selectedSlave) {
|
284 | break;
|
285 | }
|
286 | }
|
287 | }
|
288 |
|
289 | if (!selectedSlave) {
|
290 | selectedSlave = (0, utils_1.sample)(availableSlaves);
|
291 | }
|
292 | return addressResponseToAddress(selectedSlave);
|
293 | }
|
294 | function addressResponseToAddress(input) {
|
295 | return { host: input.ip, port: Number(input.port) };
|
296 | }
|
297 | function noop() { }
|