UNPKG

15.7 kBJavaScriptView Raw
1"use strict";
2
3var _interopRequireWildcard = require("@babel/runtime/helpers/interopRequireWildcard");
4
5var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
6
7Object.defineProperty(exports, "__esModule", {
8 value: true
9});
10exports["default"] = void 0;
11
12var _classCallCheck2 = _interopRequireDefault(require("@babel/runtime/helpers/classCallCheck"));
13
14var _createClass2 = _interopRequireDefault(require("@babel/runtime/helpers/createClass"));
15
16var _result = _interopRequireDefault(require("./result"));
17
18var _util = require("./internal/util");
19
20var _connectionHolder = _interopRequireWildcard(require("./internal/connection-holder"));
21
22var _bookmark = _interopRequireDefault(require("./internal/bookmark"));
23
24var _txConfig = _interopRequireDefault(require("./internal/tx-config"));
25
26var _streamObservers = require("./internal/stream-observers");
27
28var _error = require("./error");
29
30/**
31 * Copyright (c) 2002-2019 "Neo4j,"
32 * Neo4j Sweden AB [http://neo4j.com]
33 *
34 * This file is part of Neo4j.
35 *
36 * Licensed under the Apache License, Version 2.0 (the "License");
37 * you may not use this file except in compliance with the License.
38 * You may obtain a copy of the License at
39 *
40 * http://www.apache.org/licenses/LICENSE-2.0
41 *
42 * Unless required by applicable law or agreed to in writing, software
43 * distributed under the License is distributed on an "AS IS" BASIS,
44 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
45 * See the License for the specific language governing permissions and
46 * limitations under the License.
47 */
48
/**
 * An explicit transaction against the Neo4j database.
 *
 * Every operation is delegated to the handler object held in `this._state`.
 * That handler starts out as `_states.ACTIVE` and is swapped for another entry
 * of `_states` when the transaction is committed, rolled back or fails.
 *
 * @access public
 */
