// UNPKG source listing (30.1 kB, JavaScript)
'use strict';

/*!
 * Module dependencies
 */

const AggregationCursor = require('./cursor/AggregationCursor');
const Query = require('./query');
const util = require('util');
const utils = require('./utils');
// Borrowed from Query so that Aggregate#read() and Aggregate#readConcern()
// can invoke them with an Aggregate instance as `this`.
const read = Query.prototype.read;
const readConcern = Query.prototype.readConcern;
13
/**
 * Aggregate constructor used for building aggregation pipelines. Do not
 * instantiate this class directly, use [Model.aggregate()](/docs/api.html#aggregate_aggregate) instead.
 *
 * ####Example:
 *
 *     const aggregate = Model.aggregate([
 *       { $project: { a: 1, b: 1 } },
 *       { $skip: 5 }
 *     ]);
 *
 *     Model.
 *       aggregate([{ $match: { age: { $gte: 21 }}}]).
 *       unwind('tags').
 *       exec(callback);
 *
 * ####Note:
 *
 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
 * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
 *
 * ```javascript
 *   new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
 *   // Do this instead to cast to an ObjectId
 *   new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
 * ```
 *
 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
 * @param {Array} [pipeline] aggregation pipeline as an array of objects
 * @api public
 */

function Aggregate(pipeline) {
  this._pipeline = [];
  this._model = undefined;
  this.options = {};

  // The initial pipeline is only honoured when it is the sole argument and an
  // array. `Array.isArray()` is used instead of the deprecated `util.isArray()`.
  if (arguments.length === 1 && Array.isArray(pipeline)) {
    this.append.apply(this, pipeline);
  }
}
56
/**
 * Binds this aggregate to a model, or returns the currently bound model when
 * called without arguments.
 *
 * When the model has a schema, the schema-level `read` and `collation`
 * options are copied into this aggregate's options, unless `readPreference` /
 * `collation` have already been set on the aggregate.
 *
 * @param {Model} [model] the model to which the aggregate is to be bound
 * @return {Aggregate|Model} this aggregate when binding, the bound model when called with no arguments
 * @api public
 */

Aggregate.prototype.model = function(model) {
  // A zero-argument call used to clear `_model` and then throw while reading
  // `undefined.schema`; treat it as a getter instead.
  if (arguments.length === 0) {
    return this._model;
  }

  this._model = model;
  if (model.schema != null) {
    if (this.options.readPreference == null &&
        model.schema.options.read != null) {
      this.options.readPreference = model.schema.options.read;
    }
    if (this.options.collation == null &&
        model.schema.options.collation != null) {
      this.options.collation = model.schema.options.collation;
    }
  }
  return this;
};
79
/**
 * Appends new operators to this aggregate pipeline
 *
 * ####Examples:
 *
 *     aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
 *
 *     // or pass an array
 *     var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
 *     aggregate.append(pipeline);
 *
 * @param {Object} ops operator(s) to append
 * @return {Aggregate}
 * @throws {Error} if any argument fails the `isOperator()` check
 * @api public
 */

Aggregate.prototype.append = function() {
  // A single array argument is taken as the list of stages; otherwise every
  // argument is one stage. `Array.isArray()` replaces the deprecated
  // `util.isArray()`.
  const args = (arguments.length === 1 && Array.isArray(arguments[0]))
    ? arguments[0]
    : utils.args(arguments);

  // Validate everything first so a bad stage leaves the pipeline untouched.
  if (!args.every(isOperator)) {
    throw new Error('Arguments must be aggregate pipeline operators');
  }

  this._pipeline = this._pipeline.concat(args);

  return this;
};
109
/**
 * Appends a new $addFields operator to this aggregate pipeline.
 * Requires MongoDB v3.4+ to work
 *
 * ####Examples:
 *
 *     // adding new fields based on existing fields
 *     aggregate.addFields({
 *         newField: '$b.nested'
 *       , plusTen: { $add: ['$val', 10]}
 *       , sub: {
 *            name: '$a'
 *         }
 *     })
 *
 *     // etc
 *     aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
 *
 * @param {Object} arg field specification
 * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
 * @return {Aggregate}
 * @throws {Error} if `arg` is not a non-array object
 * @api public
 */
