UNPKG

11.5 kBJavaScriptView Raw
1// Transaction
2// -------
3const Bluebird = require('bluebird');
4const { EventEmitter } = require('events');
5const Debug = require('debug');
6
7const makeKnex = require('./util/make-knex');
8
9const debug = Debug('knex:tx');
10
11const { uniqueId, isUndefined } = require('lodash');
12
// Upper bound, in milliseconds, on how long a ROLLBACK statement may run
// before the transaction is rejected without waiting for it any longer.
const ROLLBACK_TIMEOUT_MS = 5000;

/**
 * Acts as a facade for a Promise, keeping the internal state and managing any
 * child transactions.
 *
 * Two usage styles are supported by the constructor:
 *  - with a `container` callback: the callback receives the transactor and, if
 *    it returns a thenable, commit / rollback are chained to its outcome;
 *  - without a `container`: `initPromise` resolves with the transactor so the
 *    caller can drive commit / rollback directly.
 */
class Transaction extends EventEmitter {
  /**
   * @param {object} client - Client the transaction runs on; its `logger`,
   *   `config` and `transacting` properties are read here.
   * @param {Function} [container] - Callback invoked with the transactor.
   * @param {object} [config] - May carry `doNotRejectOnRollback` and
   *   `connection` (an externally owned connection that is never released).
   * @param {Transaction} [outerTx] - Enclosing transaction, if any.
   */
  constructor(client, container, config, outerTx) {
    super();

    const txid = (this.txid = uniqueId('trx'));

    // Must be captured before `container` is replaced below.
    const isDirectUse = !container;

    // Without an explicit setting, the new style (no container) does not
    // reject on rollback while the old style (container) does.
    if (!config || isUndefined(config.doNotRejectOnRollback)) {
      this.doNotRejectOnRollback = isDirectUse;
    } else {
      this.doNotRejectOnRollback = config.doNotRejectOnRollback;
    }

    // If there is no container provided, assume the user wants to get an
    // instance of the transaction and use it directly.
    if (isDirectUse) {
      this.initPromise = new Promise((resolve, reject) => {
        this.initRejectFn = reject;
        container = (transactor) => {
          resolve(transactor);
        };
      });
    }

    this.client = client;
    this.logger = client.logger;
    this.outerTx = outerTx;
    this.trxClient = undefined;
    this._debug = client.config && client.config.debug;

    debug(
      '%s: Starting %s transaction',
      txid,
      outerTx ? 'nested' : 'top level'
    );

    this._promise = Bluebird.using(
      this.acquireConnection(client, config, txid),
      (connection) => {
        const trxClient = (this.trxClient = makeTxClient(
          this,
          client,
          connection
        ));
        // A client that is already transacting opens a savepoint instead of
        // a new top level transaction.
        const init = client.transacting
          ? this.savepoint(connection)
          : this.begin(connection);
        // Settled later through `_resolver` / `_rejecter` by `query()`.
        const executionPromise = new Bluebird((resolver, rejecter) => {
          this._resolver = resolver;
          this._rejecter = rejecter;
        });

        init
          .then(() => {
            return makeTransactor(this, connection, trxClient);
          })
          .then((transactor) => {
            if (this.initPromise) {
              transactor.executionPromise = executionPromise.catch((err) => {
                throw err;
              });
            } else {
              transactor.executionPromise = executionPromise;
            }

            // If we've returned a "thenable" from the transaction container,
            // assume the rollback and commit are chained to this object's
            // success / failure. Directly thrown errors are treated as
            // automatic rollbacks.
            let result;
            try {
              result = container(transactor);
            } catch (err) {
              result = Bluebird.reject(err);
            }
            if (result && result.then && typeof result.then === 'function') {
              result
                .then((val) => {
                  return transactor.commit(val);
                })
                .catch((err) => {
                  return transactor.rollback(err);
                });
            }
            return null;
          })
          .catch((e) => {
            return this._rejecter(e);
          });

        return executionPromise;
      }
    ).catch((err) => {
      // In direct-use mode failures are reported through `initPromise`;
      // otherwise they propagate to whoever consumes this transaction.
      if (this.initRejectFn) {
        this.initRejectFn(err);
      } else {
        throw err;
      }
    });

    this._completed = false;

    // If there's a wrapping transaction, record the previous sibling's promise
    // and register ourselves with the parent transaction as its latest child.
    this._previousSibling = Bluebird.resolve(true);
    if (outerTx) {
      if (outerTx._lastChild) this._previousSibling = outerTx._lastChild;
      outerTx._lastChild = this._promise;
    }
  }