var Transaction =
/*#__PURE__*/
function () {
  /**
   * @constructor
   * @param {Object} options - named construction arguments.
   * @param {ConnectionHolder} options.connectionHolder - the connection holder to get connection from.
   * @param {function()} options.onClose - called when the transaction is committed, rolled back or fails.
   * @param {function(bookmark: Bookmark)} options.onBookmark - invoked when a new bookmark is produced.
   * @param {boolean} options.reactive - whether this transaction generates reactive streams.
   * @param {number} options.fetchSize - the record fetch size in each pulling batch.
   */
  function Transaction(options) {
    var holder = options.connectionHolder;
    var closeHook = options.onClose;
    var bookmarkHook = options.onBookmark;
    var isReactive = options.reactive;
    var batchSize = options.fetchSize;
    (0, _classCallCheck2.default)(this, Transaction);
    this._connectionHolder = holder;
    this._reactive = isReactive;
    this._state = _states.ACTIVE;
    this._onClose = closeHook;
    this._onBookmark = bookmarkHook;
    // Bound once so the same function objects can be handed to the protocol layer.
    this._onError = this._onErrorCallback.bind(this);
    this._onComplete = this._onCompleteCallback.bind(this);
    this._fetchSize = batchSize;
    // Every result produced by run(); passed to commit/rollback as pendingResults.
    this._results = [];
  }

  // Builds the argument object shared by the commit and rollback state handlers.
  function finishArguments(tx) {
    return {
      connectionHolder: tx._connectionHolder,
      onError: tx._onError,
      onComplete: tx._onComplete,
      pendingResults: tx._results
    };
  }

  // Applies the outcome of a commit/rollback handler: stores the new state,
  // notifies the owner via onClose and adapts the result to a Promise<void>.
  function settle(tx, outcome) {
    tx._state = outcome.state;

    tx._onClose();

    return new Promise(function (resolve, reject) {
      var subscriber = {
        onCompleted: function onCompleted() {
          // Deliberately resolve with no value.
          return resolve();
        },
        onError: function onError(error) {
          return reject(error);
        }
      };
      outcome.result.subscribe(subscriber);
    });
  }

  var members = [{
    key: "_begin",

    /**
     * Asks the connection's protocol to begin the transaction.
     * Failures to obtain a connection or to send the request are routed to the error callback.
     *
     * @param {Bookmark} bookmark - bookmark forwarded to `beginTransaction`.
     * @param {TxConfig} txConfig - transaction configuration forwarded to `beginTransaction`.
     */
    value: function _begin(bookmark, txConfig) {
      var tx = this;

      function sendBegin(conn) {
        var protocol = conn.protocol();
        return protocol.beginTransaction({
          bookmark: bookmark,
          txConfig: txConfig,
          mode: tx._connectionHolder.mode(),
          database: tx._connectionHolder.database(),
          beforeError: tx._onError,
          afterComplete: tx._onComplete
        });
      }

      function reportFailure(error) {
        return tx._onError(error);
      }

      tx._connectionHolder.getConnection().then(sendBegin).catch(reportFailure);
    }
  }, {
    key: "run",

    /**
     * Run Cypher query.
     * Accepts either a query object, i.e. `{text: "MATCH ...", parameters: {param: 1}}`,
     * or the query text and its parameters as two separate arguments.
     *
     * @param {mixed} query - Cypher query to execute
     * @param {Object} parameters - Map with parameters to use in query
     * @return {Result} New Result
     */
    value: function run(query, parameters) {
      var checked = (0, _util.validateQueryAndParameters)(query, parameters);
      var text = checked.validatedQuery;
      var values = checked.params;
      var context = {
        connectionHolder: this._connectionHolder,
        onError: this._onError,
        onComplete: this._onComplete,
        reactive: this._reactive,
        fetchSize: this._fetchSize
      };

      var outcome = this._state.run(text, values, context);

      this._results.push(outcome);

      return outcome;
    }
  }, {
    key: "commit",

    /**
     * Commits the transaction.
     *
     * After committing the transaction can no longer be used.
     *
     * @returns {Promise<void>} resolves with no value when the commit result completes,
     * rejects with the error reported by the commit result.
     */
    value: function commit() {
      return settle(this, this._state.commit(finishArguments(this)));
    }
  }, {
    key: "rollback",

    /**
     * Rolls the transaction back.
     *
     * After rolling back, the transaction can no longer be used.
     *
     * @returns {Promise<void>} resolves with no value when the rollback result completes,
     * rejects with the error reported by the rollback result.
     */
    value: function rollback() {
      return settle(this, this._state.rollback(finishArguments(this)));
    }
  }, {
    key: "isOpen",

    /**
     * Check if this transaction is active, which means commit and rollback did not happen.
     * @return {boolean} `true` while the state is still ACTIVE, `false` otherwise.
     */
    value: function isOpen() {
      var active = _states.ACTIVE;
      return this._state === active;
    }
  }, {
    key: "_onErrorCallback",

    /**
     * Error hook passed to the protocol layer and to failed observers.
     *
     * Per the original authors: the error is "acknowledged" by sending a RESET message,
     * after which the database forgets this transaction and cleans up its resources,
     * so it is safe to mark the transaction FAILED and refuse further interaction.
     *
     * @param {Error} err - the reported error (not inspected here).
     * @return {*} whatever `releaseConnection()` of the connection holder returns.
     */
    value: function _onErrorCallback(err) {
      this._state = _states.FAILED;

      this._onClose();

      // release connection back to the pool
      var released = this._connectionHolder.releaseConnection();

      return released;
    }
  }, {
    key: "_onCompleteCallback",

    /**
     * Completion hook: wraps the bookmark found in the metadata and reports it via onBookmark.
     *
     * @param {Object} meta - completion metadata; its `bookmark` property is read.
     */
    value: function _onCompleteCallback(meta) {
      var produced = new _bookmark.default(meta.bookmark);

      this._onBookmark(produced);
    }
  }];
  (0, _createClass2.default)(Transaction, members);
  return Transaction;
}();
227
/**
 * State table for {@link Transaction}.
 *
 * Each entry exposes the same three handlers:
 *  - `commit(context)` and `rollback(context)` return `{result, state}`, where `state` is the
 *    entry of this table the transaction should switch to;
 *  - `run(query, parameters, context)` returns a result directly.
 *
 * Only ACTIVE talks to a connection. The other states answer immediately with a failed
 * (or, for FAILED.rollback, completed) result built on the empty connection holder.
 */
