// Source: UNPKG file viewer (11.6 kB, JavaScript, raw view)
// Transaction
// -------
const Bluebird = require('bluebird');
const { EventEmitter } = require('events');
const Debug = require('debug');
const { uniqueId, isUndefined } = require('lodash');

const makeKnex = require('./util/make-knex');

// Namespaced logger for this module (namespace `knex:tx`).
const debug = Debug('knex:tx');
12
// Acts as a facade for a Promise, keeping the internal state
// and managing any child transactions.
class Transaction extends EventEmitter {
  /**
   * Starts a transaction (or a savepoint when `client.transacting` is truthy)
   * as soon as a connection has been obtained.
   *
   * @param {object} client - Client used to acquire and release connections;
   *   its `logger`, `config.debug` and `transacting` properties are read here.
   * @param {Function} [container] - Called with the transactor once the
   *   transaction has been started. If it returns a thenable, fulfilment
   *   commits and rejection rolls back; a synchronous throw rolls back too.
   *   When omitted, `initPromise` resolves with the transactor instead.
   * @param {object} [config] - Optional settings. `doNotRejectOnRollback`
   *   overrides the default described below; `connection` supplies an
   *   external connection that this class never releases.
   * @param {Transaction} [outerTx] - Wrapping transaction, if any.
   */
  constructor(client, container, config, outerTx) {
    super();

    const txid = (this.txid = uniqueId('trx'));

    // If there is no container provided, assume user wants to get instance of
    // transaction and use it directly ("new style"). New-style transactions do
    // not reject on rollback by default; old-style (container) ones do. An
    // explicit config value always wins.
    const usesInitPromise = !container;
    const configuredFlag = config ? config.doNotRejectOnRollback : undefined;
    this.doNotRejectOnRollback = isUndefined(configuredFlag)
      ? usesInitPromise
      : configuredFlag;

    if (usesInitPromise) {
      this.initPromise = new Promise((resolve, reject) => {
        this.initRejectFn = reject;
        // Substitute a container that simply hands the transactor back to
        // whoever is waiting on `initPromise`.
        container = (transactor) => {
          resolve(transactor);
        };
      });
    }

    this.client = client;
    this.logger = client.logger;
    this.outerTx = outerTx;
    this.trxClient = undefined;
    this._debug = client.config && client.config.debug;

    debug(
      '%s: Starting %s transaction',
      txid,
      outerTx ? 'nested' : 'top level'
    );

    this._promise = this.acquireConnection(config, (connection) => {
      const trxClient = (this.trxClient = makeTxClient(
        this,
        client,
        connection
      ));
      const init = client.transacting
        ? this.savepoint(connection)
        : this.begin(connection);
      // Settled later by `query()` (commit / rollback / failure) through the
      // captured resolver and rejecter.
      const executionPromise = new Bluebird((resolver, rejecter) => {
        this._resolver = resolver;
        this._rejecter = rejecter;
      });

      init
        .then(() => {
          return makeTransactor(this, connection, trxClient);
        })
        .then((transactor) => {
          if (this.initPromise) {
            // Expose a derived promise that rejects with the same error.
            transactor.executionPromise = executionPromise.catch((err) => {
              throw err;
            });
          } else {
            transactor.executionPromise = executionPromise;
          }

          // If we've returned a "thenable" from the transaction container, assume
          // the rollback and commit are chained to this object's success / failure.
          // Directly thrown errors are treated as automatic rollbacks.
          let result;
          try {
            result = container(transactor);
          } catch (err) {
            result = Bluebird.reject(err);
          }
          if (result && result.then && typeof result.then === 'function') {
            result
              .then((val) => {
                return transactor.commit(val);
              })
              .catch((err) => {
                return transactor.rollback(err);
              });
          }
          return null;
        })
        .catch((e) => {
          return this._rejecter(e);
        });

      return executionPromise;
    }).catch((err) => {
      // New-style transactions report failures through `initPromise`;
      // old-style ones reject the transaction promise itself.
      if (this.initRejectFn) {
        this.initRejectFn(err);
      } else {
        throw err;
      }
    });

    this._completed = false;

    // If there's a wrapping transaction, we need to wait for any older sibling
    // transactions to settle (commit or rollback) before we can start, and we
    // need to register ourselves with the parent transaction so any younger
    // siblings can wait for us to complete before they can start.
    this._previousSibling = Bluebird.resolve(true);
    if (outerTx) {
      if (outerTx._lastChild) this._previousSibling = outerTx._lastChild;
      outerTx._lastChild = this._promise;
    }
  }

  /**
   * @returns {boolean} True when this transaction, or any wrapping
   *   transaction, has been marked completed.
   */
  isCompleted() {
    return (
      this._completed || (this.outerTx && this.outerTx.isCompleted()) || false
    );
  }

  /** Issues `BEGIN;` on the given connection. */
  begin(conn) {
    return this.query(conn, 'BEGIN;');
  }

  /** Creates a savepoint named after this transaction's id. */
  savepoint(conn) {
    return this.query(conn, `SAVEPOINT ${this.txid};`);
  }

