UNPKG

7.37 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7var _promise = require('babel-runtime/core-js/promise');
8
9var _promise2 = _interopRequireDefault(_promise);
10
11var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
12
13var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
14
15var _createClass2 = require('babel-runtime/helpers/createClass');
16
17var _createClass3 = _interopRequireDefault(_createClass2);
18
19var _resultSummary = require('./result-summary');
20
21var _resultSummary2 = _interopRequireDefault(_resultSummary);
22
23var _connectionHolder = require('./internal/connection-holder');
24
/**
 * Normalise a required module to the ES-module shape.
 * Modules flagged with `__esModule` are handed back untouched; any other
 * value (including falsy ones) is wrapped as `{ default: value }`.
 * @param {*} obj - value returned by `require`.
 * @return {Object} object exposing the module under `default`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }
  return { default: obj };
}
26
27/**
28 * Copyright (c) 2002-2018 Neo4j Sweden AB [http://neo4j.com]
29 *
30 * This file is part of Neo4j.
31 *
32 * Licensed under the Apache License, Version 2.0 (the "License");
33 * you may not use this file except in compliance with the License.
34 * You may obtain a copy of the License at
35 *
36 * http://www.apache.org/licenses/LICENSE-2.0
37 *
38 * Unless required by applicable law or agreed to in writing, software
39 * distributed under the License is distributed on an "AS IS" BASIS,
40 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41 * See the License for the specific language governing permissions and
42 * limitations under the License.
43 */
44
/**
 * Fallback error handler used when an observer does not provide `onError`.
 * Writes the error, prefixed with a fixed message, to the console.
 * @param {*} error - the error to report.
 */
var DEFAULT_ON_ERROR = function DEFAULT_ON_ERROR(error) {
  var message = 'Uncaught error when processing result: ' + error;
  console.log(message);
};
/**
 * Fallback completion handler used when an observer does not provide
 * `onCompleted`. Deliberately does nothing with the summary.
 * @param {*} summary - ignored.
 */
var DEFAULT_ON_COMPLETED = function DEFAULT_ON_COMPLETED(summary) {
  // intentionally empty
};
49
/**
 * A stream of {@link Record} representing the result of a statement.
 * Can be consumed eagerly as {@link Promise} resolved with array of records and {@link ResultSummary}
 * summary, or rejected with error that contains {@link string} code and {@link string} message.
 * Alternatively can be consumed lazily using <code>Result.subscribe()</code> function.
 * @access public
 */