var _states = {
  // The transaction is running with no explicit success or failure marked
  ACTIVE: {
    commit: function commit(_ref2) {
      var connectionHolder = _ref2.connectionHolder,
          onError = _ref2.onError,
          onComplete = _ref2.onComplete,
          pendingResults = _ref2.pendingResults;
      return {
        result: finishTransaction(true, connectionHolder, onError, onComplete, pendingResults),
        state: _states.SUCCEEDED
      };
    },
    rollback: function rollback(_ref3) {
      var connectionHolder = _ref3.connectionHolder,
          onError = _ref3.onError,
          onComplete = _ref3.onComplete,
          pendingResults = _ref3.pendingResults;
      return {
        result: finishTransaction(false, connectionHolder, onError, onComplete, pendingResults),
        state: _states.ROLLED_BACK
      };
    },
    run: function run(query, parameters, _ref4) {
      var connectionHolder = _ref4.connectionHolder,
          onError = _ref4.onError,
          onComplete = _ref4.onComplete,
          reactive = _ref4.reactive,
          fetchSize = _ref4.fetchSize;
      // RUN in explicit transaction can't contain bookmarks and transaction configuration
      // No need to include mode and database name as it shall be included in begin
      var observerPromise = connectionHolder.getConnection().then(function (conn) {
        return conn.protocol().run(query, parameters, {
          bookmark: _bookmark["default"].empty(),
          txConfig: _txConfig["default"].empty(),
          beforeError: onError,
          afterComplete: onComplete,
          reactive: reactive,
          fetchSize: fetchSize
        });
      })["catch"](function (error) {
        // Failing to get a connection or to send RUN becomes a failed observer
        // instead of a rejected promise.
        return new _streamObservers.FailedObserver({
          error: error,
          onError: onError
        });
      });
      return newCompletedResult(observerPromise, query, parameters);
    }
  },
  // An error has occurred, transaction can no longer be used and no more messages will
  // be sent for this transaction.
  FAILED: {
    commit: function commit(context) {
      return {
        result: newFailedResult('Cannot commit this transaction, because it has been rolled back either because of an error or explicit termination.', context.onError, 'COMMIT', {}),
        state: _states.FAILED
      };
    },
    // Rolling back an already failed transaction is reported as a success.
    rollback: function rollback() {
      return {
        result: newCompletedResult(new _streamObservers.CompletedObserver(), 'ROLLBACK', {}),
        state: _states.FAILED
      };
    },
    run: function run(query, parameters, context) {
      return newFailedResult('Cannot run query in this transaction, because it has been rolled back either because of an error or explicit termination.', context.onError, query, parameters);
    }
  },
  // This transaction has successfully committed
  SUCCEEDED: {
    commit: function commit(context) {
      return {
        result: newFailedResult('Cannot commit this transaction, because it has already been committed.', context.onError, 'COMMIT', {}),
        state: _states.SUCCEEDED
      };
    },
    rollback: function rollback(context) {
      return {
        result: newFailedResult('Cannot rollback this transaction, because it has already been committed.', context.onError, 'ROLLBACK', {}),
        state: _states.SUCCEEDED
      };
    },
    run: function run(query, parameters, context) {
      return newFailedResult('Cannot run query in this transaction, because it has already been committed.', context.onError, query, parameters);
    }
  },
  // This transaction has been rolled back
  ROLLED_BACK: {
    commit: function commit(context) {
      return {
        result: newFailedResult('Cannot commit this transaction, because it has already been rolled back.', context.onError, 'COMMIT', {}),
        state: _states.ROLLED_BACK
      };
    },
    rollback: function rollback() {
      // NOTE(review): unlike every other failing handler, this observer is created
      // WITHOUT an `onError` callback. That is kept exactly as found; confirm whether
      // the omission is intentional before making it consistent.
      return {
        result: newCompletedResult(new _streamObservers.FailedObserver({
          error: (0, _error.newError)('Cannot rollback this transaction, because it has already been rolled back.')
        }), 'ROLLBACK', {}),
        state: _states.ROLLED_BACK
      };
    },
    run: function run(query, parameters, context) {
      return newFailedResult('Cannot run query in this transaction, because it has already been rolled back.', context.onError, query, parameters);
    }
  }
};