  /** Issues `COMMIT;` and resolves the execution promise with `value`. */
  commit(conn, value) {
    return this.query(conn, 'COMMIT;', 1, value);
  }

  /** Releases this transaction's savepoint and resolves with `value`. */
  release(conn, value) {
    return this.query(conn, `RELEASE SAVEPOINT ${this.txid};`, 1, value);
  }

  /** Rolls the whole transaction back; see `_rollbackQuery`. */
  rollback(conn, error) {
    return this._rollbackQuery(conn, 'ROLLBACK', error);
  }

  /** Rolls back to this transaction's savepoint; see `_rollbackQuery`. */
  rollbackTo(conn, error) {
    return this._rollbackQuery(
      conn,
      `ROLLBACK TO SAVEPOINT ${this.txid}`,
      error
    );
  }

  /**
   * Shared implementation of `rollback` / `rollbackTo`: runs the rollback
   * statement and rejects the execution promise if it does not finish in time.
   *
   * @param {object} conn - Connection the transaction runs on.
   * @param {string} sql - Rollback statement to execute.
   * @param {*} [error] - Reason passed by the caller of rollback.
   * @returns {Promise} Settles once the statement finished or timed out.
   */
  _rollbackQuery(conn, sql, error) {
    const timeoutMs = 5000;
    return this.query(conn, sql, 2, error)
      .timeout(timeoutMs)
      .catch(Bluebird.TimeoutError, () => {
        // Never reject with `undefined`: when no reason was supplied, surface
        // a descriptive error so the failure can be diagnosed.
        this._rejecter(
          error === undefined
            ? new Error(`Transaction rollback timed out after ${timeoutMs}ms`)
            : error
        );
      });
  }

  /**
   * Runs a transaction-control statement and settles the execution promise
   * according to `status`.
   *
   * @param {object} conn - Connection the transaction runs on.
   * @param {string} sql - Statement to execute.
   * @param {number} [status] - 1 resolves with `value`, 2 rejects with
   *   `value`; anything else leaves the execution promise untouched unless the
   *   statement itself fails (which is treated as status 2).
   * @param {*} [value] - Resolution value or rejection reason.
   * @returns {Promise} Resolves with the query response (never rejects; query
   *   errors are routed to the execution promise instead).
   */
  query(conn, sql, status, value) {
    const q = this.trxClient
      .query(conn, sql)
      .catch((err) => {
        status = 2;
        value = err;
        this._completed = true;
        debug('%s error running transaction query', this.txid);
      })
      .then((res) => {
        if (status === 1) {
          this._resolver(value);
        }
        if (status === 2) {
          if (isUndefined(value)) {
            // A rollback without a reason resolves instead of rejecting when
            // the transaction was configured that way.
            if (this.doNotRejectOnRollback && /^ROLLBACK\b/i.test(sql)) {
              this._resolver();
              return;
            }

            value = new Error(`Transaction rejected with non-error: ${value}`);
          }
          this._rejecter(value);
        }
        return res;
      });
    // Mark completion synchronously so queries issued right after a
    // commit/rollback call are refused.
    if (status === 1 || status === 2) {
      this._completed = true;
    }
    return q;
  }

  /**
   * Sets the debug flag; calling without arguments enables it.
   * @returns {Transaction} This instance, for chaining.
   */
  debug(enabled) {
    this._debug = arguments.length ? enabled : true;
    return this;
  }

  // Acquire a connection and create a disposer - either using the one passed
  // via config or getting one off the client. The disposer will be called once
  // the original promise is marked completed.
  acquireConnection(config, cb) {
    const configConnection = config && config.connection;
    return new Bluebird((resolve, reject) => {
      try {
        resolve(configConnection || this.client.acquireConnection());
      } catch (e) {
        reject(e);
      }
    })
      .then((connection) => {
        connection.__knexTxId = this.txid;

        // Wait for the previous sibling to settle; its failure is ignored.
        return (this._previousSibling
          ? this._previousSibling.catch(() => {})
          : Promise.resolve()
        ).then(function() {
          return connection;
        });
      })
      .then(async (connection) => {
        try {
          return await cb(connection);
        } finally {
          // Only connections obtained from the client are handed back.
          if (!configConnection) {
            debug('%s: releasing connection', this.txid);
            this.client.releaseConnection(connection);
          } else {
            debug('%s: not releasing external connection', this.txid);
          }
        }
      });
  }
}
241
// The transactor is a full featured knex object, with a "commit", a "rollback"
// and a "savepoint" function. The "savepoint" is just sugar for creating a new
// transaction. If the rollback is run inside a savepoint, it rolls back to the
// last savepoint - otherwise it rolls back the transaction.
/**
 * @param {Transaction} trx - Transaction the transactor controls.
 * @param {object} connection - Connection the transaction runs on.
 * @param {object} trxClient - Connection-bound client passed to `makeKnex`.
 * @returns {object} The object produced by `makeKnex`, extended with
 *   transaction helpers.
 */