Aggregate.prototype.addFields = function(arg) {
  // `typeof null === 'object'`, so null must be rejected explicitly; otherwise
  // `Object.keys(null)` throws an unrelated TypeError instead of this message.
  if (arg == null || typeof arg !== 'object' || Array.isArray(arg)) {
    throw new Error('Invalid addFields() argument. Must be an object');
  }

  // Shallow-copy so later changes to the caller's object do not alter the stage.
  const fields = {};
  Object.keys(arg).forEach(function(field) {
    fields[field] = arg[field];
  });

  return this.append({ $addFields: fields });
};
144
/**
 * Appends a new $project operator to this aggregate pipeline.
 *
 * Mongoose query [selection syntax](#query_Query-select) is also supported.
 *
 * ####Examples:
 *
 *     // include a, include b, exclude _id
 *     aggregate.project("a b -_id");
 *
 *     // or you may use object notation, useful when
 *     // you have keys already prefixed with a "-"
 *     aggregate.project({a: 1, b: 1, _id: 0});
 *
 *     // reshaping documents
 *     aggregate.project({
 *         newField: '$b.nested'
 *       , plusTen: { $add: ['$val', 10]}
 *       , sub: {
 *            name: '$a'
 *         }
 *     })
 *
 *     // etc
 *     aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
 *
 * @param {Object|String} arg field specification
 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
 * @return {Aggregate}
 * @throws {Error} if `arg` is neither a non-array object nor a single string argument
 * @api public
 */

Aggregate.prototype.project = function(arg) {
  const fields = {};

  // `arg != null` keeps null (whose typeof is 'object') out of the object
  // branch, where `Object.keys(null)` would throw an unrelated TypeError.
  if (arg != null && typeof arg === 'object' && !Array.isArray(arg)) {
    Object.keys(arg).forEach(function(field) {
      fields[field] = arg[field];
    });
  } else if (arguments.length === 1 && typeof arg === 'string') {
    arg.split(/\s+/).forEach(function(field) {
      // Leading/trailing whitespace produces empty tokens; skip them.
      if (!field) {
        return;
      }
      // A leading "-" means "exclude this path".
      const include = field[0] === '-' ? 0 : 1;
      if (include === 0) {
        field = field.substring(1);
      }
      fields[field] = include;
    });
  } else {
    throw new Error('Invalid project() argument. Must be string or object');
  }

  return this.append({ $project: fields });
};
201
/**
 * Appends a new custom $group operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.group({ _id: "$department" });
 *
 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
 * @method group
 * @memberOf Aggregate
 * @instance
 * @param {Object} arg $group operator contents
 * @return {Aggregate}
 * @api public
 */
217
/**
 * Appends a new custom $match operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
 *
 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
 * @method match
 * @memberOf Aggregate
 * @instance
 * @param {Object} arg $match operator contents
 * @return {Aggregate}
 * @api public
 */
233
/**
 * Appends a new $skip operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.skip(10);
 *
 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
 * @method skip
 * @memberOf Aggregate
 * @instance
 * @param {Number} num number of records to skip before next stage
 * @return {Aggregate}
 * @api public
 */
249
/**
 * Appends a new $limit operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.limit(10);
 *
 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
 * @method limit
 * @memberOf Aggregate
 * @instance
 * @param {Number} num maximum number of records to pass to the next stage
 * @return {Aggregate}
 * @api public
 */
265
/**
 * Appends a new $geoNear operator to this aggregate pipeline.
 *
 * ####NOTE:
 *
 * **MUST** be used as the first operator in the pipeline.
 *
 * ####Examples:
 *
 *     aggregate.near({
 *       near: [40.724, -73.997],
 *       distanceField: "dist.calculated", // required
 *       maxDistance: 0.008,
 *       query: { type: "public" },
 *       includeLocs: "dist.location",
 *       uniqueDocs: true,
 *       num: 5
 *     });
 *
 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
 * @method near
 * @memberOf Aggregate
 * @instance
 * @param {Object} arg $geoNear operator contents (stored by reference, not copied)
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.near = function(arg) {
  const op = {};
  op.$geoNear = arg;
  return this.append(op);
};
299
/*!
 * define methods
 *
 * Generates `group()`, `match()`, `skip()`, `limit()` and `out()`. Each one
 * appends a single stage `{ $<name>: arg }` to the pipeline and returns the
 * aggregate for chaining.
 */

