UNPKG

32.2 kBJavaScriptView Raw
1'use strict';
2
3/*!
4 * Module dependencies
5 */
6
7const AggregationCursor = require('./cursor/AggregationCursor');
8const Query = require('./query');
9const util = require('util');
10const utils = require('./utils');
11const read = Query.prototype.read;
12const readConcern = Query.prototype.readConcern;
13
14/**
15 * Aggregate constructor used for building aggregation pipelines. Do not
16 * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
17 *
18 * ####Example:
19 *
20 * const aggregate = Model.aggregate([
21 * { $project: { a: 1, b: 1 } },
22 * { $skip: 5 }
23 * ]);
24 *
25 * Model.
26 * aggregate([{ $match: { age: { $gte: 21 }}}]).
27 * unwind('tags').
28 * exec(callback);
29 *
30 * ####Note:
31 *
32 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
33 * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
34 *
35 * ```javascript
36 * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
37 * // Do this instead to cast to an ObjectId
38 * new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
39 * ```
40 *
41 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
42 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
43 * @param {Array} [pipeline] aggregation pipeline as an array of objects
44 * @api public
45 */
46
function Aggregate(pipeline) {
  // Stages accumulated so far, in execution order.
  this._pipeline = [];
  // Set later through `model()`.
  this._model = undefined;
  // Options handed to the aggregate command on execution.
  this.options = {};

  // `Array.isArray()` replaces the deprecated `util.isArray()`.
  // The stages are applied as individual arguments to `append()`.
  if (arguments.length === 1 && Array.isArray(pipeline)) {
    this.append.apply(this, pipeline);
  }
}
56
57/**
58 * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
59 * Supported options are:
60 *
61 * - `readPreference`
62 * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
63 * - [`explain`](./api.html#aggregate_Aggregate-explain)
64 * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
65 * - `maxTimeMS`
66 * - `bypassDocumentValidation`
67 * - `raw`
68 * - `promoteLongs`
69 * - `promoteValues`
70 * - `promoteBuffers`
71 * - [`collation`](./api.html#aggregate_Aggregate-collation)
72 * - `comment`
73 * - [`session`](./api.html#aggregate_Aggregate-session)
74 *
75 * @property options
76 * @memberOf Aggregate
77 * @api public
78 */
79
// Bare property reference with no runtime effect; presumably it only exists so
// the doc comment above has a statement to attach to. The real `options`
// object is created per instance inside the constructor.
Aggregate.prototype.options;
81
82/**
83 * Get/set the model that this aggregation will execute on.
84 *
85 * ####Example:
86 * const aggregate = MyModel.aggregate([{ $match: { answer: 42 } }]);
87 * aggregate.model() === MyModel; // true
88 *
89 * // Change the model. There's rarely any reason to do this.
90 * aggregate.model(SomeOtherModel);
91 * aggregate.model() === SomeOtherModel; // true
92 *
93 * @param {Model} [model] the model to which the aggregate is to be bound
94 * @return {Aggregate|Model} if model is passed, will return `this`, otherwise will return the model
95 * @api public
96 */
97
Aggregate.prototype.model = function(model) {
  // Getter form: called with no argument at all.
  if (!arguments.length) {
    return this._model;
  }

  this._model = model;

  const schema = model.schema;
  if (schema == null) {
    return this;
  }

  // Inherit schema-level defaults, but never overwrite an option that has
  // already been set on this aggregate.
  const schemaOptions = schema.options;
  const opts = this.options;
  if (opts.readPreference == null && schemaOptions.read != null) {
    opts.readPreference = schemaOptions.read;
  }
  if (opts.collation == null && schemaOptions.collation != null) {
    opts.collation = schemaOptions.collation;
  }
  return this;
};
116
117/**
118 * Appends new operators to this aggregate pipeline
119 *
120 * ####Examples:
121 *
122 * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
123 *
124 * // or pass an array
125 * var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
126 * aggregate.append(pipeline);
127 *
128 * @param {Object} ops operator(s) to append
129 * @return {Aggregate}
130 * @api public
131 */
132
Aggregate.prototype.append = function() {
  // A lone array argument is taken as the list of stages; otherwise every
  // argument is one stage. `Array.isArray()` replaces the deprecated
  // `util.isArray()`.
  const isSingleArray = arguments.length === 1 && Array.isArray(arguments[0]);
  const args = isSingleArray ? arguments[0] : utils.args(arguments);

  // Reject the whole call if any entry does not look like a pipeline stage.
  if (!args.every(isOperator)) {
    throw new Error('Arguments must be aggregate pipeline operators');
  }

  // `concat` builds a new array, so the caller's array is never mutated.
  this._pipeline = this._pipeline.concat(args);

  return this;
};
146
147/**
148 * Appends a new $addFields operator to this aggregate pipeline.
149 * Requires MongoDB v3.4+ to work
150 *
151 * ####Examples:
152 *
153 * // adding new fields based on existing fields
154 * aggregate.addFields({
155 * newField: '$b.nested'
156 * , plusTen: { $add: ['$val', 10]}
157 * , sub: {
158 * name: '$a'
159 * }
160 * })
161 *
162 * // etc
163 * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
164 *
165 * @param {Object} arg field specification
166 * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
167 * @return {Aggregate}
168 * @api public
169 */
Aggregate.prototype.addFields = function(arg) {
  // `typeof null === 'object'`, so null has to be excluded explicitly;
  // otherwise `Object.keys(null)` would throw an unrelated TypeError instead
  // of the validation error below.
  if (arg == null || typeof arg !== 'object' || Array.isArray(arg)) {
    throw new Error('Invalid addFields() argument. Must be an object');
  }

  // Shallow-copy the own enumerable keys so the stage does not share its
  // top-level object with the caller.
  const fields = {};
  for (const field of Object.keys(arg)) {
    fields[field] = arg[field];
  }
  return this.append({ $addFields: fields });
};
181
182/**
183 * Appends a new $project operator to this aggregate pipeline.
184 *
185 * Mongoose query [selection syntax](#query_Query-select) is also supported.
186 *
187 * ####Examples:
188 *
189 * // include a, include b, exclude _id
190 * aggregate.project("a b -_id");
191 *
192 * // or you may use object notation, useful when
193 * // you have keys already prefixed with a "-"
194 * aggregate.project({a: 1, b: 1, _id: 0});
195 *
196 * // reshaping documents
197 * aggregate.project({
198 * newField: '$b.nested'
199 * , plusTen: { $add: ['$val', 10]}
200 * , sub: {
201 * name: '$a'
202 * }
203 * })
204 *
205 * // etc
206 * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
207 *
208 * @param {Object|String} arg field specification
209 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
210 * @return {Aggregate}
211 * @api public
212 */
213
Aggregate.prototype.project = function(arg) {
  const fields = {};

  // `arg != null` keeps null out of the object branch (`typeof null` is
  // 'object'), so null gets the validation error rather than a TypeError
  // from `Object.keys(null)`.
  if (arg != null && typeof arg === 'object' && !Array.isArray(arg)) {
    // Object notation: shallow-copy the specification.
    for (const field of Object.keys(arg)) {
      fields[field] = arg[field];
    }
  } else if (arguments.length === 1 && typeof arg === 'string') {
    // String notation: whitespace-separated paths, a leading '-' excludes.
    for (const token of arg.split(/\s+/)) {
      if (!token) {
        continue;
      }
      const excluded = token[0] === '-';
      fields[excluded ? token.substring(1) : token] = excluded ? 0 : 1;
    }
  } else {
    throw new Error('Invalid project() argument. Must be string or object');
  }

  return this.append({ $project: fields });
};
238
239/**
240 * Appends a new custom $group operator to this aggregate pipeline.
241 *
242 * ####Examples:
243 *
244 * aggregate.group({ _id: "$department" });
245 *
246 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
247 * @method group
248 * @memberOf Aggregate
249 * @instance
250 * @param {Object} arg $group operator contents
251 * @return {Aggregate}
252 * @api public
253 */
254
255/**
256 * Appends a new custom $match operator to this aggregate pipeline.
257 *
258 * ####Examples:
259 *
260 * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
261 *
262 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
263 * @method match
264 * @memberOf Aggregate
265 * @instance
266 * @param {Object} arg $match operator contents
267 * @return {Aggregate}
268 * @api public
269 */
270
271/**
272 * Appends a new $skip operator to this aggregate pipeline.
273 *
274 * ####Examples:
275 *
276 * aggregate.skip(10);
277 *
278 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
279 * @method skip
280 * @memberOf Aggregate
281 * @instance
282 * @param {Number} num number of records to skip before next stage
283 * @return {Aggregate}
284 * @api public
285 */
286
287/**
288 * Appends a new $limit operator to this aggregate pipeline.
289 *
290 * ####Examples:
291 *
292 * aggregate.limit(10);
293 *
294 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
295 * @method limit
296 * @memberOf Aggregate
297 * @instance
298 * @param {Number} num maximum number of records to pass to the next stage
299 * @return {Aggregate}
300 * @api public
301 */
302
303/**
304 * Appends a new $geoNear operator to this aggregate pipeline.
305 *
306 * ####NOTE:
307 *
308 * **MUST** be used as the first operator in the pipeline.
309 *
310 * ####Examples:
311 *
312 * aggregate.near({
313 * near: [40.724, -73.997],
314 * distanceField: "dist.calculated", // required
315 * maxDistance: 0.008,
316 * query: { type: "public" },
317 * includeLocs: "dist.location",
318 * uniqueDocs: true,
319 * num: 5
320 * });
321 *
322 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
323 * @method near
324 * @memberOf Aggregate
325 * @instance
326 * @param {Object} arg
327 * @return {Aggregate}
328 * @api public
329 */
330
Aggregate.prototype.near = function(arg) {
  // The argument is used verbatim as the body of the `$geoNear` stage.
  return this.append({ $geoNear: arg });
};
336
337/*!
338 * define methods
339 */
340
// Generate one helper per simple stage: `agg.<name>(arg)` appends
// `{ $<name>: arg }` to the pipeline.
for (const name of ['group', 'match', 'skip', 'limit', 'out']) {
  const stageKey = '$' + name;
  Aggregate.prototype[name] = function(arg) {
    return this.append({ [stageKey]: arg });
  };
}
348
349/**
350 * Appends new custom $unwind operator(s) to this aggregate pipeline.
351 *
352 * Note that the `$unwind` operator requires the path name to start with '$'.
353 * Mongoose will prepend '$' if the specified field doesn't start '$'.
354 *
355 * ####Examples:
356 *
357 * aggregate.unwind("tags");
358 * aggregate.unwind("a", "b", "c");
359 *
360 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
361 * @param {String} fields the field(s) to unwind
362 * @return {Aggregate}
363 * @api public
364 */
365
Aggregate.prototype.unwind = function() {
  // Turn every argument into its own `$unwind` stage.
  const stages = utils.args(arguments).map(function(field) {
    // Objects are passed through untouched as the stage body.
    if (field && typeof field === 'object') {
      return { $unwind: field };
    }
    if (typeof field !== 'string') {
      throw new Error('Invalid arg "' + field + '" to unwind(), ' +
        'must be string or object');
    }
    // Field paths must start with '$'; add the prefix when it is missing.
    const path = field.charAt(0) === '$' ? field : '$' + field;
    return { $unwind: path };
  });

  return this.append.apply(this, stages);
};
386
387/**
388 * Appends a new $replaceRoot operator to this aggregate pipeline.
389 *
390 * Note that the `$replaceRoot` operator requires field strings to start with '$'.
391 * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
392 * If you are passing in an object the strings in your expression will not be altered.
393 *
394 * ####Examples:
395 *
396 * aggregate.replaceRoot("user");
397 *
398 * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
399 *
400 * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
401 * @param {String|Object} newRoot the field or document which will become the new root document
402 * @return {Aggregate}
403 * @api public
404 */
405
Aggregate.prototype.replaceRoot = function(newRoot) {
  // Only string paths get the '$' prefix; any other value is used as given.
  const needsPrefix = typeof newRoot === 'string' && !newRoot.startsWith('$');
  const root = needsPrefix ? '$' + newRoot : newRoot;

  return this.append({ $replaceRoot: { newRoot: root } });
};
421
422/**
423 * Appends a new $count operator to this aggregate pipeline.
424 *
425 * ####Examples:
426 *
427 * aggregate.count("userCount");
428 *
429 * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
430 * @param {String} countName the name of the count field
431 * @return {Aggregate}
432 * @api public
433 */
434
Aggregate.prototype.count = function(countName) {
  // `countName` becomes the value of the `$count` stage.
  const stage = { $count: countName };
  return this.append(stage);
};
438
439/**
440 * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
441 * or a pipeline object.
442 *
443 * Note that the `$sortByCount` operator requires the new root to start with '$'.
444 * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
445 *
446 * ####Examples:
447 *
448 * aggregate.sortByCount('users');
449 * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
450 *
451 * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
452 * @param {Object|String} arg
453 * @return {Aggregate} this
454 * @api public
455 */
456
Aggregate.prototype.sortByCount = function(arg) {
  const isExpression = arg && typeof arg === 'object';

  if (!isExpression && typeof arg !== 'string') {
    throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
      'must be string or object');
  }

  // Expression objects are used as given; string paths get a '$' prefix
  // when they do not already carry one.
  let groupBy = arg;
  if (!isExpression && arg.charAt(0) !== '$') {
    groupBy = '$' + arg;
  }

  return this.append({ $sortByCount: groupBy });
};
469
470/**
471 * Appends new custom $lookup operator(s) to this aggregate pipeline.
472 *
473 * ####Examples:
474 *
475 * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
476 *
477 * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
478 * @param {Object} options to $lookup as described in the above link
479 * @return {Aggregate}
480 * @api public
481 */
482
Aggregate.prototype.lookup = function(options) {
  // `options` is forwarded unchanged as the `$lookup` stage body.
  const stage = { $lookup: options };
  return this.append(stage);
};
486
487/**
488 * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
489 *
490 * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
491 *
492 * #### Examples:
493 * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
494 * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
495 *
496 * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
497 * @param {Object} options to $graphLookup as described in the above link
498 * @return {Aggregate}
499 * @api public
500 */
501
Aggregate.prototype.graphLookup = function(options) {
  // The stage body is always a fresh object, never the caller's own.
  const spec = {};

  // A falsy argument yields an empty `$graphLookup` stage.
  if (!options) {
    return this.append({ $graphLookup: spec });
  }

  if (!utils.isObject(options)) {
    throw new TypeError('Invalid graphLookup() argument. Must be an object.');
  }

  utils.mergeClone(spec, options);

  // A string `startWith` is a field path and must begin with '$'.
  const start = spec.startWith;
  if (start && typeof start === 'string' && start.charAt(0) !== '$') {
    spec.startWith = '$' + start;
  }

  return this.append({ $graphLookup: spec });
};
521
522/**
523 * Appends new custom $sample operator(s) to this aggregate pipeline.
524 *
525 * ####Examples:
526 *
527 * aggregate.sample(3); // Add a pipeline that picks 3 random documents
528 *
529 * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
530 * @param {Number} size number of random documents to pick
531 * @return {Aggregate}
532 * @api public
533 */
534
Aggregate.prototype.sample = function(size) {
  // Wrap the requested size in the `$sample` stage body.
  const stage = { $sample: { size: size } };
  return this.append(stage);
};
538
539/**
540 * Appends a new $sort operator to this aggregate pipeline.
541 *
542 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
543 *
544 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
545 *
546 * ####Examples:
547 *
548 * // these are equivalent
549 * aggregate.sort({ field: 'asc', test: -1 });
550 * aggregate.sort('field -test');
551 *
552 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
553 * @param {Object|String} arg
554 * @return {Aggregate} this
555 * @api public
556 */
557
Aggregate.prototype.sort = function(arg) {
  // TODO refactor to reuse the query builder logic

  const sort = {};

  // Check for null/undefined and a missing constructor before reading
  // `.constructor.name`, so invalid input reaches the "Invalid sort()"
  // TypeError below instead of crashing on the property access.
  const isPlainObject = arg != null &&
    arg.constructor != null &&
    arg.constructor.name === 'Object';

  if (isPlainObject) {
    // Every value not listed here is coerced to ascending (1).
    const desc = ['desc', 'descending', -1];
    for (const field of Object.keys(arg)) {
      const order = arg[field];
      // If sorting by text score, skip coercing into 1/-1
      if (order instanceof Object && order.$meta) {
        sort[field] = order;
        continue;
      }
      sort[field] = desc.indexOf(order) === -1 ? 1 : -1;
    }
  } else if (arguments.length === 1 && typeof arg === 'string') {
    // Whitespace-separated paths; a leading '-' means descending.
    for (const token of arg.split(/\s+/)) {
      if (!token) {
        continue;
      }
      const descending = token[0] === '-';
      sort[descending ? token.substring(1) : token] = descending ? -1 : 1;
    }
  } else {
    throw new TypeError('Invalid sort() argument. Must be a string or object.');
  }

  return this.append({ $sort: sort });
};
590
591/**
592 * Sets the readPreference option for the aggregation query.
593 *
594 * ####Example:
595 *
596 * Model.aggregate(..).read('primaryPreferred').exec(callback)
597 *
598 * @param {String} pref one of the listed preference options or their aliases
599 * @param {Array} [tags] optional tags for this query
600 * @return {Aggregate} this
601 * @api public
602 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
603 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
604 */
605
Aggregate.prototype.read = function(pref, tags) {
  // Make sure an options object exists before delegating to the
  // implementation borrowed from `Query.prototype.read`, which is invoked
  // with this aggregate as its receiver.
  this.options = this.options || {};
  read.call(this, pref, tags);
  return this;
};
613
614/**
615 * Sets the readConcern level for the aggregation query.
616 *
617 * ####Example:
618 *
619 * Model.aggregate(..).readConcern('majority').exec(callback)
620 *
621 * @param {String} level one of the listed read concern level or their aliases
622 * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
623 * @return {Aggregate} this
624 * @api public
625 */
626
Aggregate.prototype.readConcern = function(level) {
  // Same delegation pattern as `read()`: guarantee `options` exists, then
  // run the borrowed `Query.prototype.readConcern` against this aggregate.
  this.options = this.options || {};
  readConcern.call(this, level);
  return this;
};
634
635/**
636 * Appends a new $redact operator to this aggregate pipeline.
637 *
638 * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
639 * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
640 *
641 * ####Example:
642 *
643 * Model.aggregate(...)
644 * .redact({
645 * $cond: {
646 * if: { $eq: [ '$level', 5 ] },
647 * then: '$$PRUNE',
648 * else: '$$DESCEND'
649 * }
650 * })
651 * .exec();
652 *
653 * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
654 * Model.aggregate(...)
655 * .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
656 * .exec();
657 *
658 * @param {Object} expression redact options or conditional expression
659 * @param {String|Object} [thenExpr] true case for the condition
660 * @param {String|Object} [elseExpr] false case for the condition
661 * @return {Aggregate} this
662 * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
663 * @api public
664 */
665
Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
  const argc = arguments.length;

  // Only the one-argument and three-argument forms are supported.
  if (argc !== 1 && argc !== 3) {
    throw new TypeError('Invalid arguments');
  }

  // One argument: it already is the complete `$redact` expression.
  if (argc === 1) {
    return this.append({ $redact: expression });
  }

  // Three arguments: string branches must be system variables ($$...).
  const isBadBranch = branch => typeof branch === 'string' && !branch.startsWith('$$');
  if (isBadBranch(thenExpr) || isBadBranch(elseExpr)) {
    throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
  }

  // Wrap condition and branches in a `$cond`.
  const cond = {
    $cond: {
      if: expression,
      then: thenExpr,
      else: elseExpr
    }
  };
  return this.append({ $redact: cond });
};
686
687/**
688 * Execute the aggregation with explain
689 *
690 * ####Example:
691 *
692 * Model.aggregate(..).explain(callback)
693 *
694 * @param {Function} callback
695 * @return {Promise}
696 */
697
Aggregate.prototype.explain = function(callback) {
  // Fail with the same explicit error as `exec()` when no model is bound,
  // rather than a TypeError from reading `events` off `undefined`.
  const model = this._model;
  if (!model) {
    throw new Error('Aggregate not bound to any Model');
  }

  return utils.promiseOrCallback(callback, cb => {
    // An empty pipeline is reported through the callback, not thrown.
    if (!this._pipeline.length) {
      const err = new Error('Aggregate has empty pipeline');
      return cb(err);
    }

    // May add a discriminator filter to the front of the pipeline.
    prepareDiscriminatorPipeline(this);

    model.collection.
      aggregate(this._pipeline, this.options || {}).
      explain(function(error, result) {
        if (error) {
          return cb(error);
        }
        cb(null, result);
      });
  }, model.events);
};
717
718/**
719 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
720 *
721 * ####Example:
722 *
723 * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
724 *
725 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
726 * @return {Aggregate} this
727 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
728 */
729
Aggregate.prototype.allowDiskUse = function(value) {
  // Stored as given on the options object; returns `this` for chaining.
  const opts = this.options;
  opts.allowDiskUse = value;
  return this;
};
734
735/**
736 * Sets the hint option for the aggregation query (ignored for < 3.6.0)
737 *
738 * ####Example:
739 *
740 * Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
741 *
742 * @param {Object|String} value a hint object or the index name
743 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
744 */
745
Aggregate.prototype.hint = function(value) {
  // Records the index hint on the options object; chainable.
  const opts = this.options;
  opts.hint = value;
  return this;
};
750
751/**
752 * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
753 *
754 * ####Example:
755 *
756 * const session = await Model.startSession();
757 * await Model.aggregate(..).session(session);
758 *
759 * @param {ClientSession} session
760 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
761 */
762
Aggregate.prototype.session = function(session) {
  if (session != null) {
    this.options.session = session;
    return this;
  }

  // null/undefined clears a previously configured session.
  delete this.options.session;
  return this;
};
771
772/**
773 * Lets you set arbitrary options, for middleware or plugins.
774 *
775 * ####Example:
776 *
777 * var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
778 * agg.options; // `{ allowDiskUse: true }`
779 *
780 * @param {Object} options keys to merge into current options
781 * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
782 * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
783 * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
784 * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
785 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
786 * @return {Aggregate} this
787 * @api public
788 */
789
Aggregate.prototype.option = function(value) {
  const target = this.options;

  // `for...in` is kept deliberately: it also copies enumerable keys that
  // `value` inherits, exactly as before.
  for (const name in value) {
    target[name] = value[name];
  }

  return this;
};
796
797/**
798 * Sets the cursor option for the aggregation query (ignored for < 2.6.0).
799 * Note the different syntax below: .exec() returns a cursor object, and no callback
800 * is necessary.
801 *
802 * ####Example:
803 *
804 * var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
805 * cursor.eachAsync(function(doc) {
806 * // use doc
807 * });
808 *
809 * @param {Object} options
810 * @param {Number} options.batchSize set the cursor batch size
811 * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
812 * @return {Aggregate} this
813 * @api public
814 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
815 */
816
Aggregate.prototype.cursor = function(options) {
  this.options = this.options || {};
  // A missing argument still enables cursor mode, with an empty config.
  this.options.cursor = options || {};
  return this;
};
824
825/**
826 * Sets an option on this aggregation. This function will be deprecated in a
827 * future release. Use the [`cursor()`](./api.html#aggregate_Aggregate-cursor),
828 * [`collation()`](./api.html#aggregate_Aggregate-collation), etc. helpers to
829 * set individual options, or access `agg.options` directly.
830 *
831 * Note that MongoDB aggregations [do **not** support the `noCursorTimeout` flag](https://jira.mongodb.org/browse/SERVER-6036),
832 * if you try setting that flag with this function you will get a "unrecognized field 'noCursorTimeout'" error.
833 *
834 * @param {String} flag
835 * @param {Boolean} value
836 * @return {Aggregate} this
837 * @api public
838 * @deprecated Use [`.option()`](api.html#aggregate_Aggregate-option) instead. Note that MongoDB aggregations do **not** support a `noCursorTimeout` option.
839 */
840
Aggregate.prototype.addCursorFlag = util.deprecate(function(flag, value) {
  // Wrapped in `util.deprecate()` with the message below.
  this.options = this.options || {};
  // The flag name is used directly as the option key.
  this.options[flag] = value;
  return this;
}, 'Mongoose: `Aggregate#addCursorFlag()` is deprecated, use `option()` instead');
848
849/**
850 * Adds a collation
851 *
852 * ####Example:
853 *
854 * Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
855 *
856 * @param {Object} collation options
857 * @return {Aggregate} this
858 * @api public
859 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
860 */
861
Aggregate.prototype.collation = function(collation) {
  // Create the options object on demand, then store the collation as given.
  this.options = this.options || {};
  this.options.collation = collation;
  return this;
};
869
870/**
871 * Combines multiple aggregation pipelines.
872 *
873 * ####Example:
874 *
875 * Model.aggregate(...)
876 * .facet({
877 * books: [{ groupBy: '$author' }],
878 * price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
879 * })
880 * .exec();
881 *
882 * // Output: { books: [...], price: [{...}, {...}] }
883 *
884 * @param {Object} facet options
885 * @return {Aggregate} this
886 * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
887 * @api public
888 */
889
Aggregate.prototype.facet = function(options) {
  // `options` is used verbatim as the `$facet` stage body.
  const stage = { $facet: options };
  return this.append(stage);
};
893
894/**
895 * Returns the current pipeline
896 *
897 * ####Example:
898 *
899 * MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
900 *
901 * @return {Array}
902 * @api public
903 */
904
905
Aggregate.prototype.pipeline = function() {
  // Returns the live internal array, not a copy.
  const stages = this._pipeline;
  return stages;
};
909
910/**
911 * Executes the aggregate pipeline on the currently bound Model.
912 *
913 * ####Example:
914 *
915 * aggregate.exec(callback);
916 *
917 * // Because a promise is returned, the `callback` is optional.
918 * var promise = aggregate.exec();
919 * promise.then(..);
920 *
921 * @see Promise #promise_Promise
922 * @param {Function} [callback]
923 * @return {Promise}
924 * @api public
925 */
926
Aggregate.prototype.exec = function(callback) {
  const model = this._model;
  if (!model) {
    throw new Error('Aggregate not bound to any Model');
  }
  const pipeline = this._pipeline;
  const collection = model.collection;

  // With a cursor option set, return a cursor wrapper instead of a promise.
  if (this.options && this.options.cursor) {
    return new AggregationCursor(this);
  }

  // Runs the post('aggregate') hooks with the given result and error.
  const runPost = (docs, error, done) => {
    return model.hooks.execPost('aggregate', this, [docs], { error: error }, done);
  };

  return utils.promiseOrCallback(callback, cb => {
    // An empty pipeline is reported through the callback, not thrown.
    if (!pipeline.length) {
      return cb(new Error('Aggregate has empty pipeline'));
    }

    prepareDiscriminatorPipeline(this);

    model.hooks.execPre('aggregate', this, preError => {
      // A failing pre hook skips the driver call but still runs post hooks.
      if (preError) {
        return runPost(null, preError, postError => {
          cb(postError);
        });
      }

      // The driver receives a clone of the options.
      const options = utils.clone(this.options || {});
      collection.aggregate(pipeline, options, (aggError, cursor) => {
        if (aggError) {
          return runPost(null, aggError, postError => {
            return postError ? cb(postError) : cb(null);
          });
        }

        cursor.toArray((readError, docs) => {
          runPost(docs, readError, (postError, result) => {
            if (postError) {
              return cb(postError);
            }

            cb(null, result);
          });
        });
      });
    });
  }, model.events);
};
980
981/**
982 * Provides promise for aggregate.
983 *
984 * ####Example:
985 *
986 * Model.aggregate(..).then(successCallback, errorCallback);
987 *
988 * @see Promise #promise_Promise
989 * @param {Function} [resolve] successCallback
990 * @param {Function} [reject] errorCallback
991 * @return {Promise}
992 */
Aggregate.prototype.then = function(resolve, reject) {
  // Executes the aggregation and chains the handlers onto its promise.
  const promise = this.exec();
  return promise.then(resolve, reject);
};
996
997/**
998 * Executes the query returning a `Promise` which will be
999 * resolved with either the doc(s) or rejected with the error.
1000 * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
1001 *
1002 * @param {Function} [reject]
1003 * @return {Promise}
1004 * @api public
1005 */
1006
Aggregate.prototype.catch = function(reject) {
  // Executes the aggregation and attaches only a rejection handler.
  const promise = this.exec();
  return promise.then(null, reject);
};
1010
1011/**
1012 * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
1013 * Under the hood, this function executes the aggregation with a cursor.
1014 * You do not need to call this function explicitly, the JavaScript runtime
1015 * will call it for you.
1016 *
1017 * ####Example
1018 *
1019 * for await (const doc of Model.aggregate([{ $sort: { name: 1 } }])) {
1020 * console.log(doc.name);
1021 * }
1022 *
1023 * Node.js 10.x supports async iterators natively without any flags. You can
1024 * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
1025 *
1026 * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
1027 * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
1028 * support async iterators.
1029 *
1030 * @method Symbol.asyncIterator
1031 * @memberOf Aggregate
1032 * @instance
1033 * @api public
1034 */
1035
if (Symbol.asyncIterator != null) {
  Aggregate.prototype[Symbol.asyncIterator] = function() {
    // A null/undefined document marks the end of iteration; anything else
    // is wrapped as an iterator result.
    const toIteratorResult = doc => {
      if (doc == null) {
        return { done: true };
      }
      return { value: doc, done: false };
    };

    const aggCursor = this.cursor({ useMongooseAggCursor: true }).exec();
    return aggCursor.transformNull().map(toIteratorResult);
  };
}
1046
1047/*!
1048 * Helpers
1049 */
1050
1051/**
1052 * Checks whether an object is likely a pipeline operator
1053 *
1054 * @param {Object} obj object to check
1055 * @return {Boolean}
1056 * @api private
1057 */
1058
function isOperator(obj) {
  // `typeof null === 'object'`, so null is rejected explicitly; otherwise
  // `Object.keys(null)` below would throw instead of returning false.
  if (obj === null || typeof obj !== 'object') {
    return false;
  }

  const keys = Object.keys(obj);

  // A stage has exactly one own enumerable key, and it starts with '$'.
  return keys.length === 1 && keys[0][0] === '$';
}
1068
1069/*!
1070 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
1071 * pipeline if its model is a non-root discriminator type. This is
1072 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
1073 *
1074 * @param {Aggregate} aggregate Aggregate to prepare
1075 */
1076
// Expose the helper as a static property of the constructor. The function
// declaration is hoisted, so referencing it before its definition is fine.
// NOTE(review): presumably used by other modules such as the aggregation
// cursor; confirm against callers.
Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;
1078
function prepareDiscriminatorPipeline(aggregate) {
  const schema = aggregate._model.schema;
  const discriminatorMapping = schema && schema.discriminatorMapping;

  // Nothing to do for plain models and for the root of a discriminator tree.
  if (!discriminatorMapping || discriminatorMapping.isRoot) {
    return;
  }

  // Same array as `aggregate._pipeline`; changes below happen in place.
  const pipeline = aggregate._pipeline;
  const discriminatorKey = discriminatorMapping.key;
  const discriminatorValue = discriminatorMapping.value;
  const first = pipeline[0];

  // If the first pipeline stage is a match and it doesn't specify the
  // discriminator key, add the key to it. This allows for potential
  // aggregation query optimizations not to be disturbed by this feature.
  // A match that already carries this exact discriminator value is also
  // handled here, so repeated calls (e.g. `explain()` then `exec()`) do not
  // keep prepending duplicate `$match` stages.
  if (first && first.$match &&
      (!first.$match[discriminatorKey] ||
        first.$match[discriminatorKey] === discriminatorValue)) {
    first.$match[discriminatorKey] = discriminatorValue;
  } else if (first && first.$geoNear) {
    // `$geoNear` has to stay first, so the filter goes into its `query`.
    first.$geoNear.query = first.$geoNear.query || {};
    first.$geoNear.query[discriminatorKey] = discriminatorValue;
  } else {
    // Otherwise prepend a dedicated `$match` stage for the discriminator.
    const match = {};
    match[discriminatorKey] = discriminatorValue;
    pipeline.unshift({ $match: match });
  }
}
1106
1107/*!
1108 * Exports
1109 */
1110
1111module.exports = Aggregate;