UNPKG

7.62 kBJavaScriptView Raw
1/*!
2 * Module dependencies.
3 */
4
5'use strict';
6
7const MongooseError = require('../error/mongooseError');
8const Readable = require('stream').Readable;
9const promiseOrCallback = require('../helpers/promiseOrCallback');
10const eachAsync = require('../helpers/cursor/eachAsync');
11const util = require('util');
12
/**
 * An AggregationCursor is a concurrency primitive for processing aggregation
 * results one document at a time. It is analogous to QueryCursor.
 *
 * An AggregationCursor fulfills the Node.js streams3 API,
 * in addition to several other mechanisms for loading documents from MongoDB
 * one at a time.
 *
 * Creating an AggregationCursor executes the model's pre aggregate hooks,
 * but **not** the model's post aggregate hooks.
 *
 * Unless you're an advanced user, do **not** instantiate this class directly.
 * Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
 *
 * @param {Aggregate} agg the aggregation to run; its `_model`, `_pipeline` and `options` are used to open the cursor
 * @inherits Readable
 * @event `cursor`: Emitted when the cursor is created
 * @event `error`: Emitted when an error occurred
 * @event `data`: Emitted when the stream is flowing and the next doc is ready
 * @event `end`: Emitted when the stream is exhausted
 * @api public
 */

function AggregationCursor(agg) {
  Readable.call(this, { objectMode: true });

  this.cursor = null;
  this.agg = agg;
  this._transforms = [];
  this._mongooseOptions = {};

  // Strip `useMongooseAggCursor` so it is not part of the options that are
  // later handed to `collection.aggregate()`. Guarded because `options` (or
  // `options.cursor`) may be absent; the cursor is still opened with `{}`
  // in that case.
  const cursorOptions = agg.options != null ? agg.options.cursor : null;
  if (cursorOptions != null) {
    delete cursorOptions.useMongooseAggCursor;
  }

  _init(agg._model, this, agg);
}

util.inherits(AggregationCursor, Readable);
51
/*!
 * Runs the model's pre('aggregate') hooks and then opens the underlying
 * cursor with `model.collection.aggregate()`, storing it on `c.cursor` and
 * emitting `cursor`. When `model.collection.buffer` is truthy the whole
 * sequence is deferred until the collection's emitter fires `queue`.
 *
 * If the pre hooks fail (assumes `execPre` reports a hook failure as the
 * first callback argument), no cursor is opened and no `cursor` event is
 * emitted: the error is recorded with `c._markError()` and emitted as
 * `error` when there is at least one `error` listener.
 *
 * @param {Model} model
 * @param {AggregationCursor} c the cursor being initialized
 * @param {Aggregate} agg
 */

function _init(model, c, agg) {
  const runHooksAndOpen = function() {
    model.hooks.execPre('aggregate', agg, function(err) {
      if (err != null) {
        c._markError(err);
        // Only emit when someone listens; an unhandled `error` event throws.
        if (c.listeners('error').length > 0) {
          c.emit('error', err);
        }
        return;
      }
      c.cursor = model.collection.aggregate(agg._pipeline, agg.options || {});
      c.emit('cursor', c.cursor);
    });
  };

  if (model.collection.buffer) {
    model.collection.emitter.once('queue', runHooksAndOpen);
    return;
  }
  runHooksAndOpen();
}
71
/*!
 * Necessary to satisfy the Readable API. Fetches one document through
 * `_next()` and pushes it into the stream. When `_next()` yields no document
 * the stream is ended with `push(null)` and the underlying cursor is closed.
 */

AggregationCursor.prototype._read = function() {
  const emitError = err => this.emit('error', err);

  _next(this, (error, doc) => {
    if (error) {
      return emitError(error);
    }
    if (doc) {
      this.push(doc);
      return;
    }

    // Exhausted: signal end-of-stream first, then release the cursor.
    this.push(null);
    this.cursor.close(closeError => {
      if (closeError) {
        return emitError(closeError);
      }
      setTimeout(() => {
        // on node >= 14 streams close automatically (gh-8834)
        if (!this.destroyed) {
          this.emit('close');
        }
      }, 0);
    });
  });
};
101
if (Symbol.asyncIterator != null) {
  // Async iteration is deliberately unsupported on an existing aggregation
  // cursor: the method exists only to throw a descriptive error.
  const unsupportedMessage = [
    'Mongoose does not support using async iterators with an ',
    'existing aggregation cursor. See http://bit.ly/mongoose-async-iterate-aggregation'
  ].join('');

  AggregationCursor.prototype[Symbol.asyncIterator] = function() {
    throw new MongooseError(unsupportedMessage);
  };
}
110
/**
 * Registers a transform function which subsequently maps documents retrieved
 * via the streams interface or `.next()`. Transforms run in the order they
 * were registered, each receiving the previous transform's return value.
 *
 * ####Example
 *
 *     // `cursor` is an AggregationCursor.
 *     // Map documents returned by `data` events
 *     cursor.
 *       map(function(doc) {
 *         doc.foo = "bar";
 *         return doc;
 *       }).
 *       on('data', function(doc) { console.log(doc.foo); });
 *
 *     // Or map documents returned by `.next()`
 *     cursor.map(function(doc) {
 *       doc.foo = "bar";
 *       return doc;
 *     });
 *     cursor.next(function(error, doc) {
 *       console.log(doc.foo);
 *     });
 *
 * @param {Function} fn
 * @return {AggregationCursor}
 * @api public
 * @method map
 */

