1 |
|
2 |
|
3 | const Bluebird = require('bluebird');
|
4 | const { EventEmitter } = require('events');
|
5 | const Debug = require('debug');
|
6 |
|
7 | const makeKnex = require('./util/make-knex');
|
8 |
|
9 | const debug = Debug('knex:tx');
|
10 |
|
11 | const { uniqueId, isUndefined } = require('lodash');
|
12 |
|
13 |
|
14 |
|
/**
 * Promise-like facade over a single database transaction.
 *
 * An instance acquires a connection, issues `BEGIN;` (or a `SAVEPOINT` when
 * the supplied client is already transacting), hands a "transactor" to the
 * user supplied container, and settles its internal promise once the
 * transaction has been committed or rolled back.
 *
 * Events forwarded from the transaction-scoped client (`query`,
 * `query-error`, `query-response`) are re-emitted on this object.
 */
class Transaction extends EventEmitter {
  /**
   * @param {Object} client - Client the transaction runs on. Its `logger`,
   *   `config` and `transacting` properties are read here.
   * @param {Function} [container] - Callback receiving the transactor. When
   *   omitted, the transactor is delivered through `this.initPromise`.
   * @param {Object} [config] - Optional settings; `doNotRejectOnRollback`
   *   and `connection` are the only keys read by this class.
   * @param {Transaction} [outerTx] - Enclosing transaction when nested.
   */
  constructor(client, container, config, outerTx) {
    super();

    const txid = uniqueId('trx');
    this.txid = txid;

    // Without a container the default is to NOT reject on rollback; with a
    // container the default is to reject. An explicit setting always wins.
    const withoutContainer = !container;
    const requested = config ? config.doNotRejectOnRollback : undefined;
    this.doNotRejectOnRollback = isUndefined(requested)
      ? withoutContainer
      : requested;

    if (withoutContainer) {
      // Synthesize a container that simply hands the transactor to whoever
      // waits on `initPromise`; start-up failures reject the same promise.
      this.initPromise = new Promise((resolve, reject) => {
        this.initRejectFn = reject;
        container = (transactor) => resolve(transactor);
      });
    }

    this.client = client;
    this.logger = client.logger;
    this.outerTx = outerTx;
    this.trxClient = undefined;
    this._debug = client.config && client.config.debug;

    debug(
      '%s: Starting %s transaction',
      txid,
      outerTx ? 'nested' : 'top level'
    );

    // Body executed once a connection is available. The promise it returns
    // keeps the connection checked out until the transaction settles.
    const runOnConnection = (connection) => {
      const trxClient = makeTxClient(this, client, connection);
      this.trxClient = trxClient;

      const started = client.transacting
        ? this.savepoint(connection)
        : this.begin(connection);

      const executionPromise = new Bluebird((resolver, rejecter) => {
        this._resolver = resolver;
        this._rejecter = rejecter;
      });

      started
        .then(() => makeTransactor(this, connection, trxClient))
        .then((transactor) => {
          transactor.executionPromise = this.initPromise
            ? executionPromise.catch((err) => {
                throw err;
              })
            : executionPromise;

          // A synchronous throw from the container is treated like a
          // rejected promise, which leads to a rollback below.
          let outcome;
          try {
            outcome = container(transactor);
          } catch (err) {
            outcome = Bluebird.reject(err);
          }

          // Only thenables drive commit / rollback automatically; any other
          // return value leaves the transaction for the caller to finish.
          if (!outcome || !outcome.then || typeof outcome.then !== 'function') {
            return null;
          }

          outcome
            .then((val) => transactor.commit(val))
            .catch((err) => transactor.rollback(err));

          // NOTE(review): `null` is returned explicitly, presumably so that
          // Bluebird does not warn about the un-returned promise above.
          return null;
        })
        .catch((e) => this._rejecter(e));

      return executionPromise;
    };

    this._promise = Bluebird.using(
      this.acquireConnection(client, config, txid),
      runOnConnection
    ).catch((err) => {
      if (!this.initRejectFn) {
        throw err;
      }
      this.initRejectFn(err);
    });

    this._completed = false;

    // Remember the promise of the previous nested transaction started under
    // the same parent (or an already-resolved promise if there is none).
    // NOTE(review): `_previousSibling` is not read in this file; presumably
    // consumers use it to order sibling transactions - confirm.
    this._previousSibling = Bluebird.resolve(true);
    if (outerTx) {
      if (outerTx._lastChild) {
        this._previousSibling = outerTx._lastChild;
      }
      outerTx._lastChild = this._promise;
    }
  }