/**
 * Builds the result returned when an operation is attempted on a transaction that can no
 * longer perform it: a failed observer carrying `message`, wrapped by `newCompletedResult`.
 *
 * @param {string} message - text of the error handed to the failed observer.
 * @param {function(err:Error): any} onError - callback handed to the failed observer.
 * @param {string} query - query text (or 'COMMIT' / 'ROLLBACK') recorded on the result.
 * @param {Object} parameters - parameters recorded on the result.
 * @return {Result} the failed result.
 * @private
 */
function newFailedResult(message, onError, query, parameters) {
  return newCompletedResult(new _streamObservers.FailedObserver({
    error: (0, _error.newError)(message),
    onError: onError
  }), query, parameters);
}

/**
 * Produces the result of committing or rolling back a transaction.
 *
 * @param {boolean} commit - `true` to commit, `false` to roll back
 * @param {ConnectionHolder} connectionHolder
 * @param {function(err:Error): any} onError
 * @param {function(metadata:object): any} onComplete
 * @param {Array<Result>} pendingResults all run results in this transaction
 */
395
function finishTransaction(commit, connectionHolder, onError, onComplete, pendingResults) {
  // Flow: obtain the connection, cancel every pending run result, wait for all of them
  // to settle, then send COMMIT (commit === true) or ROLLBACK through the protocol.
  // Any failure along the way is converted into a FailedObserver wired to `onError`.
  var label = commit ? 'COMMIT' : 'ROLLBACK';

  function cancelResult(pending) {
    return pending._cancel();
  }

  function sendOutcome(connection) {
    var protocol = connection.protocol();
    var callbacks = {
      beforeError: onError,
      afterComplete: onComplete
    };

    if (!commit) {
      return protocol.rollbackTransaction(callbacks);
    }

    return protocol.commitTransaction(callbacks);
  }

  function asFailedObserver(error) {
    return new _streamObservers.FailedObserver({
      error: error,
      onError: onError
    });
  }

  var observerPromise = connectionHolder.getConnection().then(function (connection) {
    pendingResults.forEach(cancelResult);
    return Promise.all(pendingResults).then(function () {
      return sendOutcome(connection);
    });
  }).catch(asFailedObserver);

  // for commit & rollback we need result that uses real connection holder and notifies it when
  // connection is not needed and can be safely released to the pool
  return new _result.default(observerPromise, label, {}, connectionHolder);
}
/**
 * Creates a {@link Result} bound to the empty connection holder.
 * Intended for results of intermediate or failed actions, which need no metadata and must not
 * make the real connection holder release connections.
 * @param {ResultStreamObserver|Promise<ResultStreamObserver>} observerPromise - an observer for the
 * created result, or a promise of one; it is normalised with `Promise.resolve`.
 * @param {string} query - the cypher query that produced the result.
 * @param {Object} parameters - the parameters for cypher query that produced the result.
 * @return {Result} new result.
 * @private
 */


function newCompletedResult(observerPromise, query, parameters) {
  var ResultCtor = _result.default;
  var observerAsPromise = Promise.resolve(observerPromise);
  var detachedHolder = _connectionHolder.EMPTY_CONNECTION_HOLDER;
  return new ResultCtor(observerAsPromise, query, parameters, detachedHolder);
}
439
440var _default = Transaction;
441exports["default"] = _default;
\No newline at end of file