UNPKG

6.21 kBJavaScriptView Raw
1"use strict";
2
3var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");
4
5Object.defineProperty(exports, "__esModule", {
6 value: true
7});
8exports["default"] = void 0;
9
10var _classCallCheck2 = _interopRequireDefault(require("@babel/runtime/helpers/classCallCheck"));
11
12var _createClass2 = _interopRequireDefault(require("@babel/runtime/helpers/createClass"));
13
14var _error = require("./error");
15
16var _resultSummary = _interopRequireDefault(require("./result-summary"));
17
18var _rxjs = require("rxjs");
19
20var _operators = require("rxjs/operators");
21
22var _record = _interopRequireDefault(require("./record"));
23
24/**
25 * Copyright (c) 2002-2019 "Neo4j,"
26 * Neo4j Sweden AB [http://neo4j.com]
27 *
28 * This file is part of Neo4j.
29 *
30 * Licensed under the Apache License, Version 2.0 (the "License");
31 * you may not use this file except in compliance with the License.
32 * You may obtain a copy of the License at
33 *
34 * http://www.apache.org/licenses/LICENSE-2.0
35 *
36 * Unless required by applicable law or agreed to in writing, software
37 * distributed under the License is distributed on an "AS IS" BASIS,
38 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
39 * See the License for the specific language governing permissions and
40 * limitations under the License.
41 */
42var States = {
43 READY: 0,
44 STREAMING: 1,
45 COMPLETED: 2
46 /**
47 * The reactive result interface.
48 */
49
50};
51
52var RxResult =
53/*#__PURE__*/
54function () {
55 /**
56 * @constructor
57 * @protected
58 * @param {Observable<Result>} result - An observable of single Result instance to relay requests.
59 */
60 function RxResult(result) {
61 (0, _classCallCheck2["default"])(this, RxResult);
62 var replayedResult = result.pipe((0, _operators.publishReplay)(1), (0, _operators.refCount)());
63 this._result = replayedResult;
64 this._keys = replayedResult.pipe((0, _operators.flatMap)(function (r) {
65 return (0, _rxjs.from)(r.keys());
66 }), (0, _operators.publishReplay)(1), (0, _operators.refCount)());
67 this._records = new _rxjs.Subject();
68 this._summary = new _rxjs.ReplaySubject();
69 this._state = States.READY;
70 }
71 /**
72 * Returns an observable that exposes a single item containing field names
73 * returned by the executing query.
74 *
75 * Errors raised by actual query execution can surface on the returned
76 * observable stream.
77 *
78 * @public
79 * @returns {Observable<string[]>} - An observable stream (with exactly one element) of field names.
80 */
81
82
83 (0, _createClass2["default"])(RxResult, [{
84 key: "keys",
85 value: function keys() {
86 return this._keys;
87 }
88 /**
89 * Returns an observable that exposes each record returned by the executing query.
90 *
91 * Errors raised during the streaming phase can surface on the returned observable stream.
92 *
93 * @public
94 * @returns {Observable<Record>} - An observable stream of records.
95 */
96
97 }, {
98 key: "records",
99 value: function records() {
100 var _this = this;
101
102 return this._result.pipe((0, _operators.flatMap)(function (result) {
103 return new _rxjs.Observable(function (recordsObserver) {
104 return _this._startStreaming({
105 result: result,
106 recordsObserver: recordsObserver
107 });
108 });
109 }));
110 }
111 /**
112 * Returns an observable that exposes a single item of {@link ResultSummary} that is generated by
113 * the server after the streaming of the executing query is completed.
114 *
115 * *Subscribing to this stream before subscribing to records() stream causes the results to be discarded on the server.*
116 *
117 * @public
118 * @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
119 */
120
121 }, {
122 key: "consume",
123 value: function consume() {
124 var _this2 = this;
125
126 return this._result.pipe((0, _operators.flatMap)(function (result) {
127 return new _rxjs.Observable(function (summaryObserver) {
128 return _this2._startStreaming({
129 result: result,
130 summaryObserver: summaryObserver
131 });
132 });
133 }));
134 }
135 }, {
136 key: "_startStreaming",
137 value: function _startStreaming() {
138 var _this3 = this;
139
140 var _ref = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {},
141 result = _ref.result,
142 _ref$recordsObserver = _ref.recordsObserver,
143 recordsObserver = _ref$recordsObserver === void 0 ? null : _ref$recordsObserver,
144 _ref$summaryObserver = _ref.summaryObserver,
145 summaryObserver = _ref$summaryObserver === void 0 ? null : _ref$summaryObserver;
146
147 var subscriptions = [];
148
149 if (summaryObserver) {
150 subscriptions.push(this._summary.subscribe(summaryObserver));
151 }
152
153 if (this._state < States.STREAMING) {
154 this._state = States.STREAMING;
155
156 if (recordsObserver) {
157 subscriptions.push(this._records.subscribe(recordsObserver));
158 }
159
160 subscriptions.push({
161 unsubscribe: function unsubscribe() {
162 if (result._cancel) {
163 result._cancel();
164 }
165 }
166 });
167
168 if (this._records.observers.length === 0) {
169 result._cancel();
170 }
171
172 result.subscribe({
173 onNext: function onNext(record) {
174 _this3._records.next(record);
175 },
176 onCompleted: function onCompleted(summary) {
177 _this3._records.complete();
178
179 _this3._summary.next(summary);
180
181 _this3._summary.complete();
182
183 _this3._state = States.COMPLETED;
184 },
185 onError: function onError(err) {
186 _this3._records.error(err);
187
188 _this3._summary.error(err);
189
190 _this3._state = States.COMPLETED;
191 }
192 });
193 } else if (recordsObserver) {
194 recordsObserver.error((0, _error.newError)('Streaming has already started/consumed with a previous records or summary subscription.'));
195 }
196
197 return function () {
198 subscriptions.forEach(function (s) {
199 return s.unsubscribe();
200 });
201 };
202 }
203 }]);
204 return RxResult;
205}();
206
207exports["default"] = RxResult;
\No newline at end of file