AggregationCursor.prototype.map = function(fn) {
  this._transforms.push(fn);
  return this;
};
148
/*!
 * Marks this cursor as errored by storing `error` on `_error`, which
 * `_next()` checks before touching the underlying cursor.
 *
 * @param {Error} error
 * @return {AggregationCursor} this
 */

AggregationCursor.prototype._markError = function(error) {
  this._error = error;

  return this;
};
157
/**
 * Marks this cursor as closed. Will stop streaming and subsequent calls to
 * `next()` will error.
 *
 * On success `close` is emitted before the callback/promise is settled. On
 * failure the callback/promise receives the error first, and `error` is then
 * emitted only if there is at least one `error` listener.
 *
 * @param {Function} callback
 * @return {Promise}
 * @api public
 * @method close
 * @emits close
 * @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
 */

AggregationCursor.prototype.close = function(callback) {
  const closeDriverCursor = done => {
    this.cursor.close(error => {
      if (!error) {
        this.emit('close');
        done(null);
        return;
      }
      done(error);
      return this.listeners('error').length > 0 && this.emit('error', error);
    });
  };

  return promiseOrCallback(callback, closeDriverCursor);
};
182
/**
 * Get the next document from this cursor. Will return `null` when there are
 * no documents left. Any transforms registered with `map()` are applied to
 * the document before it is returned.
 *
 * @param {Function} callback
 * @return {Promise}
 * @api public
 * @method next
 */

AggregationCursor.prototype.next = function(callback) {
  const cursor = this;

  return promiseOrCallback(callback, function(done) {
    _next(cursor, done);
  });
};
198
/**
 * Execute `fn` for every document in the cursor. If `fn` returns a promise,
 * will wait for the promise to resolve before iterating on to the next one.
 * Returns a promise that resolves when done.
 *
 * The options argument may be omitted, in which case the second argument is
 * treated as the callback.
 *
 * @param {Function} fn
 * @param {Object} [options]
 * @param {Number} [options.parallel] the number of promises to execute in parallel. Defaults to 1.
 * @param {Function} [callback] executed when all docs have been processed
 * @return {Promise}
 * @api public
 * @method eachAsync
 */

AggregationCursor.prototype.eachAsync = function(fn, opts, callback) {
  // Support the `eachAsync(fn, callback)` call shape.
  const optionsOmitted = typeof opts === 'function';
  const done = optionsOmitted ? opts : callback;
  const options = (optionsOmitted ? null : opts) || {};

  const nextDoc = cb => _next(this, cb);

  return eachAsync(nextDoc, fn, options, done);
};
223
/*!
 * Sets the `transformNull` option, which `_next()` consults to decide whether
 * registered transforms also run when the document is `null`.
 *
 * @param {Boolean} [val] defaults to `true` when called with no arguments
 * @return {AggregationCursor} this
 */

AggregationCursor.prototype.transformNull = function(val) {
  // Only a call with no arguments at all means "enable"; an explicitly
  // passed value (even `undefined`) is stored as given.
  this._mongooseOptions.transformNull = arguments.length === 0 ? true : val;
  return this;
};
235
/**
 * Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
 * Useful for setting the `noCursorTimeout` and `tailable` flags.
 *
 * If the underlying cursor does not exist yet, the flag is applied once the
 * `cursor` event fires.
 *
 * @param {String} flag
 * @param {Boolean} value
 * @return {AggregationCursor} this
 * @api public
 * @method addCursorFlag
 */

AggregationCursor.prototype.addCursorFlag = function(flag, value) {
  _waitForCursor(this, () => {
    this.cursor.addCursorFlag(flag, value);
  });

  return this;
};
254
/*!
 * Calls `cb` (with no arguments) as soon as `ctx.cursor` is available:
 * synchronously if it is already set, otherwise on the next `cursor` event.
 *
 * @param {AggregationCursor} ctx
 * @param {Function} cb
 */

function _waitForCursor(ctx, cb) {
  if (!ctx.cursor) {
    ctx.once('cursor', () => {
      cb();
    });
    return;
  }

  return cb();
}
267
/*!
 * Get the next doc from the underlying cursor and run it through the
 * transforms registered with `map()`.
 *
 * - If the cursor has been marked as errored, `cb` receives that error on the
 *   next tick and the underlying cursor is not touched.
 * - If the underlying cursor does not exist yet, the call is retried when the
 *   `cursor` event fires.
 * - A falsy document from the underlying cursor is reported as `null`.
 *   Transforms are skipped on errors, and on `null` unless the
 *   `transformNull` option is set.
 *
 * @param {AggregationCursor} ctx
 * @param {Function} cb called as `cb(error, doc)`
 */

function _next(ctx, cb) {
  const hasTransforms = ctx._transforms.length > 0;
  const deliver = !hasTransforms ? cb : (err, doc) => {
    const skipTransforms = err || (doc === null && !ctx._mongooseOptions.transformNull);
    if (skipTransforms) {
      return cb(err, doc);
    }
    const mapped = ctx._transforms.reduce((current, transform) => transform(current), doc);
    cb(err, mapped);
  };

  if (ctx._error) {
    return process.nextTick(() => {
      deliver(ctx._error);
    });
  }

  if (!ctx.cursor) {
    // Retry with the original callback; transforms are re-wrapped on retry.
    ctx.once('cursor', () => {
      _next(ctx, cb);
    });
    return;
  }

  return ctx.cursor.next((error, doc) => {
    if (error) {
      return deliver(error);
    }
    if (doc) {
      deliver(null, doc);
      return;
    }
    return deliver(null, null);
  });
}
309
310module.exports = AggregationCursor;