  /**
   * Whether this transaction, or any enclosing transaction, has completed.
   *
   * @returns {boolean}
   */
  isCompleted() {
    const ownState = this._completed;
    if (ownState) {
      return ownState;
    }
    const outerState = this.outerTx && this.outerTx.isCompleted();
    return outerState || false;
  }

  /**
   * Issue `BEGIN;` on the given connection.
   *
   * @param {Object} conn
   * @returns {Promise}
   */
  begin(conn) {
    const sql = 'BEGIN;';
    return this.query(conn, sql);
  }

  /**
   * Create a savepoint named after this transaction's id.
   *
   * @param {Object} conn
   * @returns {Promise}
   */
  savepoint(conn) {
    const sql = `SAVEPOINT ${this.txid};`;
    return this.query(conn, sql);
  }

  /**
   * Issue `COMMIT;` and, once it has run, resolve the transaction with `value`.
   *
   * @param {Object} conn
   * @param {*} value
   * @returns {Promise}
   */
  commit(conn, value) {
    const sql = 'COMMIT;';
    return this.query(conn, sql, 1, value);
  }

  /**
   * Release this transaction's savepoint and resolve with `value`.
   *
   * @param {Object} conn
   * @param {*} value
   * @returns {Promise}
   */
  release(conn, value) {
    const sql = `RELEASE SAVEPOINT ${this.txid};`;
    return this.query(conn, sql, 1, value);
  }

  /**
   * Issue `ROLLBACK`. If the statement does not finish within 5000 ms the
   * transaction is rejected with `error` anyway.
   *
   * @param {Object} conn
   * @param {*} error
   * @returns {Promise}
   */
  rollback(conn, error) {
    const pending = this.query(conn, 'ROLLBACK', 2, error);
    return pending.timeout(5000).catch(Bluebird.TimeoutError, () => {
      this._rejecter(error);
    });
  }

  /**
   * Roll back to this transaction's savepoint. If the statement does not
   * finish within 5000 ms the transaction is rejected with `error` anyway.
   *
   * @param {Object} conn
   * @param {*} error
   * @returns {Promise}
   */
  rollbackTo(conn, error) {
    const pending = this.query(
      conn,
      `ROLLBACK TO SAVEPOINT ${this.txid}`,
      2,
      error
    );
    return pending.timeout(5000).catch(Bluebird.TimeoutError, () => {
      this._rejecter(error);
    });
  }

  /**
   * Run a transaction-control statement and settle the transaction.
   *
   * @param {Object} conn - Connection the statement runs on.
   * @param {string} sql - Statement text.
   * @param {number} [status] - `1` resolves the transaction with `value`
   *   after the statement ran, `2` rejects it; anything else only runs
   *   the statement.
   * @param {*} [value] - Resolution value (status 1) or rejection reason
   *   (status 2).
   * @returns {Promise} Resolves with the statement's response. It resolves
   *   with `undefined` when the statement failed, or when a value-less
   *   rollback was turned into a resolution.
   */
  query(conn, sql, status, value) {
    let outcome = status;
    let payload = value;

    // The statement must be issued before `_completed` is flipped below:
    // the transaction client refuses queries on completed transactions.
    const running = this.trxClient.query(conn, sql);

    const settled = running
      .catch((err) => {
        // A failing control statement always rejects the transaction.
        outcome = 2;
        payload = err;
        this._completed = true;
        debug('%s error running transaction query', this.txid);
      })
      .then((res) => {
        if (outcome === 1) {
          this._resolver(payload);
          return res;
        }
        if (outcome !== 2) {
          return res;
        }
        if (!isUndefined(payload)) {
          this._rejecter(payload);
          return res;
        }
        // Rollback without a reason: either resolve quietly (when so
        // configured) or reject with a synthetic error.
        if (this.doNotRejectOnRollback && /^ROLLBACK\b/i.test(sql)) {
          this._resolver();
          return;
        }
        this._rejecter(
          new Error(`Transaction rejected with non-error: ${payload}`)
        );
        return res;
      });

    if (status === 1 || status === 2) {
      this._completed = true;
    }
    return settled;
  }

