UNPKG

7.98 kBJavaScriptView Raw
1"use strict";
2
3var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
4
5Object.defineProperty(exports, "__esModule", {
6 value: true
7});
8exports["default"] = void 0;
9
10var _classCallCheck2 = _interopRequireDefault(require("@babel/runtime/helpers/classCallCheck"));
11
12var _createClass2 = _interopRequireDefault(require("@babel/runtime/helpers/createClass"));
13
14var _rxjs = require("rxjs");
15
16var _operators = require("rxjs/operators");
17
18var _resultRx = _interopRequireDefault(require("./result-rx"));
19
20var _neo4jDriverCore = require("neo4j-driver-core");
21
22var _transactionRx = _interopRequireDefault(require("./transaction-rx"));
23
24var _retryLogicRx = _interopRequireDefault(require("./internal/retry-logic-rx"));
25
26/**
27 * Copyright (c) "Neo4j"
28 * Neo4j Sweden AB [http://neo4j.com]
29 *
30 * This file is part of Neo4j.
31 *
32 * Licensed under the Apache License, Version 2.0 (the "License");
33 * you may not use this file except in compliance with the License.
34 * You may obtain a copy of the License at
35 *
36 * http://www.apache.org/licenses/LICENSE-2.0
37 *
38 * Unless required by applicable law or agreed to in writing, software
39 * distributed under the License is distributed on an "AS IS" BASIS,
40 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41 * See the License for the specific language governing permissions and
42 * limitations under the License.
43 */
44var _internal$constants = _neo4jDriverCore.internal.constants,
45 ACCESS_MODE_READ = _internal$constants.ACCESS_MODE_READ,
46 ACCESS_MODE_WRITE = _internal$constants.ACCESS_MODE_WRITE,
47 TxConfig = _neo4jDriverCore.internal.txConfig.TxConfig;
48/**
49 * A Reactive session, which provides the same functionality as {@link Session} but through a Reactive API.
50 */
51
52var RxSession = /*#__PURE__*/function () {
53 /**
54 * Constructs a reactive session with given default session instance and provided driver configuration.
55 *
56 * @protected
57 * @param {Object} param - Object parameter
58 * @param {Session} param.session - The underlying session instance to relay requests
59 */
60 function RxSession() {
61 var _ref = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {},
62 session = _ref.session,
63 config = _ref.config;
64
65 (0, _classCallCheck2["default"])(this, RxSession);
66 this._session = session;
67 this._retryLogic = _createRetryLogic(config);
68 }
69 /**
70 * Creates a reactive result that will execute the query with the provided parameters and the provided
71 * transaction configuration that applies to the underlying auto-commit transaction.
72 *
73 * @public
74 * @param {string} query - Query to be executed.
75 * @param {Object} parameters - Parameter values to use in query execution.
76 * @param {TransactionConfig} transactionConfig - Configuration for the new auto-commit transaction.
77 * @returns {RxResult} - A reactive result
78 */
79
80
81 (0, _createClass2["default"])(RxSession, [{
82 key: "run",
83 value: function run(query, parameters, transactionConfig) {
84 var _this = this;
85
86 return new _resultRx["default"](new _rxjs.Observable(function (observer) {
87 try {
88 observer.next(_this._session.run(query, parameters, transactionConfig));
89 observer.complete();
90 } catch (err) {
91 observer.error(err);
92 }
93
94 return function () {};
95 }));
96 }
97 /**
98 * Starts a new explicit transaction with the provided transaction configuration.
99 *
100 * @public
101 * @param {TransactionConfig} transactionConfig - Configuration for the new transaction.
102 * @returns {Observable<RxTransaction>} - A reactive stream that will generate at most **one** RxTransaction instance.
103 */
104
105 }, {
106 key: "beginTransaction",
107 value: function beginTransaction(transactionConfig) {
108 return this._beginTransaction(this._session._mode, transactionConfig);
109 }
110 /**
111 * Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided
112 * transaction configuration.
113 * @public
114 * @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
115 * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
116 * @returns {Observable} - A reactive stream returned by the unit of work.
117 */
118
119 }, {
120 key: "readTransaction",
121 value: function readTransaction(work, transactionConfig) {
122 return this._runTransaction(ACCESS_MODE_READ, work, transactionConfig);
123 }
124 /**
125 * Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided
126 * transaction configuration.
127 * @public
128 * @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
129 * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
130 * @returns {Observable} - A reactive stream returned by the unit of work.
131 */
132
133 }, {
134 key: "writeTransaction",
135 value: function writeTransaction(work, transactionConfig) {
136 return this._runTransaction(ACCESS_MODE_WRITE, work, transactionConfig);
137 }
138 /**
139 * Closes this reactive session.
140 *
141 * @public
142 * @returns {Observable} - An empty reactive stream
143 */
144
145 }, {
146 key: "close",
147 value: function close() {
148 var _this2 = this;
149
150 return new _rxjs.Observable(function (observer) {
151 _this2._session.close().then(function () {
152 observer.complete();
153 })["catch"](function (err) {
154 return observer.error(err);
155 });
156 });
157 }
158 /**
159 * Returns the bookmark received following the last successfully completed query, which is executed
160 * either in an {@link RxTransaction} obtained from this session instance or directly through one of
161 * the {@link RxSession#run} method of this session instance.
162 *
163 * If no bookmark was received or if this transaction was rolled back, the bookmark value will not be
164 * changed.
165 *
166 * @public
167 * @returns {string}
168 */
169
170 }, {
171 key: "lastBookmark",
172 value: function lastBookmark() {
173 return this._session.lastBookmark();
174 }
175 /**
176 * @private
177 */
178
179 }, {
180 key: "_beginTransaction",
181 value: function _beginTransaction(accessMode, transactionConfig) {
182 var _this3 = this;
183
184 var txConfig = TxConfig.empty();
185
186 if (transactionConfig) {
187 txConfig = new TxConfig(transactionConfig);
188 }
189
190 return new _rxjs.Observable(function (observer) {
191 try {
192 observer.next(new _transactionRx["default"](_this3._session._beginTransaction(accessMode, txConfig)));
193 observer.complete();
194 } catch (err) {
195 observer.error(err);
196 }
197
198 return function () {};
199 });
200 }
201 /**
202 * @private
203 */
204
205 }, {
206 key: "_runTransaction",
207 value: function _runTransaction(accessMode, work, transactionConfig) {
208 var txConfig = TxConfig.empty();
209
210 if (transactionConfig) {
211 txConfig = new TxConfig(transactionConfig);
212 }
213
214 return this._retryLogic.retry(this._beginTransaction(accessMode, transactionConfig).pipe((0, _operators.flatMap)(function (txc) {
215 return (0, _rxjs.defer)(function () {
216 try {
217 return work(txc);
218 } catch (err) {
219 return (0, _rxjs.throwError)(err);
220 }
221 }).pipe((0, _operators.catchError)(function (err) {
222 return txc.rollback().pipe((0, _operators.concat)((0, _rxjs.throwError)(err)));
223 }), (0, _operators.concat)(txc.commit()));
224 })));
225 }
226 }]);
227 return RxSession;
228}();
229
230exports["default"] = RxSession;
231
232function _createRetryLogic(config) {
233 var maxRetryTimeout = config && config.maxTransactionRetryTime ? config.maxTransactionRetryTime : null;
234 return new _retryLogicRx["default"]({
235 maxRetryTimeout: maxRetryTimeout
236 });
237}
\No newline at end of file