  /**
   * @returns {boolean} true once this transaction, or any enclosing one, has
   *   been marked completed.
   */
  isCompleted() {
    return (
      this._completed || (this.outerTx && this.outerTx.isCompleted()) || false
    );
  }

  /** Runs `BEGIN;` on the given connection. */
  begin(conn) {
    return this.query(conn, 'BEGIN;');
  }

  /** Opens a savepoint named after this transaction's id. */
  savepoint(conn) {
    return this.query(conn, `SAVEPOINT ${this.txid};`);
  }

  /** Runs `COMMIT;` and resolves the transaction with `value`. */
  commit(conn, value) {
    return this.query(conn, 'COMMIT;', 1, value);
  }

  /** Releases this transaction's savepoint and resolves with `value`. */
  release(conn, value) {
    return this.query(conn, `RELEASE SAVEPOINT ${this.txid};`, 1, value);
  }

  /** Rolls back the whole transaction, rejecting it with `error`. */
  rollback(conn, error) {
    return this._rollbackWithTimeout(conn, 'ROLLBACK', error);
  }

  /** Rolls back to this transaction's savepoint, rejecting with `error`. */
  rollbackTo(conn, error) {
    return this._rollbackWithTimeout(
      conn,
      `ROLLBACK TO SAVEPOINT ${this.txid}`,
      error
    );
  }

  /**
   * Shared implementation of `rollback` / `rollbackTo`: runs the statement
   * with rejecting status and, if it has not finished within
   * ROLLBACK_TIMEOUT_MS, rejects the transaction with `error` anyway.
   * Relies on `query()` returning a Bluebird promise (for `.timeout`).
   */
  _rollbackWithTimeout(conn, sql, error) {
    return this.query(conn, sql, 2, error)
      .timeout(ROLLBACK_TIMEOUT_MS)
      .catch(Bluebird.TimeoutError, () => {
        this._rejecter(error);
      });
  }

  /**
   * Runs a transaction control statement through the transaction client and
   * settles the execution promise according to `status`.
   *
   * @param {object} conn - Connection to run the statement on.
   * @param {string} sql - Statement to run.
   * @param {number} [status] - 1 resolves the transaction with `value`,
   *   2 rejects it with `value`; anything else leaves it unsettled.
   * @param {*} [value] - Resolution value or rejection reason.
   * @returns {Promise} Promise for the statement's response; a failing
   *   statement rejects the transaction instead of this promise.
   */
  query(conn, sql, status, value) {
    // Work on copies so the synchronous check at the end still sees the
    // status the caller asked for, even though the handlers may change it.
    let outcome = status;
    let payload = value;

    const q = this.trxClient
      .query(conn, sql)
      .catch((err) => {
        // A failing control statement always ends the transaction.
        outcome = 2;
        payload = err;
        this._completed = true;
        debug('%s error running transaction query', this.txid);
      })
      .then((res) => {
        if (outcome === 1) {
          this._resolver(payload);
        }
        if (outcome === 2) {
          if (isUndefined(payload)) {
            // A rollback without a reason resolves instead of rejecting when
            // the transaction was configured not to reject on rollback.
            if (this.doNotRejectOnRollback && /^ROLLBACK\b/i.test(sql)) {
              this._resolver();
              return;
            }

            payload = new Error(
              `Transaction rejected with non-error: ${payload}`
            );
          }
          this._rejecter(payload);
        }
        return res;
      });

    // Mark completion immediately so queries issued after commit / rollback
    // was requested are refused.
    if (status === 1 || status === 2) {
      this._completed = true;
    }
    return q;
  }