  /**
   * Set the debug flag; calling without arguments enables it.
   *
   * @param {boolean} [enabled]
   * @returns {Transaction} this
   */
  debug(enabled) {
    this._debug = arguments.length === 0 ? true : enabled;
    return this;
  }

  /**
   * Obtain the connection for this transaction, wrapped as a Bluebird
   * disposer. A connection supplied via `config.connection` is used as-is
   * and never released here; otherwise one is acquired from `client` and
   * released through `client.releaseConnection` on disposal.
   *
   * @param {Object} client
   * @param {Object} [config]
   * @param {string} txid - Stamped on the connection as `__knexTxId`.
   * @returns {Object} Bluebird disposer for the connection.
   */
  acquireConnection(client, config, txid) {
    const configConnection = config && config.connection;

    const acquired = new Bluebird((resolve, reject) => {
      try {
        resolve(configConnection || client.acquireConnection());
      } catch (e) {
        reject(e);
      }
    });

    const tagged = acquired.then((connection) => {
      connection.__knexTxId = txid;
      return connection;
    });

    return tagged.disposer((connection) => {
      if (configConnection) {
        debug('%s: not releasing external connection', txid);
        return;
      }
      debug('%s: releasing connection', txid);
      client.releaseConnection(connection);
    });
  }
}
|
235 |
|
236 |
|
237 |
|
238 |
|
239 |
|
/**
 * Build the "transactor": a knex-like object bound to the transaction's
 * client, extended with transaction-control helpers.
 *
 * @param {Transaction} trx - Owning transaction.
 * @param {Object} connection - Connection the transaction runs on.
 * @param {Object} trxClient - Transaction-scoped client (see makeTxClient).
 * @returns {Object} The transactor handed to the user's container.
 */
function makeTransactor(trx, connection, trxClient) {
  const transactor = makeKnex(trxClient);

  transactor.withUserParams = () => {
    throw new Error(
      'Cannot set user params on a transaction - it can only inherit params from main knex instance'
    );
  };

  transactor.isTransaction = true;
  transactor.userParams = trx.userParams || {};

  /**
   * Start a nested transaction.
   *
   * With a container, returns whatever `trxClient.transaction` returns.
   * Without one, returns a promise for the nested transactor that rejects
   * if the nested transaction fails before the transactor is delivered.
   */
  transactor.transaction = function(container, options) {
    // Nested transactions default to not rejecting on rollback. The default
    // is applied to a copy so the caller's options object is never mutated;
    // an object that already carries an explicit setting is passed through
    // untouched.
    let nestedOptions = options;
    if (!options) {
      nestedOptions = { doNotRejectOnRollback: true };
    } else if (isUndefined(options.doNotRejectOnRollback)) {
      nestedOptions = Object.assign({}, options, {
        doNotRejectOnRollback: true,
      });
    }

    if (container) {
      return trxClient.transaction(container, nestedOptions, trx);
    }

    return new Promise((resolve, reject) => {
      const nested = trxClient.transaction(
        (nestedTrx) => {
          resolve(nestedTrx);
        },
        nestedOptions,
        trx
      );
      // Surface start-up failures instead of leaving the caller waiting on
      // a promise that can never settle. Once `resolve` has run, a later
      // `reject` is a no-op.
      if (nested && typeof nested.catch === 'function') {
        nested.catch(reject);
      }
    });
  };

  transactor.savepoint = function(container, options) {
    return transactor.transaction(container, options);
  };

  // Inside an already-transacting client, commit / rollback operate on this
  // transaction's savepoint rather than on the whole transaction.
  if (trx.client.transacting) {
    transactor.commit = (value) => trx.release(connection, value);
    transactor.rollback = (error) => trx.rollbackTo(connection, error);
  } else {
    transactor.commit = (value) => trx.commit(connection, value);
    transactor.rollback = (error) => trx.rollback(connection, error);
  }

  transactor.isCompleted = () => trx.isCompleted();

  return transactor;
}
|
289 |
|
290 |
|
291 |
|
/**
 * Create a client bound to one transaction and one connection.
 *
 * The returned object shares the prototype of `client`, copies its static
 * settings, is flagged as `transacting`, re-emits query events on both the
 * transaction and the original client, and only ever hands out / accepts
 * the transaction's own connection.
 *
 * @param {Transaction} trx - Owning transaction.
 * @param {Object} client - Client to derive from.
 * @param {Object} connection - The transaction's connection.
 * @returns {Object} Transaction-scoped client.
 */