'group match skip limit out'.split(' ').forEach(function($operator) {
  Aggregate.prototype[$operator] = function(arg) {
    const op = {};
    op['$' + $operator] = arg;
    return this.append(op);
  };
});
311
/**
 * Appends new custom $unwind operator(s) to this aggregate pipeline.
 *
 * Note that the `$unwind` operator requires the path name to start with '$'.
 * Mongoose will prepend '$' if the specified field doesn't start '$'.
 * Object arguments are passed through unchanged as the `$unwind` value.
 *
 * ####Examples:
 *
 *     aggregate.unwind("tags");
 *     aggregate.unwind("a", "b", "c");
 *
 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
 * @param {String|Object} fields the field(s) to unwind, one stage per argument
 * @return {Aggregate}
 * @throws {Error} if an argument is neither a string nor a non-null object
 * @api public
 */

Aggregate.prototype.unwind = function() {
  const args = utils.args(arguments);

  // Build every stage before appending, so an invalid argument adds nothing.
  const res = [];
  for (let i = 0; i < args.length; ++i) {
    const arg = args[i];
    if (arg && typeof arg === 'object') {
      res.push({ $unwind: arg });
    } else if (typeof arg === 'string') {
      res.push({
        $unwind: (arg && arg.charAt(0) === '$') ? arg : '$' + arg
      });
    } else {
      throw new Error('Invalid arg "' + arg + '" to unwind(), ' +
        'must be string or object');
    }
  }

  return this.append.apply(this, res);
};
349
/**
 * Appends a new $replaceRoot operator to this aggregate pipeline.
 *
 * Note that the `$replaceRoot` operator requires field strings to start with '$'.
 * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
 * If you are passing in an object the strings in your expression will not be altered.
 *
 * ####Examples:
 *
 *     aggregate.replaceRoot("user");
 *
 *     aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
 *
 * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
 * @param {String|Object} newRoot the field or document which will become the new root document
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.replaceRoot = function(newRoot) {
  let ret;

  // Only strings are normalized; any other value is used as-is.
  if (typeof newRoot === 'string') {
    ret = newRoot.startsWith('$') ? newRoot : '$' + newRoot;
  } else {
    ret = newRoot;
  }

  return this.append({
    $replaceRoot: {
      newRoot: ret
    }
  });
};
384
/**
 * Appends a new $count operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.count("userCount");
 *
 * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
 * @param {String} countName the name of the count field
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.count = function(countName) {
  return this.append({ $count: countName });
};
401
/**
 * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
 * or a pipeline object.
 *
 * Note that the `$sortByCount` operator requires the new root to start with '$'.
 * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
 *
 * ####Examples:
 *
 *     aggregate.sortByCount('users');
 *     aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
 *
 * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
 * @param {Object|String} arg
 * @return {Aggregate} this
 * @throws {TypeError} if `arg` is neither a string nor a non-null object
 * @api public
 */

Aggregate.prototype.sortByCount = function(arg) {
  if (arg && typeof arg === 'object') {
    return this.append({ $sortByCount: arg });
  } else if (typeof arg === 'string') {
    return this.append({
      $sortByCount: (arg && arg.charAt(0) === '$') ? arg : '$' + arg
    });
  } else {
    throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
      'must be string or object');
  }
};
432
/**
 * Appends new custom $lookup operator(s) to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
 *
 * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
 * @param {Object} options to $lookup as described in the above link
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.lookup = function(options) {
  return this.append({ $lookup: options });
};
449
/**
 * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
 *
 * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
 *
 * ####Examples:
 *
 *     // Suppose we have a collection of courses, where a document might look like
 *     // `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry' }` and
 *     // `{ _id: 1, name: 'Trigonometry', prerequisite: 'Algebra' }`.
 *     // This will recursively search the 'courses' collection up to 3 prerequisites:
 *     aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 })
 *
 * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
 * @param {Object} options to $graphLookup as described in the above link
 * @return {Aggregate}
 * @throws {TypeError} if `options` is truthy but `utils.isObject()` rejects it
 * @api public
 */

