1 | const mysql = require('mysql');
|
2 | const helper = require('think-helper');
|
3 | const assert = require('assert');
|
4 | const Debounce = require('think-debounce');
|
5 | const thinkInstance = require('think-instance');
|
6 |
|
7 | const debug = require('debug')('think-mysql');
|
8 | const debounceInstance = new Debounce();
|
9 | const QUERY = Symbol('think-mysql-query');
|
10 | const CONNECTION_LOST = Symbol('think-mysql-connection-lost');
|
11 |
|
12 | const defaultConfig = {
|
13 | port: 3306,
|
14 | host: '127.0.0.1',
|
15 | user: '',
|
16 | password: '',
|
17 | database: '',
|
18 | connectionLimit: 1,
|
19 | logger: console.log.bind(console),
|
20 | logConnect: false,
|
21 | logSql: false,
|
22 | acquireWaitTimeout: 0
|
23 | };
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | const TRANSACTION = {
|
29 | start: 1,
|
30 | end: 2
|
31 | };
|
32 |
|
33 | const TRANSACTIONS = Symbol('transactions');
|
34 |
|
35 | class ThinkMysql {
|
36 | |
37 |
|
38 |
|
39 | constructor(config) {
|
40 | config = helper.extend({}, defaultConfig, config);
|
41 | this.config = config;
|
42 | this.maxRetryTimes = Math.max(config.connectionLimit + 1, 3);
|
43 | this.pool = mysql.createPool(helper.omit(config, 'logger,logConnect,logSql'));
|
44 |
|
45 | this.pool.on('acquire', connection => {
|
46 | debug(`acquire: Connection ${connection.threadId} acquired`);
|
47 | });
|
48 | this.pool.on('connection', () => {
|
49 | debug('connection: A new connection is made within the pool');
|
50 | });
|
51 | this.pool.on('enqueue', () => {
|
52 | debug('enqueue: Waiting for available connection slot');
|
53 | });
|
54 | this.pool.on('release', connection => {
|
55 | debug(`release: Connection ${connection.threadId} released`);
|
56 | });
|
57 |
|
58 |
|
59 | if (config.logConnect) {
|
60 | let connectionPath = '';
|
61 | if (config.socketPath) {
|
62 | connectionPath = config.socketPath;
|
63 | } else {
|
64 | connectionPath = `mysql://${config.user}:${config.password}@${config.host}:${config.port || 3306}/${config.database}`;
|
65 | }
|
66 | config.logger(connectionPath);
|
67 | }
|
68 | }
|
69 | |
70 |
|
71 |
|
72 | getConnection(connection) {
|
73 | if (connection && !connection[CONNECTION_LOST]) return Promise.resolve(connection);
|
74 | let promise;
|
75 | if (this.config.acquireWaitTimeout) {
|
76 | promise = new Promise((resolve, reject) => {
|
77 | this.pool.getConnection((err, connection) => {
|
78 | if (err) reject(err);
|
79 | else resolve(connection);
|
80 | clearTimeout(timer);
|
81 | });
|
82 | const timer = setTimeout(() => {
|
83 | const err = new Error('acquireWaitTimeout: ' + this.config.acquireWaitTimeout + 'ms');
|
84 | err.code = 'ACQUIRE_WAIT_TIMEOUT';
|
85 | reject(err);
|
86 | }, this.config.acquireWaitTimeout);
|
87 | });
|
88 | } else {
|
89 | promise = helper.promisify(this.pool.getConnection, this.pool)();
|
90 | }
|
91 | if (this.config.afterConnect) {
|
92 | return promise.then(connection => {
|
93 | return Promise.resolve(this.config.afterConnect(connection)).then(() => connection);
|
94 | });
|
95 | }
|
96 | return promise;
|
97 | }
|
98 | |
99 |
|
100 |
|
101 |
|
102 | startTrans(connection) {
|
103 | return this.getConnection(connection).then(connection => {
|
104 | if (connection[TRANSACTIONS] === undefined) {
|
105 | connection[TRANSACTIONS] = 0;
|
106 | }
|
107 | connection[TRANSACTIONS]++;
|
108 | if (connection[TRANSACTIONS] !== 1) return Promise.resolve();
|
109 |
|
110 | return this.query({
|
111 | sql: 'START TRANSACTION',
|
112 | transaction: TRANSACTION.start,
|
113 | debounce: false
|
114 | }, connection).then(() => connection);
|
115 | });
|
116 | }
|
117 | |
118 |
|
119 |
|
120 |
|
121 | commit(connection) {
|
122 | if (connection && connection[TRANSACTIONS]) {
|
123 | connection[TRANSACTIONS]--;
|
124 | if (connection[TRANSACTIONS] !== 0) return Promise.resolve();
|
125 | }
|
126 | return this.query({
|
127 | sql: 'COMMIT',
|
128 | transaction: TRANSACTION.end,
|
129 | debounce: false
|
130 | }, connection);
|
131 | }
|
132 | |
133 |
|
134 |
|
135 |
|
136 | rollback(connection) {
|
137 | if (connection && connection[TRANSACTIONS]) {
|
138 | connection[TRANSACTIONS]--;
|
139 | if (connection[TRANSACTIONS] !== 0) return Promise.resolve();
|
140 | }
|
141 | return this.query({
|
142 | sql: 'ROLLBACK',
|
143 | transaction: TRANSACTION.end,
|
144 | debounce: false
|
145 | }, connection);
|
146 | }
|
147 | |
148 |
|
149 |
|
150 |
|
151 |
|
152 | transaction(fn, connection) {
|
153 | assert(helper.isFunction(fn), 'fn must be a function');
|
154 | return this.getConnection(connection).then(connection => {
|
155 | return this.startTrans(connection).then(() => {
|
156 | return fn(connection);
|
157 | }).then(data => {
|
158 | return this.commit(connection).then(() => data);
|
159 | }).catch(err => {
|
160 | return this.rollback(connection).then(() => Promise.reject(err));
|
161 | });
|
162 | });
|
163 | }
|
164 | |
165 |
|
166 |
|
167 | [QUERY](sqlOptions, connection, startTime, times = 0) {
|
168 | const queryFn = helper.promisify(connection.query, connection);
|
169 | return queryFn(sqlOptions).catch(err => err).then(data => {
|
170 | this.releaseConnection(connection);
|
171 |
|
172 |
|
173 | if (helper.isError(data) && data.code === 'PROTOCOL_CONNECTION_LOST') {
|
174 | connection[CONNECTION_LOST] = true;
|
175 | if (times < this.maxRetryTimes) {
|
176 | return this.getConnection().then(connection => {
|
177 | return this[QUERY](sqlOptions, connection, startTime, times + 1);
|
178 | });
|
179 | }
|
180 | }
|
181 |
|
182 | if (this.config.logSql) {
|
183 | const endTime = Date.now();
|
184 | this.config.logger(`SQL: ${sqlOptions.sql}, Time: ${endTime - startTime}ms`);
|
185 | }
|
186 | if (helper.isError(data)) {
|
187 | data.message += `, SQL: ${sqlOptions.sql}`;
|
188 | return Promise.reject(data);
|
189 | }
|
190 | return data;
|
191 | });
|
192 | }
|
193 | |
194 |
|
195 |
|
196 | releaseConnection(connection) {
|
197 |
|
198 | if (connection.transaction !== TRANSACTION.start) {
|
199 |
|
200 | try {
|
201 | connection.release();
|
202 | } catch (e) {}
|
203 | }
|
204 | }
|
205 | |
206 |
|
207 |
|
208 |
|
209 |
|
210 |
|
211 |
|
212 |
|
213 |
|
214 | query(sqlOptions, connection) {
|
215 | if (helper.isString(sqlOptions)) {
|
216 | sqlOptions = {sql: sqlOptions};
|
217 | }
|
218 | if (sqlOptions.debounce === undefined) {
|
219 | if (this.config.debounce !== undefined) {
|
220 | sqlOptions.debounce = this.config.debounce;
|
221 | } else {
|
222 | sqlOptions.debounce = true;
|
223 | }
|
224 | }
|
225 | const startTime = Date.now();
|
226 | if (sqlOptions.debounce) {
|
227 | const key = JSON.stringify(sqlOptions);
|
228 | return debounceInstance.debounce(key, () => {
|
229 | return this.getConnection(connection).then(connection => {
|
230 | return this[QUERY](sqlOptions, connection, startTime);
|
231 | });
|
232 | });
|
233 | }
|
234 | return this.getConnection(connection).then(connection => {
|
235 |
|
236 | if (sqlOptions.transaction) {
|
237 | if (sqlOptions.transaction === TRANSACTION.start) {
|
238 | if (connection.transaction === TRANSACTION.start) return;
|
239 | } else if (sqlOptions.transaction === TRANSACTION.end) {
|
240 | if (connection.transaction !== TRANSACTION.start) {
|
241 | this.releaseConnection(connection);
|
242 | return;
|
243 | }
|
244 | }
|
245 | connection.transaction = sqlOptions.transaction;
|
246 | }
|
247 | return this[QUERY](sqlOptions, connection, startTime);
|
248 | });
|
249 | }
|
250 |
|
251 | |
252 |
|
253 |
|
254 |
|
255 |
|
256 | execute(sqlOptions, connection) {
|
257 | if (helper.isString(sqlOptions)) {
|
258 | sqlOptions = {sql: sqlOptions};
|
259 | }
|
260 | sqlOptions.debounce = false;
|
261 | return this.query(sqlOptions, connection);
|
262 | }
|
263 | |
264 |
|
265 |
|
266 |
|
267 | close() {
|
268 | return helper.promisify(this.pool.end, this.pool)();
|
269 | }
|
270 | }
|
271 | module.exports = thinkInstance(ThinkMysql);
|