function makeTxClient(trx, client, connection) {
  const scoped = Object.create(client.constructor.prototype);

  Object.assign(scoped, {
    version: client.version,
    config: client.config,
    driver: client.driver,
    connectionSettings: client.connectionSettings,
    transacting: true,
    valueForUndefined: client.valueForUndefined,
    logger: client.logger,
  });

  // Mirror query lifecycle events onto the transaction and the base client.
  scoped.on('query', (arg) => {
    trx.emit('query', arg);
    client.emit('query', arg);
  });

  scoped.on('query-error', (err, obj) => {
    trx.emit('query-error', err, obj);
    client.emit('query-error', err, obj);
  });

  scoped.on('query-response', (response, obj, builder) => {
    trx.emit('query-response', response, obj, builder);
    client.emit('query-response', response, obj, builder);
  });

  // Shared precondition for query / stream: the right connection must be
  // used, and the transaction must not have been completed at call time.
  const assertUsable = (conn, obj, wasCompleted) => {
    if (conn !== connection) {
      throw new Error('Invalid connection for transaction query.');
    }
    if (wasCompleted) {
      completedError(trx, obj);
    }
  };

  const baseQuery = scoped.query;
  scoped.query = (conn, obj) => {
    const wasCompleted = trx.isCompleted();
    return new Bluebird((resolve, reject) => {
      try {
        assertUsable(conn, obj, wasCompleted);
        resolve(baseQuery.call(scoped, conn, obj));
      } catch (e) {
        reject(e);
      }
    });
  };

  const baseStream = scoped.stream;
  scoped.stream = (conn, obj, stream, options) => {
    const wasCompleted = trx.isCompleted();
    return new Bluebird((resolve, reject) => {
      try {
        assertUsable(conn, obj, wasCompleted);
        resolve(baseStream.call(scoped, conn, obj, stream, options));
      } catch (e) {
        reject(e);
      }
    });
  };

  // The transaction owns its connection: "acquiring" always yields it and
  // "releasing" does nothing.
  scoped.acquireConnection = () => Bluebird.resolve(connection);
  scoped.releaseConnection = () => Bluebird.resolve();

  return scoped;
}
|
354 |
|
/**
 * Log the offending statement and throw: a query was attempted on a
 * transaction that has already completed.
 *
 * @param {Transaction} trx - Transaction the query was issued on.
 * @param {string|Object} obj - SQL text, or an object carrying a `sql` field.
 * @throws {Error} Always.
 */
function completedError(trx, obj) {
  let sql = obj;
  if (typeof obj !== 'string' && obj) {
    sql = obj.sql;
  }

  debug('%s: Transaction completed: %s', trx.txid, sql);

  const message =
    'Transaction query already complete, run with DEBUG=knex:tx for more info';
  throw new Error(message);
}
|
362 |
|
// Names of the promise methods a Transaction exposes directly.
const promiseInterface = [
  'then',
  'bind',
  'catch',
  'finally',
  'asCallback',
  'spread',
  'map',
  'reduce',
  'thenReturn',
  'return',
  'yield',
  'ensure',
  'exec',
  'reflect',
  'get',
  'mapSeries',
  'delay',
];

// Give Transaction each of the methods above by forwarding the call, with
// all of its arguments, to the transaction's internal `_promise`.
for (const method of promiseInterface) {
  Transaction.prototype[method] = function(...args) {
    const target = this._promise;
    return target[method](...args);
  };
}
|
390 |
|
391 | module.exports = Transaction;
|