Aggregate.prototype.graphLookup = function(options) {
  // Work on a copy so the '$' prefixing below never touches the caller's object.
  const cloneOptions = {};
  if (options) {
    if (!utils.isObject(options)) {
      throw new TypeError('Invalid graphLookup() argument. Must be an object.');
    }

    utils.mergeClone(cloneOptions, options);
    const startWith = cloneOptions.startWith;

    // A string `startWith` is a field path and needs the '$' prefix;
    // non-string values are left as provided.
    if (startWith && typeof startWith === 'string') {
      cloneOptions.startWith = cloneOptions.startWith.charAt(0) === '$' ?
        cloneOptions.startWith :
        '$' + cloneOptions.startWith;
    }

  }
  return this.append({ $graphLookup: cloneOptions });
};
484
/**
 * Appends new custom $sample operator(s) to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.sample(3); // Add a pipeline that picks 3 random documents
 *
 * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
 * @param {Number} size number of random documents to pick
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.sample = function(size) {
  return this.append({ $sample: { size: size } });
};
501
/**
 * Appends a new $sort operator to this aggregate pipeline.
 *
 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
 * Any value other than `desc`, `descending` or `-1` is treated as ascending.
 *
 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
 *
 * ####Examples:
 *
 *     // these are equivalent
 *     aggregate.sort({ field: 'asc', test: -1 });
 *     aggregate.sort('field -test');
 *
 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
 * @param {Object|String} arg
 * @return {Aggregate} this
 * @throws {TypeError} if `arg` is neither a plain object nor a single string argument
 * @api public
 */

Aggregate.prototype.sort = function(arg) {
  // TODO refactor to reuse the query builder logic

  const sort = {};

  // Guard the `constructor.name` lookup: null, undefined and prototype-less
  // objects have no constructor and used to crash here with an unrelated
  // "Cannot read property" error instead of the TypeError below.
  const isPlainObject = arg != null &&
    arg.constructor != null &&
    arg.constructor.name === 'Object';

  if (isPlainObject) {
    const desc = ['desc', 'descending', -1];
    Object.keys(arg).forEach(function(field) {
      // If sorting by text score, skip coercing into 1/-1
      if (arg[field] instanceof Object && arg[field].$meta) {
        sort[field] = arg[field];
        return;
      }
      sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
    });
  } else if (arguments.length === 1 && typeof arg === 'string') {
    arg.split(/\s+/).forEach(function(field) {
      if (!field) {
        return;
      }
      const ascend = field[0] === '-' ? -1 : 1;
      if (ascend === -1) {
        field = field.substring(1);
      }
      sort[field] = ascend;
    });
  } else {
    throw new TypeError('Invalid sort() argument. Must be a string or object.');
  }

  return this.append({ $sort: sort });
};
553
/**
 * Sets the readPreference option for the aggregation query.
 *
 * Delegates to the module-level `read` function, called with this aggregate
 * as `this`.
 *
 * ####Example:
 *
 *     Model.aggregate(..).read('primaryPreferred').exec(callback)
 *
 * @param {String} pref one of the listed preference options or their aliases
 * @param {Array} [tags] optional tags for this query
 * @return {Aggregate} this
 * @api public
 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
 */

Aggregate.prototype.read = function(pref, tags) {
  // Make sure an options object exists before delegating.
  if (!this.options) {
    this.options = {};
  }
  read.call(this, pref, tags);
  return this;
};
576
/**
 * Sets the readConcern level for the aggregation query.
 *
 * Delegates to the module-level `readConcern` function, called with this
 * aggregate as `this`.
 *
 * ####Example:
 *
 *     Model.aggregate(..).readConcern('majority').exec(callback)
 *
 * @param {String} level one of the listed read concern level or their aliases
 * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
 * @return {Aggregate} this
 * @api public
 */

