UNPKG

8.3 kBJavaScriptView Raw
1var mysql = require('../');
2var Connection = require('./Connection');
3var EventEmitter = require('events').EventEmitter;
4var PoolConnection = require('./PoolConnection');
5var PoolConfig = require('./PoolConfig')
6
7class Pool extends EventEmitter {
8 constructor() {
9 super();
10 this._acquiringConnections = [];
11 this._allConnections = [];
12 this._freeConnections = [];
13 this._connectionQueue = [];
14 this._closed = false;
15 this.poolConfig = {
16 config: new PoolConfig({})
17 }
18 }
19
20 set poolConfig(options) {
21 this.config = options.config;
22 this.config.connectionConfig.pool = this;
23 }
24
25
26 getConnection(cb) {
27
28 if (this._closed) {
29 var err = new Error('Pool is closed.');
30 err.code = 'POOL_CLOSED';
31 process.nextTick(function () {
32 cb(err);
33 });
34 return;
35 }
36
37 var connection;
38 var pool = this;
39
40 if (this._freeConnections.length > 0) {
41 connection = this._freeConnections.shift();
42 this.acquireConnection(connection, cb);
43 return;
44 }
45
46 if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
47 connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });
48
49 this._acquiringConnections.push(connection);
50 this._allConnections.push(connection);
51
52 connection.connect({ timeout: this.config.acquireTimeout }, function onConnect(err) {
53 spliceConnection(pool._acquiringConnections, connection);
54
55 if (pool._closed) {
56 err = new Error('Pool is closed.');
57 err.code = 'POOL_CLOSED';
58 }
59
60 if (err) {
61 pool._purgeConnection(connection);
62 cb(err);
63 return;
64 }
65
66 pool.emit('connection', connection);
67 pool.emit('acquire', connection);
68 cb(null, connection);
69 });
70 return;
71 }
72
73 if (!this.config.waitForConnections) {
74 process.nextTick(function () {
75 var err = new Error('No connections available.');
76 err.code = 'POOL_CONNLIMIT';
77 cb(err);
78 });
79 return;
80 }
81
82 this._enqueueCallback(cb);
83 };
84
85 acquireConnection(connection, cb) {
86 if (connection._pool !== this) {
87 throw new Error('Connection acquired from wrong pool.');
88 }
89
90 var changeUser = this._needsChangeUser(connection);
91 var pool = this;
92
93 this._acquiringConnections.push(connection);
94
95 function onOperationComplete(err) {
96 spliceConnection(pool._acquiringConnections, connection);
97
98 if (pool._closed) {
99 err = new Error('Pool is closed.');
100 err.code = 'POOL_CLOSED';
101 }
102
103 if (err) {
104 pool._connectionQueue.unshift(cb);
105 pool._purgeConnection(connection);
106 return;
107 }
108
109 if (changeUser) {
110 pool.emit('connection', connection);
111 }
112
113 pool.emit('acquire', connection);
114 cb(null, connection);
115 }
116
117 if (changeUser) {
118 // restore user back to pool configuration
119 connection.config = this.config.newConnectionConfig();
120 connection.changeUser({ timeout: this.config.acquireTimeout }, onOperationComplete);
121 } else {
122 // ping connection
123 connection.ping({ timeout: this.config.acquireTimeout }, onOperationComplete);
124 }
125 };
126
127 releaseConnection(connection) {
128
129 if (this._acquiringConnections.indexOf(connection) !== -1) {
130 // connection is being acquired
131 return;
132 }
133
134 if (connection._pool) {
135 if (connection._pool !== this) {
136 throw new Error('Connection released to wrong pool');
137 }
138
139 if (this._freeConnections.indexOf(connection) !== -1) {
140 // connection already in free connection pool
141 // this won't catch all double-release cases
142 throw new Error('Connection already released');
143 } else {
144 // add connection to end of free queue
145 this._freeConnections.push(connection);
146 this.emit('release', connection);
147 }
148 }
149
150 if (this._closed) {
151 // empty the connection queue
152 this._connectionQueue.splice(0).forEach(function (cb) {
153 var err = new Error('Pool is closed.');
154 err.code = 'POOL_CLOSED';
155 process.nextTick(function () {
156 cb(err);
157 });
158 });
159 } else if (this._connectionQueue.length) {
160 // get connection with next waiting callback
161 this.getConnection(this._connectionQueue.shift());
162 }
163 };
164
165 async end() {
166 this._closed = true;
167 let pool = this;
168 return await new Promise((resolve, reject) => {
169 var calledBack = false;
170 var waitingClose = 0;
171 var connectionLength = this._allConnections.length;
172 function onEnd(err) {
173 if (!calledBack && (err || --waitingClose <= 0)) {
174 pool._closed = false;//重启连接,方便下次重连
175 calledBack = true;
176 if(err){
177 reject({
178 code: -1,
179 msg: err
180 })
181 } else {
182 resolve({
183 code: 0,
184 msg: `成功关闭${connectionLength}条连接`
185 });
186 }
187 }
188 }
189
190 while (this._allConnections.length !== 0) {
191 waitingClose++;
192 this._purgeConnection(this._allConnections[0], onEnd);
193 }
194
195 if (waitingClose === 0) {
196 process.nextTick(onEnd);
197 }
198 });
199 };
200
201 async query(values) {
202 return await new Promise((resolve, reject) => {
203 var query = Connection.createQuery(values, (err, data) => {
204 if (err) {
205 reject(err);
206 } else {
207 resolve(data);
208 }
209 });
210
211 this.getConnection(function (err, conn) {
212 if (err) {
213 query.on('error', function () { });
214 query.end(err);
215 reject(err);
216 return;
217 }
218
219 // Release connection based off event
220 query.once('end', function () {
221 conn.release();
222 });
223
224 conn.query(query);
225 });
226 })
227 };
228
229 _enqueueCallback(callback) {
230
231 if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) {
232 process.nextTick(function () {
233 var err = new Error('Queue limit reached.');
234 err.code = 'POOL_ENQUEUELIMIT';
235 callback(err);
236 });
237 return;
238 }
239
240 // Bind to domain, as dequeue will likely occur in a different domain
241 var cb = process.domain
242 ? process.domain.bind(callback)
243 : callback;
244
245 this._connectionQueue.push(cb);
246 this.emit('enqueue');
247 };
248
249 _needsChangeUser(connection) {
250 var connConfig = connection.config;
251 var poolConfig = this.config.connectionConfig;
252
253 // check if changeUser values are different
254 return connConfig.user !== poolConfig.user
255 || connConfig.database !== poolConfig.database
256 || connConfig.password !== poolConfig.password
257 || connConfig.charsetNumber !== poolConfig.charsetNumber;
258 };
259
260 _purgeConnection(connection, callback) {
261 var cb = callback || function () { };
262
263 if (connection.state === 'disconnected') {
264 connection.destroy();
265 }
266
267 this._removeConnection(connection);
268
269 if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) {
270 connection._realEnd(cb);
271 return;
272 }
273
274 process.nextTick(cb);
275 };
276
277 _removeConnection(connection) {
278 connection._pool = null;
279
280 // Remove connection from all connections
281 spliceConnection(this._allConnections, connection);
282
283 // Remove connection from free connections
284 spliceConnection(this._freeConnections, connection);
285
286 this.releaseConnection(connection);
287 };
288
289 escape(value) {
290 return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone);
291 };
292
293 escapeId(value) {
294 return mysql.escapeId(value, false);
295 };
296}
297
298module.exports = new Pool();
299
300function spliceConnection(array, connection) {
301 var index;
302 if ((index = array.indexOf(connection)) !== -1) {
303 // Remove connection from all connections
304 array.splice(index, 1);
305 }
306}