1 |
|
2 |
|
3 | const Bluebird = require('bluebird');
|
4 | const { EventEmitter } = require('events');
|
5 | const Debug = require('debug');
|
6 |
|
7 | const makeKnex = require('./util/make-knex');
|
8 |
|
9 | const debug = Debug('knex:tx');
|
10 |
|
11 | const { uniqueId, isUndefined } = require('lodash');
|
12 |
|
13 |
|
14 |
|
15 | class Transaction extends EventEmitter {
|
16 | constructor(client, container, config, outerTx) {
|
17 | super();
|
18 |
|
19 | const txid = (this.txid = uniqueId('trx'));
|
20 |
|
21 |
|
22 | if (!container) {
|
23 |
|
24 | if (!config || isUndefined(config.doNotRejectOnRollback)) {
|
25 | this.doNotRejectOnRollback = true;
|
26 | } else {
|
27 | this.doNotRejectOnRollback = config.doNotRejectOnRollback;
|
28 | }
|
29 |
|
30 | this.initPromise = new Promise((resolve, reject) => {
|
31 | this.initRejectFn = reject;
|
32 | container = (transactor) => {
|
33 | resolve(transactor);
|
34 | };
|
35 | });
|
36 | } else {
|
37 |
|
38 | if (!config || isUndefined(config.doNotRejectOnRollback)) {
|
39 | this.doNotRejectOnRollback = false;
|
40 | } else {
|
41 | this.doNotRejectOnRollback = config.doNotRejectOnRollback;
|
42 | }
|
43 | }
|
44 |
|
45 | this.client = client;
|
46 | this.logger = client.logger;
|
47 | this.outerTx = outerTx;
|
48 | this.trxClient = undefined;
|
49 | this._debug = client.config && client.config.debug;
|
50 |
|
51 | debug(
|
52 | '%s: Starting %s transaction',
|
53 | txid,
|
54 | outerTx ? 'nested' : 'top level'
|
55 | );
|
56 |
|
57 | this._promise = this.acquireConnection(config, (connection) => {
|
58 | const trxClient = (this.trxClient = makeTxClient(
|
59 | this,
|
60 | client,
|
61 | connection
|
62 | ));
|
63 | const init = client.transacting
|
64 | ? this.savepoint(connection)
|
65 | : this.begin(connection);
|
66 | const executionPromise = new Bluebird((resolver, rejecter) => {
|
67 | this._resolver = resolver;
|
68 | this._rejecter = rejecter;
|
69 | });
|
70 |
|
71 | init
|
72 | .then(() => {
|
73 | return makeTransactor(this, connection, trxClient);
|
74 | })
|
75 | .then((transactor) => {
|
76 | if (this.initPromise) {
|
77 | transactor.executionPromise = executionPromise.catch((err) => {
|
78 | throw err;
|
79 | });
|
80 | } else {
|
81 | transactor.executionPromise = executionPromise;
|
82 | }
|
83 |
|
84 |
|
85 |
|
86 |
|
87 | let result;
|
88 | try {
|
89 | result = container(transactor);
|
90 | } catch (err) {
|
91 | result = Bluebird.reject(err);
|
92 | }
|
93 | if (result && result.then && typeof result.then === 'function') {
|
94 | result
|
95 | .then((val) => {
|
96 | return transactor.commit(val);
|
97 | })
|
98 | .catch((err) => {
|
99 | return transactor.rollback(err);
|
100 | });
|
101 | }
|
102 | return null;
|
103 | })
|
104 | .catch((e) => {
|
105 | return this._rejecter(e);
|
106 | });
|
107 |
|
108 | return executionPromise;
|
109 | }).catch((err) => {
|
110 | if (this.initRejectFn) {
|
111 | this.initRejectFn(err);
|
112 | } else {
|
113 | throw err;
|
114 | }
|
115 | });
|
116 |
|
117 | this._completed = false;
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 | this._previousSibling = Bluebird.resolve(true);
|
124 | if (outerTx) {
|
125 | if (outerTx._lastChild) this._previousSibling = outerTx._lastChild;
|
126 | outerTx._lastChild = this._promise;
|
127 | }
|
128 | }
|
129 |
|
130 | isCompleted() {
|
131 | return (
|
132 | this._completed || (this.outerTx && this.outerTx.isCompleted()) || false
|
133 | );
|
134 | }
|
135 |
|
136 | begin(conn) {
|
137 | return this.query(conn, 'BEGIN;');
|
138 | }
|
139 |
|
140 | savepoint(conn) {
|
141 | return this.query(conn, `SAVEPOINT ${this.txid};`);
|
142 | }
|
143 |
|
144 | commit(conn, value) {
|
145 | return this.query(conn, 'COMMIT;', 1, value);
|
146 | }
|
147 |
|
148 | release(conn, value) {
|
149 | return this.query(conn, `RELEASE SAVEPOINT ${this.txid};`, 1, value);
|
150 | }
|
151 |
|
152 | rollback(conn, error) {
|
153 | return this.query(conn, 'ROLLBACK', 2, error)
|
154 | .timeout(5000)
|
155 | .catch(Bluebird.TimeoutError, () => {
|
156 | this._rejecter(error);
|
157 | });
|
158 | }
|
159 |
|
160 | rollbackTo(conn, error) {
|
161 | return this.query(conn, `ROLLBACK TO SAVEPOINT ${this.txid}`, 2, error)
|
162 | .timeout(5000)
|
163 | .catch(Bluebird.TimeoutError, () => {
|
164 | this._rejecter(error);
|
165 | });
|
166 | }
|
167 |
|
168 | query(conn, sql, status, value) {
|
169 | const q = this.trxClient
|
170 | .query(conn, sql)
|
171 | .catch((err) => {
|
172 | status = 2;
|
173 | value = err;
|
174 | this._completed = true;
|
175 | debug('%s error running transaction query', this.txid);
|
176 | })
|
177 | .then((res) => {
|
178 | if (status === 1) {
|
179 | this._resolver(value);
|
180 | }
|
181 | if (status === 2) {
|
182 | if (isUndefined(value)) {
|
183 | if (this.doNotRejectOnRollback && /^ROLLBACK\b/i.test(sql)) {
|
184 | this._resolver();
|
185 | return;
|
186 | }
|
187 |
|
188 | value = new Error(`Transaction rejected with non-error: ${value}`);
|
189 | }
|
190 | this._rejecter(value);
|
191 | }
|
192 | return res;
|
193 | });
|
194 | if (status === 1 || status === 2) {
|
195 | this._completed = true;
|
196 | }
|
197 | return q;
|
198 | }
|
199 |
|
200 | debug(enabled) {
|
201 | this._debug = arguments.length ? enabled : true;
|
202 | return this;
|
203 | }
|
204 |
|
205 |
|
206 |
|
207 |
|
208 | acquireConnection(config, cb) {
|
209 | const configConnection = config && config.connection;
|
210 | return new Bluebird((resolve, reject) => {
|
211 | try {
|
212 | resolve(configConnection || this.client.acquireConnection());
|
213 | } catch (e) {
|
214 | reject(e);
|
215 | }
|
216 | })
|
217 | .then((connection) => {
|
218 | connection.__knexTxId = this.txid;
|
219 |
|
220 | return (this._previousSibling
|
221 | ? this._previousSibling.catch(() => {})
|
222 | : Promise.resolve()
|
223 | ).then(function() {
|
224 | return connection;
|
225 | });
|
226 | })
|
227 | .then(async (connection) => {
|
228 | try {
|
229 | return await cb(connection);
|
230 | } finally {
|
231 | if (!configConnection) {
|
232 | debug('%s: releasing connection', this.txid);
|
233 | this.client.releaseConnection(connection);
|
234 | } else {
|
235 | debug('%s: not releasing external connection', this.txid);
|
236 | }
|
237 | }
|
238 | });
|
239 | }
|
240 | }
|
241 |
|
242 |
|
243 |
|
244 |
|
245 |
|
246 | function makeTransactor(trx, connection, trxClient) {
|
247 | const transactor = makeKnex(trxClient);
|
248 |
|
249 | transactor.withUserParams = () => {
|
250 | throw new Error(
|
251 | 'Cannot set user params on a transaction - it can only inherit params from main knex instance'
|
252 | );
|
253 | };
|
254 |
|
255 | transactor.isTransaction = true;
|
256 | transactor.userParams = trx.userParams || {};
|
257 |
|
258 | transactor.transaction = function(container, options) {
|
259 | if (!options) {
|
260 | options = { doNotRejectOnRollback: true };
|
261 | } else if (isUndefined(options.doNotRejectOnRollback)) {
|
262 | options.doNotRejectOnRollback = true;
|
263 | }
|
264 |
|
265 | if (container) {
|
266 | return trxClient.transaction(container, options, trx);
|
267 | } else {
|
268 | return new Promise((resolve, _reject) => {
|
269 | trxClient.transaction(
|
270 | (nestedTrx) => {
|
271 | resolve(nestedTrx);
|
272 | },
|
273 | options,
|
274 | trx
|
275 | );
|
276 | });
|
277 | }
|
278 | };
|
279 | transactor.savepoint = function(container, options) {
|
280 | return transactor.transaction(container, options);
|
281 | };
|
282 |
|
283 | if (trx.client.transacting) {
|
284 | transactor.commit = (value) => trx.release(connection, value);
|
285 | transactor.rollback = (error) => trx.rollbackTo(connection, error);
|
286 | } else {
|
287 | transactor.commit = (value) => trx.commit(connection, value);
|
288 | transactor.rollback = (error) => trx.rollback(connection, error);
|
289 | }
|
290 |
|
291 | transactor.isCompleted = () => trx.isCompleted();
|
292 |
|
293 | return transactor;
|
294 | }
|
295 |
|
296 |
|
297 |
|
298 | function makeTxClient(trx, client, connection) {
|
299 | const trxClient = Object.create(client.constructor.prototype);
|
300 | trxClient.version = client.version;
|
301 | trxClient.config = client.config;
|
302 | trxClient.driver = client.driver;
|
303 | trxClient.connectionSettings = client.connectionSettings;
|
304 | trxClient.transacting = true;
|
305 | trxClient.valueForUndefined = client.valueForUndefined;
|
306 | trxClient.logger = client.logger;
|
307 |
|
308 | trxClient.on('query', function(arg) {
|
309 | trx.emit('query', arg);
|
310 | client.emit('query', arg);
|
311 | });
|
312 |
|
313 | trxClient.on('query-error', function(err, obj) {
|
314 | trx.emit('query-error', err, obj);
|
315 | client.emit('query-error', err, obj);
|
316 | });
|
317 |
|
318 | trxClient.on('query-response', function(response, obj, builder) {
|
319 | trx.emit('query-response', response, obj, builder);
|
320 | client.emit('query-response', response, obj, builder);
|
321 | });
|
322 |
|
323 | const _query = trxClient.query;
|
324 | trxClient.query = function(conn, obj) {
|
325 | const completed = trx.isCompleted();
|
326 | return new Bluebird(function(resolve, reject) {
|
327 | try {
|
328 | if (conn !== connection)
|
329 | throw new Error('Invalid connection for transaction query.');
|
330 | if (completed) completedError(trx, obj);
|
331 | resolve(_query.call(trxClient, conn, obj));
|
332 | } catch (e) {
|
333 | reject(e);
|
334 | }
|
335 | });
|
336 | };
|
337 | const _stream = trxClient.stream;
|
338 | trxClient.stream = function(conn, obj, stream, options) {
|
339 | const completed = trx.isCompleted();
|
340 | return new Bluebird(function(resolve, reject) {
|
341 | try {
|
342 | if (conn !== connection)
|
343 | throw new Error('Invalid connection for transaction query.');
|
344 | if (completed) completedError(trx, obj);
|
345 | resolve(_stream.call(trxClient, conn, obj, stream, options));
|
346 | } catch (e) {
|
347 | reject(e);
|
348 | }
|
349 | });
|
350 | };
|
351 | trxClient.acquireConnection = function() {
|
352 | return Bluebird.resolve(connection);
|
353 | };
|
354 | trxClient.releaseConnection = function() {
|
355 | return Bluebird.resolve();
|
356 | };
|
357 |
|
358 | return trxClient;
|
359 | }
|
360 |
|
361 | function completedError(trx, obj) {
|
362 | const sql = typeof obj === 'string' ? obj : obj && obj.sql;
|
363 | debug('%s: Transaction completed: %s', trx.txid, sql);
|
364 | throw new Error(
|
365 | 'Transaction query already complete, run with DEBUG=knex:tx for more info'
|
366 | );
|
367 | }
|
368 |
|
369 | const promiseInterface = [
|
370 | 'then',
|
371 | 'bind',
|
372 | 'catch',
|
373 | 'finally',
|
374 | 'asCallback',
|
375 | 'spread',
|
376 | 'map',
|
377 | 'reduce',
|
378 | 'thenReturn',
|
379 | 'return',
|
380 | 'yield',
|
381 | 'ensure',
|
382 | 'exec',
|
383 | 'reflect',
|
384 | 'get',
|
385 | 'mapSeries',
|
386 | 'delay',
|
387 | ];
|
388 |
|
389 |
|
390 |
|
391 | promiseInterface.forEach(function(method) {
|
392 | Transaction.prototype[method] = function() {
|
393 | return this._promise[method].apply(this._promise, arguments);
|
394 | };
|
395 | });
|
396 |
|
397 | module.exports = Transaction;
|