Aggregate.prototype.readConcern = function(level) {
  // Make sure an options object exists before delegating.
  if (!this.options) {
    this.options = {};
  }
  readConcern.call(this, level);
  return this;
};
597
/**
 * Appends a new $redact operator to this aggregate pipeline.
 *
 * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
 * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
 *
 * ####Example:
 *
 *     Model.aggregate(...)
 *      .redact({
 *        $cond: {
 *          if: { $eq: [ '$level', 5 ] },
 *          then: '$$PRUNE',
 *          else: '$$DESCEND'
 *        }
 *      })
 *      .exec();
 *
 *     // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
 *     Model.aggregate(...)
 *      .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
 *      .exec();
 *
 * @param {Object} expression redact options or conditional expression
 * @param {String|Object} [thenExpr] true case for the condition
 * @param {String|Object} [elseExpr] false case for the condition
 * @return {Aggregate} this
 * @throws {Error} if a string `thenExpr` / `elseExpr` does not start with `$$`
 * @throws {TypeError} if called with an argument count other than 1 or 3
 * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
 * @api public
 */

Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
  if (arguments.length === 3) {
    if ((typeof thenExpr === 'string' && !thenExpr.startsWith('$$')) ||
        (typeof elseExpr === 'string' && !elseExpr.startsWith('$$'))) {
      throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
    }

    // Shorthand form: wrap the three pieces into a $cond expression.
    expression = {
      $cond: {
        if: expression,
        then: thenExpr,
        else: elseExpr
      }
    };
  } else if (arguments.length !== 1) {
    throw new TypeError('Invalid arguments');
  }

  return this.append({ $redact: expression });
};
649
/**
 * Execute the aggregation with explain
 *
 * An empty pipeline or an aggregate that is not bound to a model is reported
 * as an error through the callback passed to `utils.promiseOrCallback()`.
 *
 * ####Example:
 *
 *     Model.aggregate(..).explain(callback)
 *
 * @param {Function} callback
 * @return {Promise}
 */

Aggregate.prototype.explain = function(callback) {
  return utils.promiseOrCallback(callback, cb => {
    if (!this._pipeline.length) {
      const err = new Error('Aggregate has empty pipeline');
      return cb(err);
    }

    // Without a bound model the code below would dereference `undefined`.
    // Report it with the same message text `exec()` uses.
    if (!this._model) {
      return cb(new Error('Aggregate not bound to any Model'));
    }

    prepareDiscriminatorPipeline(this);

    this._model.collection.
      aggregate(this._pipeline, this.options || {}).
      explain(function(error, result) {
        if (error) {
          return cb(error);
        }
        cb(null, result);
      });
  });
};
680
/**
 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
 *
 * ####Example:
 *
 *     await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
 *
 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
 * @return {Aggregate} this
 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
 */

Aggregate.prototype.allowDiskUse = function(value) {
  // Same defensive guard as read()/cursor()/collation().
  if (!this.options) {
    this.options = {};
  }
  this.options.allowDiskUse = value;
  return this;
};
697
/**
 * Sets the hint option for the aggregation query (ignored for < 3.6.0)
 *
 * ####Example:
 *
 *     Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
 *
 * @param {Object|String} value a hint object or the index name
 * @return {Aggregate} this
 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
 */

Aggregate.prototype.hint = function(value) {
  // Same defensive guard as read()/cursor()/collation().
  if (!this.options) {
    this.options = {};
  }
  this.options.hint = value;
  return this;
};
713
/**
 * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
 *
 * Passing `null` or `undefined` removes a previously set session.
 *
 * ####Example:
 *
 *     const session = await Model.startSession();
 *     await Model.aggregate(..).session(session);
 *
 * @param {ClientSession} session
 * @return {Aggregate} this
 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
 */

Aggregate.prototype.session = function(session) {
  // Same defensive guard as read()/cursor()/collation().
  if (!this.options) {
    this.options = {};
  }
  if (session == null) {
    delete this.options.session;
  } else {
    this.options.session = session;
  }
  return this;
};
734
/**
 * Lets you set arbitrary options, for middleware or plugins.
 *
 * ####Example:
 *
 *     var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
 *     agg.options; // `{ allowDiskUse: true }`
 *
 * @param {Object} options keys to merge into current options
 * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
 * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
 * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
 * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
 * @return {Aggregate} this
 * @api public
 */

Aggregate.prototype.option = function(value) {
  // Same defensive guard as read()/cursor()/collation().
  if (!this.options) {
    this.options = {};
  }
  // `for...in` is kept on purpose: it is a no-op for null/undefined, so
  // calling `option()` without an argument stays harmless.
  for (const key in value) {
    this.options[key] = value[key];
  }
  return this;
};
759
/**
 * Sets the cursor option for the aggregation query (ignored for < 2.6.0).
 * Note the different syntax below: .exec() returns a cursor object, and no callback
 * is necessary.
 *
 * ####Example:
 *
 *     var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
 *     cursor.each(function(error, doc) {
 *       // use doc
 *     });
 *
 * @param {Object} options defaults to `{}` when omitted
 * @param {Number} options.batchSize set the cursor batch size
 * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
 * @return {Aggregate} this
 * @api public
 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
 */

