UNPKG

13.6 kBJavaScriptView Raw
1"use strict";
2/*
3 * @adonisjs/lucid
4 *
5 * (c) Harminder Virk <virk@adonisjs.com>
6 *
7 * For the full copyright and license information, please view the LICENSE
8 * file that was distributed with this source code.
9 */
10var __importDefault = (this && this.__importDefault) || function (mod) {
11 return (mod && mod.__esModule) ? mod : { "default": mod };
12};
13Object.defineProperty(exports, "__esModule", { value: true });
14exports.Connection = void 0;
15const knex_1 = __importDefault(require("knex"));
16const events_1 = require("events");
17const utils_1 = require("@poppinss/utils");
18const knex_dynamic_connection_1 = require("knex-dynamic-connection");
19const helpers_1 = require("knex/lib/util/helpers");
20const Logger_1 = require("./Logger");
21/**
22 * Connection class manages a given database connection. Internally it uses
23 * knex to build the database connection with appropriate database
24 * driver.
25 */
26class Connection extends events_1.EventEmitter {
27 constructor(name, config, logger) {
28 super();
29 Object.defineProperty(this, "name", {
30 enumerable: true,
31 configurable: true,
32 writable: true,
33 value: name
34 });
35 Object.defineProperty(this, "config", {
36 enumerable: true,
37 configurable: true,
38 writable: true,
39 value: config
40 });
41 Object.defineProperty(this, "logger", {
42 enumerable: true,
43 configurable: true,
44 writable: true,
45 value: logger
46 });
47 /**
48 * Reference to knex. The instance is created once the `open`
49 * method is invoked
50 */
51 Object.defineProperty(this, "client", {
52 enumerable: true,
53 configurable: true,
54 writable: true,
55 value: void 0
56 });
57 /**
58 * Read client when read/write replicas are defined in the config, otherwise
59 * it is a reference to the `client`.
60 */
61 Object.defineProperty(this, "readClient", {
62 enumerable: true,
63 configurable: true,
64 writable: true,
65 value: void 0
66 });
67 /**
68 * Connection dialect name
69 */
70 Object.defineProperty(this, "dialectName", {
71 enumerable: true,
72 configurable: true,
73 writable: true,
74 value: (0, helpers_1.resolveClientNameWithAliases)(this.config.client)
75 });
76 /**
77 * A boolean to know if connection operates on read/write
78 * replicas
79 */
80 Object.defineProperty(this, "hasReadWriteReplicas", {
81 enumerable: true,
82 configurable: true,
83 writable: true,
84 value: !!(this.config.replicas &&
85 this.config.replicas.read &&
86 this.config.replicas.write)
87 });
88 /**
89 * Config for one or more read replicas. Only exists, when replicas are
90 * defined
91 */
92 Object.defineProperty(this, "readReplicas", {
93 enumerable: true,
94 configurable: true,
95 writable: true,
96 value: []
97 });
98 /**
99 * The round robin counter for reading config
100 */
101 Object.defineProperty(this, "roundRobinCounter", {
102 enumerable: true,
103 configurable: true,
104 writable: true,
105 value: 0
106 });
107 this.validateConfig();
108 }
109 /**
110 * Validates the config to ensure that read/write replicas are defined
111 * properly.
112 */
113 validateConfig() {
114 if (this.config.replicas) {
115 if (!this.config.replicas.read || !this.config.replicas.write) {
116 throw new utils_1.Exception('Make sure to define read/write replicas or use connection property', 500, 'E_INCOMPLETE_REPLICAS_CONFIG');
117 }
118 if (!this.config.replicas.read.connection || !this.config.replicas.read.connection) {
119 throw new utils_1.Exception('Make sure to define connection property inside read/write replicas', 500, 'E_INVALID_REPLICAS_CONFIG');
120 }
121 }
122 }
123 /**
124 * Cleanup references
125 */
126 cleanup() {
127 this.client = undefined;
128 this.readClient = undefined;
129 this.readReplicas = [];
130 this.roundRobinCounter = 0;
131 }
132 /**
133 * Does cleanup by removing knex reference and removing all listeners.
134 * For the same of simplicity, we get rid of both read and write
135 * clients, when anyone of them disconnects.
136 */
137 monitorPoolResources() {
138 /**
139 * Pool has destroyed and hence we must cleanup resources
140 * as well.
141 */
142 this.pool.on('poolDestroySuccess', () => {
143 this.logger.trace({ connection: this.name }, 'pool destroyed, cleaning up resource');
144 this.cleanup();
145 this.emit('disconnect', this);
146 this.removeAllListeners();
147 });
148 if (this.readPool !== this.pool) {
149 this.readPool.on('poolDestroySuccess', () => {
150 this.logger.trace({ connection: this.name }, 'pool destroyed, cleaning up resource');
151 this.cleanup();
152 this.emit('disconnect', this);
153 this.removeAllListeners();
154 });
155 }
156 }
157 /**
158 * Returns normalized config object for write replica to be
159 * used by knex
160 */
161 getWriteConfig() {
162 if (!this.config.replicas) {
163 return this.config;
164 }
165 const { replicas, ...config } = this.config;
166 /**
167 * Give preference to the replica write connection when and merge values from
168 * the main connection object when defined.
169 */
170 if (typeof replicas.write.connection === 'string' || typeof config.connection === 'string') {
171 config.connection = replicas.write.connection;
172 }
173 else {
174 config.connection = Object.assign({}, config.connection, replicas.write.connection);
175 }
176 /**
177 * Add pool to the config when pool config defined on main connection
178 * or the write replica
179 */
180 if (config.pool || replicas.write.pool) {
181 config.pool = Object.assign({}, config.pool, replicas.write.pool);
182 }
183 return config;
184 }
185 /**
186 * Returns the config for read replicas.
187 */
188 getReadConfig() {
189 if (!this.config.replicas) {
190 return this.config;
191 }
192 const { replicas, ...config } = this.config;
193 /**
194 * Reading replicas and storing them as a reference, so that we
195 * can pick a config from replicas as round robin.
196 */
197 this.readReplicas = replicas.read.connection.map((one) => {
198 if (typeof one === 'string' || typeof config.connection === 'string') {
199 return one;
200 }
201 else {
202 return Object.assign({}, config.connection, one);
203 }
204 });
205 /**
206 * Add database property on the main connection, since knexjs needs it
207 * internally
208 */
209 config.connection = {
210 database: this.readReplicas[0].database,
211 };
212 /**
213 * Add pool to the config when pool config defined on main connection
214 * or the read replica
215 */
216 if (config.pool || replicas.read.pool) {
217 config.pool = Object.assign({}, config.pool, replicas.read.pool);
218 }
219 return config;
220 }
221 /**
222 * Resolves connection config for the writer connection
223 */
224 writeConfigResolver(originalConfig) {
225 return originalConfig.connection;
226 }
227 /**
228 * Resolves connection config for the reader connection
229 */
230 readConfigResolver(originalConfig) {
231 if (!this.readReplicas.length) {
232 return originalConfig.connection;
233 }
234 const index = this.roundRobinCounter++ % this.readReplicas.length;
235 this.logger.trace({ connection: this.name }, `round robin using host at ${index} index`);
236 return this.readReplicas[index];
237 }
238 /**
239 * Creates the write connection.
240 */
241 setupWriteConnection() {
242 this.client = (0, knex_1.default)(Object.assign({ log: new Logger_1.Logger(this.name, this.logger) }, this.getWriteConfig(), {
243 debug: false,
244 }));
245 (0, knex_dynamic_connection_1.patchKnex)(this.client, this.writeConfigResolver.bind(this));
246 }
247 /**
248 * Creates the read connection. If there aren't any replicas in use, then
249 * it will use the write client instead.
250 */
251 setupReadConnection() {
252 if (!this.hasReadWriteReplicas) {
253 this.readClient = this.client;
254 return;
255 }
256 this.logger.trace({ connection: this.name }, 'setting up read/write replicas');
257 this.readClient = (0, knex_1.default)(Object.assign({ log: new Logger_1.Logger(this.name, this.logger) }, this.getReadConfig(), {
258 debug: false,
259 }));
260 (0, knex_dynamic_connection_1.patchKnex)(this.readClient, this.readConfigResolver.bind(this));
261 }
262 /**
263 * Checks all the read hosts by running a query on them. Stops
264 * after first error.
265 */
266 async checkReadHosts() {
267 const configCopy = Object.assign({ log: new Logger_1.Logger(this.name, this.logger) }, this.config, {
268 debug: false,
269 });
270 let error = null;
271 // eslint-disable-next-line @typescript-eslint/naming-convention
272 for (let _ of this.readReplicas) {
273 configCopy.connection = this.readConfigResolver(this.config);
274 this.logger.trace({ connection: this.name }, 'spawing health check read connection');
275 const client = (0, knex_1.default)(configCopy);
276 try {
277 if (this.dialectName === 'oracledb') {
278 await client.raw('SELECT 1 + 1 AS result FROM dual');
279 }
280 else {
281 await client.raw('SELECT 1 + 1 AS result');
282 }
283 }
284 catch (err) {
285 error = err;
286 }
287 /**
288 * Cleanup client connection
289 */
290 await client.destroy();
291 this.logger.trace({ connection: this.name }, 'destroying health check read connection');
292 /**
293 * Return early when there is an error
294 */
295 if (error) {
296 break;
297 }
298 }
299 return error;
300 }
301 /**
302 * Checks for the write host
303 */
304 async checkWriteHost() {
305 try {
306 if (this.dialectName === 'oracledb') {
307 await this.client.raw('SELECT 1 + 1 AS result FROM dual');
308 }
309 else {
310 await this.client.raw('SELECT 1 + 1 AS result');
311 }
312 }
313 catch (error) {
314 return error;
315 }
316 }
317 /**
318 * Returns the pool instance for the given connection
319 */
320 get pool() {
321 return this.client ? this.client.client.pool : null;
322 }
323 /**
324 * Returns the pool instance for the read connection. When replicas are
325 * not in use, then read/write pools are same.
326 */
327 get readPool() {
328 return this.readClient ? this.readClient.client.pool : null;
329 }
330 /**
331 * Returns a boolean indicating if the connection is ready for making
332 * database queries. If not, one must call `connect`.
333 */
334 get ready() {
335 return !!(this.client || this.readClient);
336 }
337 /**
338 * Opens the connection by creating knex instance
339 */
340 connect() {
341 try {
342 this.setupWriteConnection();
343 this.setupReadConnection();
344 this.monitorPoolResources();
345 this.emit('connect', this);
346 }
347 catch (error) {
348 this.emit('error', error, this);
349 throw error;
350 }
351 }
352 /**
353 * Closes DB connection by destroying knex instance. The `connection`
354 * object must be free for garbage collection.
355 *
356 * In case of error this method will emit `close:error` event followed
357 * by the `close` event.
358 */
359 async disconnect() {
360 this.logger.trace({ connection: this.name }, 'destroying connection');
361 /**
362 * Disconnect write client
363 */
364 if (this.client) {
365 try {
366 await this.client.destroy();
367 }
368 catch (error) {
369 this.emit('disconnect:error', error, this);
370 }
371 }
372 /**
373 * Disconnect read client when it exists and both clients
374 * aren't same
375 */
376 if (this.readClient && this.readClient !== this.client) {
377 try {
378 await this.readClient.destroy();
379 }
380 catch (error) {
381 this.emit('disconnect:error', error, this);
382 }
383 }
384 }
385 /**
386 * Returns the healthcheck report for the connection
387 */
388 async getReport() {
389 const error = await this.checkWriteHost();
390 let readError;
391 if (!error && this.hasReadWriteReplicas) {
392 readError = await this.checkReadHosts();
393 }
394 return {
395 connection: this.name,
396 message: readError
397 ? 'Unable to reach one of the read hosts'
398 : error
399 ? 'Unable to reach the database server'
400 : 'Connection is healthy',
401 error: error || readError || null,
402 };
403 }
404}
405exports.Connection = Connection;