// UNPKG
//
// 8 kB · JavaScript · View Raw
const mysql = require('mysql');
const helper = require('think-helper');
const assert = require('assert');
const Debounce = require('think-debounce');
const thinkInstance = require('think-instance');
6
// Debug logger under the 'think-mysql' namespace; used for pool event traces.
const debug = require('debug')('think-mysql');
// One Debounce instance for the whole module; query() routes debounced
// statements through it, keyed by the JSON of their options.
const debounceInstance = new Debounce();
// Symbol key of the internal method that actually runs a statement.
const QUERY = Symbol('think-mysql-query');
// Flag set on a connection after a PROTOCOL_CONNECTION_LOST error so that
// getConnection() stops handing that connection back.
const CONNECTION_LOST = Symbol('think-mysql-connection-lost');
11
/**
 * Defaults that the ThinkMysql constructor merges underneath the caller's
 * config.
 */
const defaultConfig = {
  port: 3306,
  host: '127.0.0.1',
  user: '',
  password: '',
  database: '',
  // also feeds maxRetryTimes (connectionLimit + 1, but never below 3)
  connectionLimit: 1,
  // sink for the connect line and the per-statement SQL log
  logger: console.log.bind(console), /* eslint no-console: ["error", { allow: ["log"] }] */
  // when true, the constructor logs the connection path once
  logConnect: false,
  // when true, every statement is logged with its elapsed time
  logSql: false,
  // milliseconds to wait for a pooled connection; a falsy value disables the timeout
  acquireWaitTimeout: 0
};
24
/**
 * transaction status
 *
 * Used both as the `transaction` field of a statement's options and as the
 * `transaction` property stored on a connection: `start` while a transaction
 * is open on that connection, `end` once COMMIT/ROLLBACK has been issued.
 */
const TRANSACTION = {
  start: 1,
  end: 2
};

// Symbol key under which a connection carries its count of open (nested)
// startTrans() calls.
const TRANSACTIONS = Symbol('transactions');
34
/**
 * Promise-based access to a `mysql` connection pool: plain queries, optional
 * debouncing of identical statements, a retry when the server drops the
 * connection, and counted (nested) transactions.
 */