Aggregate.prototype.cursor = function(options) {
  if (!this.options) {
    this.options = {};
  }
  // Always store an object: `exec()` switches to cursor mode on a truthy
  // `options.cursor`.
  this.options.cursor = options || {};
  return this;
};
787
/**
 * Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag)
 *
 * The flag is stored as a top-level key on this aggregate's options.
 *
 * ####Example:
 *
 *     Model.aggregate(..).addCursorFlag('noCursorTimeout', true).exec();
 *
 * @param {String} flag
 * @param {Boolean} value
 * @return {Aggregate} this
 * @api public
 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag
 */

Aggregate.prototype.addCursorFlag = function(flag, value) {
  if (!this.options) {
    this.options = {};
  }
  this.options[flag] = value;
  return this;
};
809
/**
 * Adds a collation
 *
 * ####Example:
 *
 *     Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
 *
 * @param {Object} collation options
 * @return {Aggregate} this
 * @api public
 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
 */

Aggregate.prototype.collation = function(collation) {
  if (!this.options) {
    this.options = {};
  }
  this.options.collation = collation;
  return this;
};
830
/**
 * Combines multiple aggregation pipelines.
 *
 * ####Example:
 *
 *     Model.aggregate(...)
 *      .facet({
 *        books: [{ $group: { _id: '$author', count: { $sum: 1 } } }],
 *        price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
 *      })
 *      .exec();
 *
 *     // Output: { books: [...], price: [{...}, {...}] }
 *
 * @param {Object} facet options
 * @return {Aggregate} this
 * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
 * @api public
 */

Aggregate.prototype.facet = function(options) {
  return this.append({ $facet: options });
};
854
/**
 * Returns the current pipeline
 *
 * The returned array is the aggregate's internal pipeline, not a copy.
 *
 * ####Example:
 *
 *     MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
 *
 * @return {Array}
 * @api public
 */

Aggregate.prototype.pipeline = function() {
  return this._pipeline;
};
870
/**
 * Executes the aggregate pipeline on the currently bound Model.
 *
 * If the `cursor` option is set, an `AggregationCursor` is returned instead
 * of a promise and `callback` is not used.
 *
 * ####Example:
 *
 *     aggregate.exec(callback);
 *
 *     // Because a promise is returned, the `callback` is optional.
 *     var promise = aggregate.exec();
 *     promise.then(..);
 *
 * @see Promise #promise_Promise
 * @param {Function} [callback]
 * @return {Promise}
 * @throws {Error} synchronously, if the aggregate is not bound to a model
 * @api public
 */

Aggregate.prototype.exec = function(callback) {
  if (!this._model) {
    throw new Error('Aggregate not bound to any Model');
  }
  const model = this._model;
  // Clone so the options handed to the driver are detached from `this.options`.
  const options = utils.clone(this.options || {});
  const pipeline = this._pipeline;
  const collection = this._model.collection;

  if (options && options.cursor) {
    return new AggregationCursor(this);
  }

  return utils.promiseOrCallback(callback, cb => {
    if (!pipeline.length) {
      const err = new Error('Aggregate has empty pipeline');
      return cb(err);
    }

    // Mutates `this._pipeline` in place; `pipeline` is the same array.
    prepareDiscriminatorPipeline(this);

    model.hooks.execPre('aggregate', this, error => {
      // A failing pre hook still runs the post hooks, with the error passed
      // along in the hook options.
      if (error) {
        const _opts = { error: error };
        return model.hooks.execPost('aggregate', this, [null], _opts, error => {
          cb(error);
        });
      }

      collection.aggregate(pipeline, options, (error, cursor) => {
        if (error) {
          const _opts = { error: error };
          return model.hooks.execPost('aggregate', this, [null], _opts, error => {
            if (error) {
              return cb(error);
            }
            return cb(null);
          });
        }
        cursor.toArray((error, result) => {
          // `error` may be null here; the post hooks receive it either way.
          const _opts = { error: error };
          model.hooks.execPost('aggregate', this, [result], _opts, (error, result) => {
            if (error) {
              return cb(error);
            }

            cb(null, result);
          });
        });
      });
    });
  });
};
941
/**
 * Provides promise for aggregate.
 *
 * Calls `exec()` and chains the handlers onto the value it returns, which
 * makes the aggregate usable with `await`.
 *
 * ####Example:
 *
 *     Model.aggregate(..).then(successCallback, errorCallback);
 *
 * @see Promise #promise_Promise
 * @param {Function} [resolve] successCallback
 * @param {Function} [reject] errorCallback
 * @return {Promise}
 */