var Result = function () {
  /**
   * Inject the observer to be used.
   * @constructor
   * @access private
   * @param {StreamObserver} streamObserver
   * @param {mixed} statement - Cypher statement to execute
   * @param {Object} parameters - Map with parameters to use in statement; defaults to an empty object
   * @param metaSupplier function, when called provides metadata; defaults to a function returning an empty object
   * @param {ConnectionHolder} connectionHolder - to be notified when result is either fully consumed or error happened;
   * defaults to EMPTY_CONNECTION_HOLDER.
   */
  function Result(streamObserver, statement, parameters, metaSupplier, connectionHolder) {
    (0, _classCallCheck3.default)(this, Result);

    // Remember where the result was created so that errors reported later
    // can be given a stack trace pointing at that place (see subscribe()).
    this._stack = captureStacktrace();
    this._streamObserver = streamObserver;
    this._p = null;
    this._statement = statement;
    this._parameters = parameters || {};
    this._metaSupplier = metaSupplier || function () {
      return {};
    };
    this._connectionHolder = connectionHolder || _connectionHolder.EMPTY_CONNECTION_HOLDER;
  }

  (0, _createClass3.default)(Result, [
  /**
   * Lazily create the promise backing <code>then()</code> and <code>catch()</code> and store it in
   * <code>this._p</code>. The promise subscribes to this result, collects every record and resolves with
   * <code>{records, summary}</code>, or rejects with the reported error. Does nothing when the promise
   * already exists.
   * @return {undefined} nothing; the promise is stored on the instance.
   * @access private
   */
  {
    key: '_createPromise',
    value: function _createPromise() {
      if (this._p) {
        return;
      }
      var self = this;
      this._p = new _promise2.default(function (resolve, reject) {
        var records = [];
        var observer = {
          onNext: function onNext(record) {
            records.push(record);
          },
          onCompleted: function onCompleted(summary) {
            resolve({ records: records, summary: summary });
          },
          onError: function onError(error) {
            reject(error);
          }
        };
        self.subscribe(observer);
      });
    }
  },

  /**
   * Waits for all results and calls the passed in function with the results.
   * Cannot be combined with the <code>Result.subscribe()</code> function.
   *
   * @param {function(result: {records:Array<Record>, summary: ResultSummary})} onFulfilled - function to be called
   * when finished.
   * @param {function(error: {message:string, code:string})} onRejected - function to be called upon errors.
   * @return {Promise} promise.
   */
  {
    key: 'then',
    value: function then(onFulfilled, onRejected) {
      this._createPromise();
      return this._p.then(onFulfilled, onRejected);
    }
  },

  /**
   * Catch errors when using promises.
   * Cannot be used with the subscribe function.
   * @param {function(error: Neo4jError)} onRejected - Function to be called upon errors.
   * @return {Promise} promise.
   */
  {
    key: 'catch',
    value: function _catch(onRejected) {
      this._createPromise();
      return this._p.catch(onRejected);
    }
  },

  /**
   * Stream records to observer as they come in, this is a more efficient method
   * of handling the results, and allows you to handle arbitrarily large results.
   *
   * The passed observer is not modified: a separate decorated observer is handed to the stream observer,
   * so the same observer object can safely be used for several results.
   *
   * @param {Object} observer - Observer object
   * @param {function(record: Record)} observer.onNext - handle records, one by one.
   * @param {function(summary: ResultSummary)} observer.onCompleted - handle stream tail, the result summary.
   * @param {function(error: {message:string, code:string})} observer.onError - handle errors.
   * @return {undefined} nothing.
   */
  {
    key: 'subscribe',
    value: function subscribe(observer) {
      var self = this;

      var onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED;
      var onErrorOriginal = observer.onError || DEFAULT_ON_ERROR;

      // Build a fresh observer instead of overwriting the caller's callbacks. Overwriting them
      // would stack the wrappers when one observer object is subscribed more than once.
      var decoratedObserver = {
        // keep `this` inside onNext pointing at the caller's observer; a missing or
        // non-function onNext is passed through unchanged
        onNext: typeof observer.onNext === 'function' ? observer.onNext.bind(observer) : observer.onNext,

        onCompleted: function onCompleted(metadata) {
          var additionalMeta = self._metaSupplier();
          for (var key in additionalMeta) {
            // call through Object.prototype so objects without a prototype work as well
            if (Object.prototype.hasOwnProperty.call(additionalMeta, key)) {
              metadata[key] = additionalMeta[key];
            }
          }
          var sum = new _resultSummary2.default(self._statement, self._parameters, metadata);

          // notify connection holder that the used connection is not needed any more because result has
          // been fully consumed; call the original onCompleted callback after that
          self._connectionHolder.releaseConnection().then(function () {
            onCompletedOriginal.call(observer, sum);
          });
        },

        onError: function onError(error) {
          // notify connection holder that the used connection is not needed any more because error happened
          // and result can't be consumed any further; call the original onError callback after that
          self._connectionHolder.releaseConnection().then(function () {
            replaceStacktrace(error, self._stack);
            onErrorOriginal.call(observer, error);
          });
        }
      };

      this._streamObserver.subscribe(decoratedObserver);
    }
  }]);
  return Result;
}();
197
/**
 * Capture the stack trace of the current call site.
 * @return {string|null} the stack of a freshly created error with a leading
 * 'Error' header line removed, or null when the runtime provides no
 * <code>stack</code> property.
 */
function captureStacktrace() {
  var error = new Error('');
  if (error.stack) {
    // Drop the leading 'Error' header together with the line break(s) that follow it.
    // The line break may be LF, CRLF or CR; stacks without such a header are left untouched.
    return error.stack.replace(/^Error(\r\n|\n|\r)*/, '');
  }
  return null;
}
205
/**
 * Replace the stack trace of the given error with the supplied one, keeping
 * the error's own name and message as the first line.
 *
 * Does nothing when no new stack is given, or when the error is not an
 * object (null, undefined or a primitive): such values cannot carry a
 * <code>stack</code> property and assigning one throws in strict mode.
 *
 * @param {*} error - error whose stack should be replaced; modified in place.
 * @param {string|null} newStack - stack frames to attach.
 * @return {undefined} nothing.
 */
function replaceStacktrace(error, newStack) {
  if (!newStack) {
    return;
  }
  var canCarryStack = error !== null && (typeof error === 'object' || typeof error === 'function');
  if (!canCarryStack) {
    return;
  }
  // Error.prototype.toString() concatenates error.name and error.message nicely
  // then we add the rest of the stack trace
  error.stack = error.toString() + '\n' + newStack;
}
213
214exports.default = Result;
\No newline at end of file