  /**
   * Sets the debug flag.
   *
   * @param {boolean} [enabled] - New flag value; when called with no argument
   *   at all the flag is set to true.
   * @returns {Transaction} this, for chaining.
   */
  debug(...args) {
    const [enabled] = args;
    this._debug = args.length ? enabled : true;
    return this;
  }

  /**
   * Acquire a connection and create a disposer - either using the one passed
   * via config or getting one off the client. The disposer releases
   * connections obtained from the client and leaves an externally supplied
   * connection alone.
   *
   * @param {object} client - Client used to acquire / release the connection.
   * @param {object} [config] - May carry an external `connection`.
   * @param {string} txid - Transaction id stamped on the connection.
   */
  acquireConnection(client, config, txid) {
    const configConnection = config && config.connection;
    // An error thrown inside the executor rejects the promise, so no
    // explicit try / catch is needed.
    return new Bluebird((resolve) => {
      resolve(configConnection || client.acquireConnection());
    })
      .then((connection) => {
        connection.__knexTxId = txid;

        return connection;
      })
      .disposer((connection) => {
        if (!configConnection) {
          debug('%s: releasing connection', txid);
          client.releaseConnection(connection);
        } else {
          debug('%s: not releasing external connection', txid);
        }
      });
  }
}
235
/**
 * The transactor is a full featured knex object, with a "commit", a "rollback"
 * and a "savepoint" function. The "savepoint" is just sugar for creating a new
 * transaction. If the rollback is run inside a savepoint, it rolls back to the
 * last savepoint - otherwise it rolls back the transaction.
 *
 * @param {Transaction} trx - Transaction the transactor drives.
 * @param {object} connection - Connection the transaction runs on.
 * @param {object} trxClient - Transaction-bound client handed to makeKnex.
 * @returns {object} The knex object returned by makeKnex, extended with
 *   transaction controls.
 */
function makeTransactor(trx, connection, trxClient) {
  const transactor = makeKnex(trxClient);

  transactor.withUserParams = () => {
    throw new Error(
      'Cannot set user params on a transaction - it can only inherit params from main knex instance'
    );
  };

  transactor.isTransaction = true;
  transactor.userParams = trx.userParams || {};

  transactor.transaction = function(container, options) {
    // Work on a copy: writing the default into the caller's object would leak
    // it into any later use of that same options object.
    const nestedOptions = Object.assign({}, options);
    if (isUndefined(nestedOptions.doNotRejectOnRollback)) {
      nestedOptions.doNotRejectOnRollback = true;
    }

    if (container) {
      return trxClient.transaction(container, nestedOptions, trx);
    }

    // No container: hand the nested transactor back to the caller.
    return new Promise((resolve, reject) => {
      const nested = trxClient.transaction(
        (nestedTrx) => {
          resolve(nestedTrx);
        },
        nestedOptions,
        trx
      );
      // Surface a failure to start the nested transaction instead of leaving
      // the returned promise pending forever. Once the promise has resolved
      // with the transactor, this reject is a no-op.
      Promise.resolve(nested).catch(reject);
    });
  };
  transactor.savepoint = function(container, options) {
    return transactor.transaction(container, options);
  };

  // Inside an already transacting client, commit / rollback operate on the
  // savepoint rather than on the whole transaction.
  if (trx.client.transacting) {
    transactor.commit = (value) => trx.release(connection, value);
    transactor.rollback = (error) => trx.rollbackTo(connection, error);
  } else {
    transactor.commit = (value) => trx.commit(connection, value);
    transactor.rollback = (error) => trx.rollback(connection, error);
  }

  transactor.isCompleted = () => trx.isCompleted();

  return transactor;
}
289
/**
 * We need to make a client object which always acquires the same
 * connection and does not release back into the pool.
 *
 * The returned client shares the original client's prototype and settings,
 * forwards its query events to both the transaction and the original client,
 * and refuses to run anything on a foreign connection or after the
 * transaction has completed.
 *
 * @param {Transaction} trx - Transaction the client is bound to.
 * @param {object} client - Client to derive from.
 * @param {object} connection - The one connection this client may use.
 * @returns {object} The transaction-bound client.
 */