class ThinkMysql {
  /**
   * Create the pool and attach debug traces to its events.
   * @param {Object} config [connection options], merged over `defaultConfig`
   */
  constructor(config) {
    config = helper.extend({}, defaultConfig, config);
    this.config = config;
    // upper bound on re-runs of a statement whose connection was lost
    this.maxRetryTimes = Math.max(config.connectionLimit + 1, 3);
    // logger/logConnect/logSql are stripped before the options reach the driver
    this.pool = mysql.createPool(helper.omit(config, 'logger,logConnect,logSql'));

    this.pool.on('acquire', connection => {
      debug(`acquire: Connection ${connection.threadId} acquired`);
    });
    this.pool.on('connection', () => {
      debug('connection: A new connection is made within the pool');
    });
    this.pool.on('enqueue', () => {
      debug('enqueue: Waiting for available connection slot');
    });
    this.pool.on('release', connection => {
      debug(`release: Connection ${connection.threadId} released`);
    });

    // log connect
    if (config.logConnect) {
      let connectionPath = '';
      if (config.socketPath) {
        connectionPath = config.socketPath;
      } else {
        // NOTE(review): this URL contains the password in clear text.
        connectionPath = `mysql://${config.user}:${config.password}@${config.host}:${config.port || 3306}/${config.database}`;
      }
      config.logger(connectionPath);
    }
  }
  /**
   * get connection
   *
   * Hands back `connection` when one is given and it is not flagged as lost;
   * otherwise takes one from the pool. When `config.afterConnect` is set it is
   * called with every connection taken from the pool and awaited before the
   * connection is handed out.
   * @param {Object} [connection] connection to reuse
   * @returns {Promise<Object>} rejects with the pool's error, or with an Error
   *   whose `code` is 'ACQUIRE_WAIT_TIMEOUT' when `config.acquireWaitTimeout`
   *   elapses first
   */
  getConnection(connection) {
    if (connection && !connection[CONNECTION_LOST]) return Promise.resolve(connection);
    const waitTimeout = this.config.acquireWaitTimeout;
    let promise;
    if (waitTimeout) {
      promise = new Promise((resolve, reject) => {
        let timedOut = false;
        // armed before asking the pool, so the callback can always clear it
        const timer = setTimeout(() => {
          timedOut = true;
          const err = new Error(`acquireWaitTimeout: ${waitTimeout}ms`);
          err.code = 'ACQUIRE_WAIT_TIMEOUT';
          reject(err);
        }, waitTimeout);
        this.pool.getConnection((err, connection) => {
          clearTimeout(timer);
          if (timedOut) {
            // The caller has already been rejected and will never see this
            // connection; give it straight back so the pool slot is not lost.
            if (connection) {
              try {
                connection.release();
              } catch (e) {}
            }
            return;
          }
          if (err) reject(err);
          else resolve(connection);
        });
      });
    } else {
      promise = helper.promisify(this.pool.getConnection, this.pool)();
    }
    if (this.config.afterConnect) {
      return promise.then(connection => {
        return Promise.resolve(this.config.afterConnect(connection)).then(() => connection);
      });
    }
    return promise;
  }
  /**
   * start transaction
   *
   * Calls are counted per connection: only the first one sends
   * START TRANSACTION, deeper ones just bump the counter.
   * @param {Object} connection
   * @returns {Promise<Object>} the connection the transaction runs on
   */
  startTrans(connection) {
    return this.getConnection(connection).then(connection => {
      if (connection[TRANSACTIONS] === undefined) {
        connection[TRANSACTIONS] = 0;
      }
      connection[TRANSACTIONS]++;
      // nested call: the transaction is already open, just hand back the connection
      if (connection[TRANSACTIONS] !== 1) return connection;

      return this.query({
        sql: 'START TRANSACTION',
        transaction: TRANSACTION.start,
        debounce: false
      }, connection).then(() => connection);
    });
  }
  /**
   * commit transaction
   *
   * Decrements the connection's nesting counter; COMMIT is only sent when the
   * outermost level is reached.
   * @param {Object} connection
   * @returns {Promise}
   */
  commit(connection) {
    if (connection && connection[TRANSACTIONS]) {
      connection[TRANSACTIONS]--;
      if (connection[TRANSACTIONS] !== 0) return Promise.resolve();
    }
    return this.query({
      sql: 'COMMIT',
      transaction: TRANSACTION.end,
      debounce: false
    }, connection);
  }
  /**
   * rollback transaction
   *
   * Decrements the connection's nesting counter; ROLLBACK is only sent when
   * the outermost level is reached.
   * @param {Object} connection
   * @returns {Promise}
   */
  rollback(connection) {
    if (connection && connection[TRANSACTIONS]) {
      connection[TRANSACTIONS]--;
      if (connection[TRANSACTIONS] !== 0) return Promise.resolve();
    }
    return this.query({
      sql: 'ROLLBACK',
      transaction: TRANSACTION.end,
      debounce: false
    }, connection);
  }
  /**
   * transaction
   *
   * Runs `fn(connection)` between startTrans() and commit(). If any step
   * rejects, rollback() is called and the returned promise rejects with the
   * original error.
   * @param {Function} fn
   * @param {Object} connection
   * @returns {Promise} resolves with whatever `fn` resolved with
   */
  transaction(fn, connection) {
    assert(helper.isFunction(fn), 'fn must be a function');
    return this.getConnection(connection).then(connection => {
      return this.startTrans(connection).then(() => {
        return fn(connection);
      }).then(data => {
        return this.commit(connection).then(() => data);
      }).catch(err => {
        return this.rollback(connection).then(() => Promise.reject(err));
      });
    });
  }
  /**
   * query data
   *
   * Runs one statement on `connection`, releases the connection (unless a
   * transaction is open on it) and re-runs the statement on a fresh connection
   * when the server reports PROTOCOL_CONNECTION_LOST, up to `maxRetryTimes`
   * retries.
   *
   * NOTE(review): the retry takes its connection from getConnection() and does
   * not carry the `transaction` state over to it — confirm this is acceptable
   * for statements issued inside a transaction.
   * @param {Object} sqlOptions
   * @param {Object} connection
   * @param {Number} startTime Date.now() when the caller issued the statement
   * @param {Number} times retries made so far
   * @returns {Promise} rejects with the driver error, its message suffixed with the SQL
   */
  [QUERY](sqlOptions, connection, startTime, times = 0) {
    const queryFn = helper.promisify(connection.query, connection);
    // fold the rejection into the value so success and failure share one path
    return queryFn(sqlOptions).catch(err => err).then(data => {
      this.releaseConnection(connection);

      // if server close connection, then retry it
      if (helper.isError(data) && data.code === 'PROTOCOL_CONNECTION_LOST') {
        connection[CONNECTION_LOST] = true;
        if (times < this.maxRetryTimes) {
          return this.getConnection().then(connection => {
            return this[QUERY](sqlOptions, connection, startTime, times + 1);
          });
        }
      }
      // log sql
      if (this.config.logSql) {
        const endTime = Date.now();
        this.config.logger(`SQL: ${sqlOptions.sql}, Time: ${endTime - startTime}ms`);
      }
      if (helper.isError(data)) {
        data.message += `, SQL: ${sqlOptions.sql}`;
        return Promise.reject(data);
      }
      return data;
    });
  }
  /**
   * release connection
   *
   * No-op while a transaction is open on the connection.
   * @param {Object} connection
   */
  releaseConnection(connection) {
    // if not in transaction, release connection
    if (connection.transaction !== TRANSACTION.start) {
      // connection maybe already released, it will throw an Error
      try {
        connection.release();
      } catch (e) {}
    }
  }
  /**
   * query({
   *  sql: 'SELECT * FROM `books` WHERE `author` = ?',
   *  timeout: 40000, // 40s
   *  values: ['David']
   * })
   *
   * `debounce` defaults to `config.debounce`, or to true when that is not set.
   * Debounced statements go through the shared Debounce instance, keyed by the
   * JSON of their options. The caller's options object is never modified.
   * @param {Object|String} sqlOptions
   * @param {Object} connection
   * @returns {Promise}
   */
  query(sqlOptions, connection) {
    if (helper.isString(sqlOptions)) {
      sqlOptions = {sql: sqlOptions};
    } else {
      // shallow copy: `debounce` is filled in below and must not leak into
      // (or fail on a frozen) object owned by the caller
      sqlOptions = Object.assign({}, sqlOptions);
    }
    if (sqlOptions.debounce === undefined) {
      if (this.config.debounce !== undefined) {
        sqlOptions.debounce = this.config.debounce;
      } else {
        sqlOptions.debounce = true;
      }
    }
    const startTime = Date.now();
    if (sqlOptions.debounce) {
      const key = JSON.stringify(sqlOptions);
      return debounceInstance.debounce(key, () => {
        return this.getConnection(connection).then(connection => {
          return this[QUERY](sqlOptions, connection, startTime);
        });
      });
    }
    return this.getConnection(connection).then(connection => {
      // set transaction status to connection
      if (sqlOptions.transaction) {
        if (sqlOptions.transaction === TRANSACTION.start) {
          // already inside a transaction: nothing to send
          if (connection.transaction === TRANSACTION.start) return;
        } else if (sqlOptions.transaction === TRANSACTION.end) {
          // COMMIT/ROLLBACK without an open transaction: just give the connection back
          if (connection.transaction !== TRANSACTION.start) {
            this.releaseConnection(connection);
            return;
          }
        }
        connection.transaction = sqlOptions.transaction;
      }
      return this[QUERY](sqlOptions, connection, startTime);
    });
  }

  /**
   * execute
   *
   * Same as query() but never debounced. The caller's options object is never
   * modified.
   * @param {Object|String} sqlOptions
   * @param {Object} connection
   * @returns {Promise}
   */
  execute(sqlOptions, connection) {
    if (helper.isString(sqlOptions)) {
      sqlOptions = {sql: sqlOptions};
    }
    return this.query(Object.assign({}, sqlOptions, {debounce: false}), connection);
  }
  /**
   * close
   *
   * Ends the pool.
   * @returns {Promise}
   */
  close() {
    return helper.promisify(this.pool.end, this.pool)();
  }
}
// Export the class as wrapped by think-instance.
module.exports = thinkInstance(ThinkMysql);