Aggregate.prototype.then = function(resolve, reject) {
  return this.exec().then(resolve, reject);
};
957
/**
 * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
 * You do not need to call this function explicitly, the JavaScript runtime
 * will call it for you.
 *
 * Calling it sets the `cursor` option of this aggregate to
 * `{ useMongooseAggCursor: true }`.
 *
 * ####Example
 *
 *     const agg = Model.aggregate([{ $match: { age: { $gte: 25 } } }]);
 *     for await (const doc of agg) {
 *       console.log(doc.name);
 *     }
 *
 * Node.js 10.x supports async iterators natively without any flags. You can
 * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
 *
 * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
 * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
 * support async iterators.
 *
 * @method Symbol.asyncIterator
 * @memberOf Aggregate
 * @instance
 * @api public
 */

if (Symbol.asyncIterator != null) {
  Aggregate.prototype[Symbol.asyncIterator] = function() {
    return this.cursor({ useMongooseAggCursor: true }).
      exec().
      transformNull().
      map(doc => {
        // A null document is mapped to the iterator's "done" result.
        return doc == null ? { done: true } : { value: doc, done: false };
      });
  };
}
993
/*!
 * Helpers
 */

/**
 * Checks whether an object is likely a pipeline operator, i.e. it has exactly
 * one own enumerable key and that key starts with `$`.
 *
 * @param {Object} obj object to check
 * @return {Boolean}
 * @api private
 */

function isOperator(obj) {
  // `typeof null === 'object'`, so null needs its own check; without it
  // `Object.keys(null)` throws instead of returning false.
  if (obj == null || typeof obj !== 'object') {
    return false;
  }

  const k = Object.keys(obj);

  return k.length === 1 && k[0][0] === '$';
}
1015
/*!
 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
 * pipeline, should its model be a non-root discriminator type. This is
 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
 *
 * The pipeline (and the first stage's `$match` / `$geoNear` object) is
 * modified in place. Calling this more than once on the same aggregate is
 * safe: a first-stage `$match` that already carries the discriminator value
 * is left alone instead of having another `$match` stacked in front of it.
 *
 * @param {Aggregate} aggregate Aggregate to prepare
 */

Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;

function prepareDiscriminatorPipeline(aggregate) {
  const schema = aggregate._model.schema;
  const discriminatorMapping = schema && schema.discriminatorMapping;

  if (discriminatorMapping && !discriminatorMapping.isRoot) {
    const originalPipeline = aggregate._pipeline;
    const discriminatorKey = discriminatorMapping.key;
    const discriminatorValue = discriminatorMapping.value;
    const firstStage = originalPipeline[0];

    // Already prepared (or the caller filtered on the same value): nothing to
    // do. Without this check every repeated call unshifts one more `$match`.
    if (firstStage && firstStage.$match &&
        Object.prototype.hasOwnProperty.call(firstStage.$match, discriminatorKey) &&
        firstStage.$match[discriminatorKey] === discriminatorValue) {
      return;
    }

    // If the first pipeline stage is a match and it doesn't specify a `__t`
    // key, add the discriminator key to it. This allows for potential
    // aggregation query optimizations not to be disturbed by this feature.
    if (firstStage && firstStage.$match && !firstStage.$match[discriminatorKey]) {
      firstStage.$match[discriminatorKey] = discriminatorValue;
      // `originalPipeline` is a ref, so there's no need for
      // aggregate._pipeline = originalPipeline
    } else if (firstStage && firstStage.$geoNear) {
      // The filter goes into the stage's own `query`, so no `$match` is put
      // in front of a `$geoNear` first stage.
      firstStage.$geoNear.query = firstStage.$geoNear.query || {};
      firstStage.$geoNear.query[discriminatorKey] = discriminatorValue;
    } else {
      const match = {};
      match[discriminatorKey] = discriminatorValue;
      aggregate._pipeline.unshift({ $match: match });
    }
  }
}
1053
/*!
 * Exports
 */

module.exports = Aggregate;