function makeTransactor(trx, connection, trxClient) {
  const transactor = makeKnex(trxClient);

  transactor.withUserParams = () => {
    throw new Error(
      'Cannot set user params on a transaction - it can only inherit params from main knex instance'
    );
  };

  transactor.isTransaction = true;
  transactor.userParams = trx.userParams || {};

  transactor.transaction = function(container, options) {
    // Work on a shallow copy so the caller's options object is never mutated.
    // Nested transactions do not reject on rollback unless explicitly asked to.
    const nestedOptions = Object.assign({}, options);
    if (isUndefined(nestedOptions.doNotRejectOnRollback)) {
      nestedOptions.doNotRejectOnRollback = true;
    }

    if (container) {
      return trxClient.transaction(container, nestedOptions, trx);
    }

    return new Promise((resolve, reject) => {
      const nested = trxClient.transaction(
        (nestedTrx) => {
          resolve(nestedTrx);
        },
        nestedOptions,
        trx
      );
      // Forward a failure of the nested transaction so this promise cannot
      // stay pending forever when the nested transaction never starts. Once
      // `resolve` has run, a later `reject` is a no-op.
      if (nested && typeof nested.catch === 'function') {
        nested.catch(reject);
      }
    });
  };
  transactor.savepoint = function(container, options) {
    return transactor.transaction(container, options);
  };

  // Inside an already-transacting client, commit/rollback act on the
  // savepoint; otherwise they act on the transaction itself.
  if (trx.client.transacting) {
    transactor.commit = (value) => trx.release(connection, value);
    transactor.rollback = (error) => trx.rollbackTo(connection, error);
  } else {
    transactor.commit = (value) => trx.commit(connection, value);
    transactor.rollback = (error) => trx.rollback(connection, error);
  }

  transactor.isCompleted = () => trx.isCompleted();

  return transactor;
}
295
// We need to make a client object which always acquires the same
// connection and does not release back into the pool.
/**
 * @param {Transaction} trx - Transaction that owns the connection.
 * @param {object} client - Client whose prototype and settings are reused.
 * @param {object} connection - The single connection every call must use.
 * @returns {object} Client bound to `connection`.
 */
function makeTxClient(trx, client, connection) {
  const trxClient = Object.create(client.constructor.prototype);
  for (const key of ['version', 'config', 'driver', 'connectionSettings']) {
    trxClient[key] = client[key];
  }
  trxClient.transacting = true;
  for (const key of ['valueForUndefined', 'logger']) {
    trxClient[key] = client[key];
  }

  // Re-emit query lifecycle events on both the transaction and the client.
  trxClient.on('query', (arg) => {
    trx.emit('query', arg);
    client.emit('query', arg);
  });

  trxClient.on('query-error', (err, obj) => {
    trx.emit('query-error', err, obj);
    client.emit('query-error', err, obj);
  });

  trxClient.on('query-response', (response, obj, builder) => {
    trx.emit('query-response', response, obj, builder);
    client.emit('query-response', response, obj, builder);
  });

  // Shared guard for query/stream: refuse foreign connections and calls made
  // after completion, turning synchronous throws into rejections.
  const runOnTrxConnection = (conn, obj, execute) => {
    // Completion state is sampled at call time.
    const completed = trx.isCompleted();
    return new Bluebird((resolve, reject) => {
      try {
        if (conn !== connection)
          throw new Error('Invalid connection for transaction query.');
        if (completed) completedError(trx, obj);
        resolve(execute());
      } catch (e) {
        reject(e);
      }
    });
  };

  const _query = trxClient.query;
  trxClient.query = function(conn, obj) {
    return runOnTrxConnection(conn, obj, () =>
      _query.call(trxClient, conn, obj)
    );
  };

  const _stream = trxClient.stream;
  trxClient.stream = function(conn, obj, stream, options) {
    return runOnTrxConnection(conn, obj, () =>
      _stream.call(trxClient, conn, obj, stream, options)
    );
  };

  // Always hand out the transaction's connection; releasing is a no-op.
  trxClient.acquireConnection = () => Bluebird.resolve(connection);
  trxClient.releaseConnection = () => Bluebird.resolve();

  return trxClient;
}
360
/**
 * Logs the statement that was attempted on a completed transaction and throws.
 *
 * @param {Transaction} trx - Transaction whose id is logged.
 * @param {string|object} obj - SQL string, or an object carrying `sql`.
 * @throws {Error} Always.
 */
function completedError(trx, obj) {
  let sql;
  if (typeof obj === 'string') {
    sql = obj;
  } else {
    sql = obj && obj.sql;
  }
  debug('%s: Transaction completed: %s', trx.txid, sql);

  const message =
    'Transaction query already complete, run with DEBUG=knex:tx for more info';
  throw new Error(message);
}
368
// Promise methods exposed directly on Transaction instances.
const promiseInterface = [
  'then',
  'bind',
  'catch',
  'finally',
  'asCallback',
  'spread',
  'map',
  'reduce',
  'thenReturn',
  'return',
  'yield',
  'ensure',
  'exec',
  'reflect',
  'get',
  'mapSeries',
  'delay',
];

// Creates methods which proxy promise interface methods to
// internal transaction resolution promise
for (const method of promiseInterface) {
  Transaction.prototype[method] = function(...args) {
    return this._promise[method](...args);
  };
}

module.exports = Transaction;