UNPKG

6.08 kBJavaScriptView Raw
1"use strict";
2
3var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
4
5Object.defineProperty(exports, "__esModule", {
6 value: true
7});
8exports["default"] = void 0;
9
10var _classCallCheck2 = _interopRequireDefault(require("@babel/runtime/helpers/classCallCheck"));
11
12var _createClass2 = _interopRequireDefault(require("@babel/runtime/helpers/createClass"));
13
14var _neo4jDriverCore = require("neo4j-driver-core");
15
16var _rxjs = require("rxjs");
17
18var _operators = require("rxjs/operators");
19
20/**
21 * Copyright (c) "Neo4j"
22 * Neo4j Sweden AB [http://neo4j.com]
23 *
24 * This file is part of Neo4j.
25 *
26 * Licensed under the Apache License, Version 2.0 (the "License");
27 * you may not use this file except in compliance with the License.
28 * You may obtain a copy of the License at
29 *
30 * http://www.apache.org/licenses/LICENSE-2.0
31 *
32 * Unless required by applicable law or agreed to in writing, software
33 * distributed under the License is distributed on an "AS IS" BASIS,
34 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
35 * See the License for the specific language governing permissions and
36 * limitations under the License.
37 */
38var States = {
39 READY: 0,
40 STREAMING: 1,
41 COMPLETED: 2
42};
43/**
44 * The reactive result interface.
45 */
46
47var RxResult = /*#__PURE__*/function () {
48 /**
49 * @constructor
50 * @protected
51 * @param {Observable<Result>} result - An observable of single Result instance to relay requests.
52 */
53 function RxResult(result) {
54 (0, _classCallCheck2["default"])(this, RxResult);
55 var replayedResult = result.pipe((0, _operators.publishReplay)(1), (0, _operators.refCount)());
56 this._result = replayedResult;
57 this._keys = replayedResult.pipe((0, _operators.flatMap)(function (r) {
58 return (0, _rxjs.from)(r.keys());
59 }), (0, _operators.publishReplay)(1), (0, _operators.refCount)());
60 this._records = new _rxjs.Subject();
61 this._summary = new _rxjs.ReplaySubject();
62 this._state = States.READY;
63 }
64 /**
65 * Returns an observable that exposes a single item containing field names
66 * returned by the executing query.
67 *
68 * Errors raised by actual query execution can surface on the returned
69 * observable stream.
70 *
71 * @public
72 * @returns {Observable<string[]>} - An observable stream (with exactly one element) of field names.
73 */
74
75
76 (0, _createClass2["default"])(RxResult, [{
77 key: "keys",
78 value: function keys() {
79 return this._keys;
80 }
81 /**
82 * Returns an observable that exposes each record returned by the executing query.
83 *
84 * Errors raised during the streaming phase can surface on the returned observable stream.
85 *
86 * @public
87 * @returns {Observable<Record>} - An observable stream of records.
88 */
89
90 }, {
91 key: "records",
92 value: function records() {
93 var _this = this;
94
95 return this._result.pipe((0, _operators.flatMap)(function (result) {
96 return new _rxjs.Observable(function (recordsObserver) {
97 return _this._startStreaming({
98 result: result,
99 recordsObserver: recordsObserver
100 });
101 });
102 }));
103 }
104 /**
105 * Returns an observable that exposes a single item of {@link ResultSummary} that is generated by
106 * the server after the streaming of the executing query is completed.
107 *
108 * *Subscribing to this stream before subscribing to records() stream causes the results to be discarded on the server.*
109 *
110 * @public
111 * @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
112 */
113
114 }, {
115 key: "consume",
116 value: function consume() {
117 var _this2 = this;
118
119 return this._result.pipe((0, _operators.flatMap)(function (result) {
120 return new _rxjs.Observable(function (summaryObserver) {
121 return _this2._startStreaming({
122 result: result,
123 summaryObserver: summaryObserver
124 });
125 });
126 }));
127 }
128 }, {
129 key: "_startStreaming",
130 value: function _startStreaming() {
131 var _this3 = this;
132
133 var _ref = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {},
134 result = _ref.result,
135 _ref$recordsObserver = _ref.recordsObserver,
136 recordsObserver = _ref$recordsObserver === void 0 ? null : _ref$recordsObserver,
137 _ref$summaryObserver = _ref.summaryObserver,
138 summaryObserver = _ref$summaryObserver === void 0 ? null : _ref$summaryObserver;
139
140 var subscriptions = [];
141
142 if (summaryObserver) {
143 subscriptions.push(this._summary.subscribe(summaryObserver));
144 }
145
146 if (this._state < States.STREAMING) {
147 this._state = States.STREAMING;
148
149 if (recordsObserver) {
150 subscriptions.push(this._records.subscribe(recordsObserver));
151 }
152
153 subscriptions.push({
154 unsubscribe: function unsubscribe() {
155 if (result._cancel) {
156 result._cancel();
157 }
158 }
159 });
160
161 if (this._records.observers.length === 0) {
162 result._cancel();
163 }
164
165 result.subscribe({
166 onNext: function onNext(record) {
167 _this3._records.next(record);
168 },
169 onCompleted: function onCompleted(summary) {
170 _this3._records.complete();
171
172 _this3._summary.next(summary);
173
174 _this3._summary.complete();
175
176 _this3._state = States.COMPLETED;
177 },
178 onError: function onError(err) {
179 _this3._records.error(err);
180
181 _this3._summary.error(err);
182
183 _this3._state = States.COMPLETED;
184 }
185 });
186 } else if (recordsObserver) {
187 recordsObserver.error((0, _neo4jDriverCore.newError)('Streaming has already started/consumed with a previous records or summary subscription.'));
188 }
189
190 return function () {
191 subscriptions.forEach(function (s) {
192 return s.unsubscribe();
193 });
194 };
195 }
196 }]);
197 return RxResult;
198}();
199
200exports["default"] = RxResult;
\No newline at end of file