function makeTxClient(trx, client, connection) {
  const trxClient = Object.create(client.constructor.prototype);
  trxClient.version = client.version;
  trxClient.config = client.config;
  trxClient.driver = client.driver;
  trxClient.connectionSettings = client.connectionSettings;
  trxClient.transacting = true;
  trxClient.valueForUndefined = client.valueForUndefined;
  trxClient.logger = client.logger;

  trxClient.on('query', (arg) => {
    trx.emit('query', arg);
    client.emit('query', arg);
  });

  trxClient.on('query-error', (err, obj) => {
    trx.emit('query-error', err, obj);
    client.emit('query-error', err, obj);
  });

  trxClient.on('query-response', (response, obj, builder) => {
    trx.emit('query-response', response, obj, builder);
    client.emit('query-response', response, obj, builder);
  });

  // Shared guard for query / stream: validates the connection and the
  // transaction state, then runs `run`. Anything thrown inside the executor
  // (including by `run`) turns into a rejection of the returned promise.
  const runGuarded = (conn, obj, completed, run) =>
    new Bluebird((resolve) => {
      if (conn !== connection) {
        throw new Error('Invalid connection for transaction query.');
      }
      if (completed) completedError(trx, obj);
      resolve(run());
    });

  const _query = trxClient.query;
  trxClient.query = function(conn, obj) {
    // Completion state is sampled at call time, before the promise is built.
    return runGuarded(conn, obj, trx.isCompleted(), () =>
      _query.call(trxClient, conn, obj)
    );
  };
  const _stream = trxClient.stream;
  trxClient.stream = function(conn, obj, stream, options) {
    return runGuarded(conn, obj, trx.isCompleted(), () =>
      _stream.call(trxClient, conn, obj, stream, options)
    );
  };
  trxClient.acquireConnection = function() {
    return Bluebird.resolve(connection);
  };
  trxClient.releaseConnection = function() {
    return Bluebird.resolve();
  };

  return trxClient;
}
354
/**
 * Logs the statement that was attempted on an already completed transaction
 * and then throws.
 *
 * @param {Transaction} trx - Transaction whose `txid` is logged.
 * @param {string|object} obj - The statement itself, or an object whose `sql`
 *   property holds it.
 * @throws {Error} Always.
 */
function completedError(trx, obj) {
  let sql = obj;
  if (typeof obj !== 'string') {
    sql = obj && obj.sql;
  }

  debug('%s: Transaction completed: %s', trx.txid, sql);

  const message =
    'Transaction query already complete, run with DEBUG=knex:tx for more info';
  throw new Error(message);
}
362
// Names of the promise methods a Transaction exposes directly.
const promiseInterface = [
  'then',
  'bind',
  'catch',
  'finally',
  'asCallback',
  'spread',
  'map',
  'reduce',
  'thenReturn',
  'return',
  'yield',
  'ensure',
  'exec',
  'reflect',
  'get',
  'mapSeries',
  'delay',
];

// Install one forwarding method per name: each call is handed, with all of
// its arguments, to the same-named method of the transaction's internal
// resolution promise.
for (const method of promiseInterface) {
  Transaction.prototype[method] = function(...args) {
    const target = this._promise;
    return target[method](...args);
  };
}
390
391module.exports = Transaction;