UNPKG

32.3 kBJavaScriptView Raw
1'use strict';
2
3/*!
4 * Module dependencies
5 */
6
7const AggregationCursor = require('./cursor/AggregationCursor');
8const Query = require('./query');
9const applyGlobalMaxTimeMS = require('./helpers/query/applyGlobalMaxTimeMS');
10const util = require('util');
11const utils = require('./utils');
12const read = Query.prototype.read;
13const readConcern = Query.prototype.readConcern;
14
15/**
16 * Aggregate constructor used for building aggregation pipelines. Do not
17 * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
18 *
19 * ####Example:
20 *
21 * const aggregate = Model.aggregate([
22 * { $project: { a: 1, b: 1 } },
23 * { $skip: 5 }
24 * ]);
25 *
26 * Model.
27 * aggregate([{ $match: { age: { $gte: 21 }}}]).
28 * unwind('tags').
29 * exec(callback);
30 *
31 * ####Note:
32 *
33 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
34 * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
35 *
36 * ```javascript
37 * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
38 * // Do this instead to cast to an ObjectId
39 * new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
40 * ```
41 *
42 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
43 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
44 * @param {Array} [pipeline] aggregation pipeline as an array of objects
45 * @api public
46 */
47
function Aggregate(pipeline) {
  // Ordered list of pipeline stages; filled through `append()` and the
  // stage helpers that call it.
  this._pipeline = [];
  // No model is bound at construction time; `model()` assigns it later.
  this._model = undefined;
  // Options object handed to the collection's aggregate call on execution.
  this.options = {};

  // Seed the pipeline only when exactly one argument was given and it is an
  // array. `Array.isArray()` replaces the deprecated `util.isArray()`.
  // `apply` spreads the stages so `append()` validates each one individually.
  if (arguments.length === 1 && Array.isArray(pipeline)) {
    this.append.apply(this, pipeline);
  }
}
57
58/**
59 * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
60 * Supported options are:
61 *
62 * - `readPreference`
63 * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
64 * - [`explain`](./api.html#aggregate_Aggregate-explain)
65 * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
66 * - `maxTimeMS`
67 * - `bypassDocumentValidation`
68 * - `raw`
69 * - `promoteLongs`
70 * - `promoteValues`
71 * - `promoteBuffers`
72 * - [`collation`](./api.html#aggregate_Aggregate-collation)
73 * - `comment`
74 * - [`session`](./api.html#aggregate_Aggregate-session)
75 *
76 * @property options
77 * @memberOf Aggregate
78 * @api public
79 */
80
// Bare property reference with no assignment: it has no effect beyond the
// lookup itself. Presumably it exists only as an anchor for the documentation
// comment above; the real per-instance `options` object is created in the
// constructor.
Aggregate.prototype.options;
82
83/**
84 * Get/set the model that this aggregation will execute on.
85 *
86 * ####Example:
87 * const aggregate = MyModel.aggregate([{ $match: { answer: 42 } }]);
88 * aggregate.model() === MyModel; // true
89 *
90 * // Change the model. There's rarely any reason to do this.
91 * aggregate.model(SomeOtherModel);
92 * aggregate.model() === SomeOtherModel; // true
93 *
94 * @param {Model} [model] the model to which the aggregate is to be bound
95 * @return {Aggregate|Model} if model is passed, will return `this`, otherwise will return the model
96 * @api public
97 */
98
Aggregate.prototype.model = function(model) {
  // Getter form: called with no arguments, return the currently bound model.
  if (!arguments.length) {
    return this._model;
  }

  // Setter form: bind the model first, then pull in schema-level defaults.
  this._model = model;

  const schema = model.schema;
  if (schema == null) {
    return this;
  }

  const aggOptions = this.options;
  const schemaOptions = schema.options;

  // Values already set on the aggregate win over the schema's defaults.
  if (aggOptions.readPreference == null && schemaOptions.read != null) {
    aggOptions.readPreference = schemaOptions.read;
  }
  if (aggOptions.collation == null && schemaOptions.collation != null) {
    aggOptions.collation = schemaOptions.collation;
  }

  return this;
};
117
118/**
119 * Appends new operators to this aggregate pipeline
120 *
121 * ####Examples:
122 *
123 * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
124 *
125 * // or pass an array
126 * var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
127 * aggregate.append(pipeline);
128 *
129 * @param {Object} ops operator(s) to append
130 * @return {Aggregate}
131 * @api public
132 */
133
Aggregate.prototype.append = function(...ops) {
  // A single array argument is taken as the list of stages itself; otherwise
  // each argument is one stage. `Array.isArray()` replaces the deprecated
  // `util.isArray()`, and the rest parameter replaces copying `arguments`.
  const stages = (ops.length === 1 && Array.isArray(ops[0])) ? ops[0] : ops;

  // Reject the whole call if any entry is not shaped like a pipeline stage,
  // so the pipeline is never left partially extended.
  if (!stages.every(isOperator)) {
    throw new Error('Arguments must be aggregate pipeline operators');
  }

  // `concat()` builds a new array rather than mutating the current one.
  this._pipeline = this._pipeline.concat(stages);

  return this;
};
147
148/**
149 * Appends a new $addFields operator to this aggregate pipeline.
150 * Requires MongoDB v3.4+ to work
151 *
152 * ####Examples:
153 *
154 * // adding new fields based on existing fields
155 * aggregate.addFields({
156 * newField: '$b.nested'
157 * , plusTen: { $add: ['$val', 10]}
158 * , sub: {
159 * name: '$a'
160 * }
161 * })
162 *
163 * // etc
164 * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
165 *
166 * @param {Object} arg field specification
167 * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
168 * @return {Aggregate}
169 * @api public
170 */
Aggregate.prototype.addFields = function(arg) {
  // `typeof null` is also 'object', so null is excluded explicitly; otherwise
  // `Object.keys(null)` would throw an unrelated TypeError instead of the
  // descriptive error below. `Array.isArray()` replaces the deprecated
  // `util.isArray()`.
  if (typeof arg !== 'object' || arg === null || Array.isArray(arg)) {
    throw new Error('Invalid addFields() argument. Must be an object');
  }

  // Shallow-copy the own enumerable keys so the stage holds its own object
  // rather than a reference to the caller's `arg`.
  const fields = {};
  Object.keys(arg).forEach(function(field) {
    fields[field] = arg[field];
  });

  return this.append({ $addFields: fields });
};
182
183/**
184 * Appends a new $project operator to this aggregate pipeline.
185 *
186 * Mongoose query [selection syntax](#query_Query-select) is also supported.
187 *
188 * ####Examples:
189 *
190 * // include a, include b, exclude _id
191 * aggregate.project("a b -_id");
192 *
193 * // or you may use object notation, useful when
194 * // you have keys already prefixed with a "-"
195 * aggregate.project({a: 1, b: 1, _id: 0});
196 *
197 * // reshaping documents
198 * aggregate.project({
199 * newField: '$b.nested'
200 * , plusTen: { $add: ['$val', 10]}
201 * , sub: {
202 * name: '$a'
203 * }
204 * })
205 *
206 * // etc
207 * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
208 *
209 * @param {Object|String} arg field specification
210 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
211 * @return {Aggregate}
212 * @api public
213 */
214
Aggregate.prototype.project = function(arg) {
  const fields = {};

  // `null` is excluded explicitly because `typeof null === 'object'`; without
  // the guard `Object.keys(null)` throws an unrelated TypeError instead of the
  // descriptive error below. `Array.isArray()` replaces the deprecated
  // `util.isArray()`.
  if (typeof arg === 'object' && arg !== null && !Array.isArray(arg)) {
    // Object notation: shallow-copy the own enumerable keys.
    Object.keys(arg).forEach(function(field) {
      fields[field] = arg[field];
    });
  } else if (arguments.length === 1 && typeof arg === 'string') {
    // String notation: whitespace-separated paths, where a leading '-' means
    // exclude (0) and anything else means include (1).
    arg.split(/\s+/).forEach(function(field) {
      // Leading/trailing whitespace yields empty tokens; skip them.
      if (!field) {
        return;
      }
      const include = field[0] === '-' ? 0 : 1;
      if (include === 0) {
        field = field.substring(1);
      }
      fields[field] = include;
    });
  } else {
    throw new Error('Invalid project() argument. Must be string or object');
  }

  return this.append({ $project: fields });
};
239
240/**
241 * Appends a new custom $group operator to this aggregate pipeline.
242 *
243 * ####Examples:
244 *
245 * aggregate.group({ _id: "$department" });
246 *
247 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
248 * @method group
249 * @memberOf Aggregate
250 * @instance
251 * @param {Object} arg $group operator contents
252 * @return {Aggregate}
253 * @api public
254 */
255
256/**
257 * Appends a new custom $match operator to this aggregate pipeline.
258 *
259 * ####Examples:
260 *
261 * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
262 *
263 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
264 * @method match
265 * @memberOf Aggregate
266 * @instance
267 * @param {Object} arg $match operator contents
268 * @return {Aggregate}
269 * @api public
270 */
271
272/**
273 * Appends a new $skip operator to this aggregate pipeline.
274 *
275 * ####Examples:
276 *
277 * aggregate.skip(10);
278 *
279 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
280 * @method skip
281 * @memberOf Aggregate
282 * @instance
283 * @param {Number} num number of records to skip before next stage
284 * @return {Aggregate}
285 * @api public
286 */
287
288/**
289 * Appends a new $limit operator to this aggregate pipeline.
290 *
291 * ####Examples:
292 *
293 * aggregate.limit(10);
294 *
295 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
296 * @method limit
297 * @memberOf Aggregate
298 * @instance
299 * @param {Number} num maximum number of records to pass to the next stage
300 * @return {Aggregate}
301 * @api public
302 */
303
304/**
305 * Appends a new $geoNear operator to this aggregate pipeline.
306 *
307 * ####NOTE:
308 *
309 * **MUST** be used as the first operator in the pipeline.
310 *
311 * ####Examples:
312 *
313 * aggregate.near({
314 * near: [40.724, -73.997],
315 * distanceField: "dist.calculated", // required
316 * maxDistance: 0.008,
317 * query: { type: "public" },
318 * includeLocs: "dist.location",
319 * uniqueDocs: true,
320 * num: 5
321 * });
322 *
323 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
324 * @method near
325 * @memberOf Aggregate
326 * @instance
327 * @param {Object} arg
328 * @return {Aggregate}
329 * @api public
330 */
331
Aggregate.prototype.near = function(arg) {
  // Wrap the caller's spec, unmodified, as a $geoNear stage.
  const geoNearStage = { $geoNear: arg };
  return this.append(geoNearStage);
};
337
338/*!
339 * define methods
340 */
341
// Generate one-argument stage helpers (`group`, `match`, `skip`, `limit`,
// `out`): each wraps its argument under the '$'-prefixed operator name and
// appends the resulting stage.
for (const operatorName of 'group match skip limit out'.split(' ')) {
  const stageKey = '$' + operatorName;
  Aggregate.prototype[operatorName] = function(arg) {
    const stage = {};
    stage[stageKey] = arg;
    return this.append(stage);
  };
}
349
350/**
351 * Appends new custom $unwind operator(s) to this aggregate pipeline.
352 *
353 * Note that the `$unwind` operator requires the path name to start with '$'.
354 * Mongoose will prepend '$' if the specified field doesn't start '$'.
355 *
356 * ####Examples:
357 *
358 * aggregate.unwind("tags");
359 * aggregate.unwind("a", "b", "c");
360 *
361 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
362 * @param {String} fields the field(s) to unwind
363 * @return {Aggregate}
364 * @api public
365 */
366
Aggregate.prototype.unwind = function() {
  const fields = utils.args(arguments);

  // Convert every argument before touching the pipeline, so one invalid
  // argument rejects the whole call.
  const stages = fields.map(function(field) {
    // Non-null objects are passed through as the $unwind spec.
    if (field && typeof field === 'object') {
      return { $unwind: field };
    }
    // Strings are field paths; add the '$' prefix when it is missing.
    if (typeof field === 'string') {
      const path = (field && field.startsWith('$')) ? field : '$' + field;
      return { $unwind: path };
    }
    throw new Error('Invalid arg "' + field + '" to unwind(), ' +
      'must be string or object');
  });

  return this.append.apply(this, stages);
};
387
388/**
389 * Appends a new $replaceRoot operator to this aggregate pipeline.
390 *
391 * Note that the `$replaceRoot` operator requires field strings to start with '$'.
392 * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
393 * If you are passing in an object the strings in your expression will not be altered.
394 *
395 * ####Examples:
396 *
397 * aggregate.replaceRoot("user");
398 *
399 * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
400 *
401 * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
 * @param {String|Object} newRoot the field or document which will become the new root document
403 * @return {Aggregate}
404 * @api public
405 */
406
Aggregate.prototype.replaceRoot = function(newRoot) {
  // Only bare string paths are rewritten; objects and '$'-prefixed strings
  // are used exactly as given.
  let root = newRoot;
  if (typeof newRoot === 'string' && !newRoot.startsWith('$')) {
    root = '$' + newRoot;
  }

  const stage = { $replaceRoot: { newRoot: root } };
  return this.append(stage);
};
422
423/**
424 * Appends a new $count operator to this aggregate pipeline.
425 *
426 * ####Examples:
427 *
428 * aggregate.count("userCount");
429 *
430 * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
 * @param {String} countName the name of the count field
432 * @return {Aggregate}
433 * @api public
434 */
435
Aggregate.prototype.count = function(countName) {
  // `countName` is passed through unchanged as the $count value.
  const countStage = { $count: countName };
  return this.append(countStage);
};
439
440/**
441 * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
442 * or a pipeline object.
443 *
444 * Note that the `$sortByCount` operator requires the new root to start with '$'.
445 * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
446 *
447 * ####Examples:
448 *
449 * aggregate.sortByCount('users');
450 * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
451 *
452 * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
453 * @param {Object|String} arg
454 * @return {Aggregate} this
455 * @api public
456 */
457
Aggregate.prototype.sortByCount = function(arg) {
  const isExpression = Boolean(arg) && typeof arg === 'object';
  const isFieldName = typeof arg === 'string';

  // Anything that is neither a non-null object nor a string is rejected.
  if (!isExpression && !isFieldName) {
    throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
      'must be string or object');
  }

  // Expressions are used as-is; field names get a '$' prefix when missing.
  let groupBy = arg;
  if (isFieldName && !arg.startsWith('$')) {
    groupBy = '$' + arg;
  }

  return this.append({ $sortByCount: groupBy });
};
470
471/**
472 * Appends new custom $lookup operator(s) to this aggregate pipeline.
473 *
474 * ####Examples:
475 *
476 * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
477 *
478 * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
479 * @param {Object} options to $lookup as described in the above link
480 * @return {Aggregate}
481 * @api public
482 */
483
Aggregate.prototype.lookup = function(options) {
  // `options` is not validated or copied; it becomes the $lookup spec as-is.
  const lookupStage = { $lookup: options };
  return this.append(lookupStage);
};
487
488/**
489 * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
490 *
491 * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
492 *
493 * #### Examples:
494 * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
495 * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
496 *
497 * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
498 * @param {Object} options to $graphLookup as described in the above link
499 * @return {Aggregate}
500 * @api public
501 */
502
Aggregate.prototype.graphLookup = function(options) {
  const stageOptions = {};

  // A falsy argument produces a $graphLookup stage with an empty spec.
  if (!options) {
    return this.append({ $graphLookup: stageOptions });
  }

  if (!utils.isObject(options)) {
    throw new TypeError('Invalid graphLookup() argument. Must be an object.');
  }

  // Work on a copy so the '$' normalisation below never touches the caller's
  // object.
  utils.mergeClone(stageOptions, options);

  // A non-empty string `startWith` gets a '$' prefix when it lacks one.
  const start = stageOptions.startWith;
  if (start && typeof start === 'string' && !start.startsWith('$')) {
    stageOptions.startWith = '$' + start;
  }

  return this.append({ $graphLookup: stageOptions });
};
522
523/**
524 * Appends new custom $sample operator(s) to this aggregate pipeline.
525 *
526 * ####Examples:
527 *
528 * aggregate.sample(3); // Add a pipeline that picks 3 random documents
529 *
530 * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
531 * @param {Number} size number of random documents to pick
532 * @return {Aggregate}
533 * @api public
534 */
535
Aggregate.prototype.sample = function(size) {
  // `size` is forwarded without validation as the $sample size.
  const sampleStage = { $sample: { size: size } };
  return this.append(sampleStage);
};
539
540/**
541 * Appends a new $sort operator to this aggregate pipeline.
542 *
543 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
544 *
545 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
546 *
547 * ####Examples:
548 *
549 * // these are equivalent
550 * aggregate.sort({ field: 'asc', test: -1 });
551 * aggregate.sort('field -test');
552 *
553 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
554 * @param {Object|String} arg
555 * @return {Aggregate} this
556 * @api public
557 */
558
Aggregate.prototype.sort = function(arg) {
  // TODO refactor to reuse the query builder logic

  const sort = {};

  // Null-safe plain-object check. Reading `arg.constructor.name` directly
  // throws an unrelated TypeError for `null`, `undefined` and objects without
  // a constructor (e.g. `Object.create(null)`); those inputs now fall through
  // to the descriptive TypeError at the bottom.
  const isPlainObject = arg != null &&
    arg.constructor != null &&
    arg.constructor.name === 'Object';

  if (isPlainObject) {
    // Only these values mean descending; any other value sorts ascending.
    const desc = ['desc', 'descending', -1];
    Object.keys(arg).forEach(function(field) {
      // If sorting by text score, skip coercing into 1/-1
      if (arg[field] instanceof Object && arg[field].$meta) {
        sort[field] = arg[field];
        return;
      }
      sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
    });
  } else if (arguments.length === 1 && typeof arg === 'string') {
    // String notation: whitespace-separated paths, where a leading '-' means
    // descending.
    arg.split(/\s+/).forEach(function(field) {
      // Leading/trailing whitespace yields empty tokens; skip them.
      if (!field) {
        return;
      }
      const ascend = field[0] === '-' ? -1 : 1;
      if (ascend === -1) {
        field = field.substring(1);
      }
      sort[field] = ascend;
    });
  } else {
    throw new TypeError('Invalid sort() argument. Must be a string or object.');
  }

  return this.append({ $sort: sort });
};
591
592/**
593 * Sets the readPreference option for the aggregation query.
594 *
595 * ####Example:
596 *
597 * Model.aggregate(..).read('primaryPreferred').exec(callback)
598 *
599 * @param {String} pref one of the listed preference options or their aliases
600 * @param {Array} [tags] optional tags for this query
601 * @return {Aggregate} this
602 * @api public
603 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
604 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
605 */
606
Aggregate.prototype.read = function(pref, tags) {
  // Make sure an options object exists before delegating.
  if (!this.options) {
    this.options = {};
  }

  // `read` is `Query.prototype.read`, invoked with this aggregate as receiver.
  // Its return value is ignored; the aggregate itself is returned.
  read.apply(this, [pref, tags]);
  return this;
};
614
615/**
616 * Sets the readConcern level for the aggregation query.
617 *
618 * ####Example:
619 *
620 * Model.aggregate(..).readConcern('majority').exec(callback)
621 *
622 * @param {String} level one of the listed read concern level or their aliases
623 * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
624 * @return {Aggregate} this
625 * @api public
626 */
627
Aggregate.prototype.readConcern = function(level) {
  // Make sure an options object exists before delegating.
  if (!this.options) {
    this.options = {};
  }

  // `readConcern` is `Query.prototype.readConcern`, invoked with this
  // aggregate as receiver. Its return value is ignored.
  readConcern.apply(this, [level]);
  return this;
};
635
636/**
637 * Appends a new $redact operator to this aggregate pipeline.
638 *
639 * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
640 * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
641 *
642 * ####Example:
643 *
644 * Model.aggregate(...)
645 * .redact({
646 * $cond: {
647 * if: { $eq: [ '$level', 5 ] },
648 * then: '$$PRUNE',
649 * else: '$$DESCEND'
650 * }
651 * })
652 * .exec();
653 *
654 * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
655 * Model.aggregate(...)
656 * .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
657 * .exec();
658 *
659 * @param {Object} expression redact options or conditional expression
660 * @param {String|Object} [thenExpr] true case for the condition
661 * @param {String|Object} [elseExpr] false case for the condition
662 * @return {Aggregate} this
663 * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
664 * @api public
665 */
666
Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
  const argCount = arguments.length;

  // Only the one-argument (full expression) and three-argument
  // (if / then / else) forms are accepted.
  if (argCount !== 1 && argCount !== 3) {
    throw new TypeError('Invalid arguments');
  }

  let redactExpr = expression;

  if (argCount === 3) {
    // String branches must start with '$$' (e.g. $$KEEP); non-string branches
    // are not checked.
    const hasBadBranch = [thenExpr, elseExpr].some(function(branch) {
      return typeof branch === 'string' && !branch.startsWith('$$');
    });
    if (hasBadBranch) {
      throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
    }

    // Wrap the three parts in a $cond expression.
    redactExpr = {
      $cond: {
        if: expression,
        then: thenExpr,
        else: elseExpr
      }
    };
  }

  return this.append({ $redact: redactExpr });
};
687
688/**
689 * Execute the aggregation with explain
690 *
691 * ####Example:
692 *
693 * Model.aggregate(..).explain(callback)
694 *
695 * @param {Function} callback
696 * @return {Promise}
697 */
698
Aggregate.prototype.explain = function(callback) {
  // Fail fast with the same error `exec()` raises. Without this guard an
  // unbound aggregate throws an opaque TypeError when `this._model.events`
  // is read below.
  if (!this._model) {
    throw new Error('Aggregate not bound to any Model');
  }

  return utils.promiseOrCallback(callback, cb => {
    // An empty pipeline is reported through the callback, not thrown.
    if (!this._pipeline.length) {
      const err = new Error('Aggregate has empty pipeline');
      return cb(err);
    }

    // May add a discriminator filter to the pipeline (mutates it in place).
    prepareDiscriminatorPipeline(this);

    this._model.collection.
      aggregate(this._pipeline, this.options || {}).
      explain(function(error, result) {
        if (error) {
          return cb(error);
        }
        cb(null, result);
      });
  }, this._model.events);
};
718
719/**
720 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
721 *
722 * ####Example:
723 *
724 * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
725 *
726 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
 * @return {Aggregate} this
728 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
729 */
730
Aggregate.prototype.allowDiskUse = function(value) {
  // Stored as given; no coercion to boolean is performed.
  const aggOptions = this.options;
  aggOptions.allowDiskUse = value;
  return this;
};
735
736/**
737 * Sets the hint option for the aggregation query (ignored for < 3.6.0)
738 *
739 * ####Example:
740 *
741 * Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
742 *
743 * @param {Object|String} value a hint object or the index name
744 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
745 */
746
Aggregate.prototype.hint = function(value) {
  // Stored as given: an index spec object or an index name.
  const aggOptions = this.options;
  aggOptions.hint = value;
  return this;
};
751
752/**
753 * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
754 *
755 * ####Example:
756 *
757 * const session = await Model.startSession();
758 * await Model.aggregate(..).session(session);
759 *
760 * @param {ClientSession} session
761 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
762 */
763
Aggregate.prototype.session = function(session) {
  if (session != null) {
    this.options.session = session;
    return this;
  }

  // `null`/`undefined` clears any previously set session by removing the key.
  delete this.options.session;
  return this;
};
772
773/**
774 * Lets you set arbitrary options, for middleware or plugins.
775 *
776 * ####Example:
777 *
778 * var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
779 * agg.options; // `{ allowDiskUse: true }`
780 *
781 * @param {Object} options keys to merge into current options
782 * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
783 * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
784 * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
785 * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
786 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
787 * @return {Aggregate} this
788 * @api public
789 */
790
Aggregate.prototype.option = function(value) {
  const aggOptions = this.options;

  // `for...in` is a no-op for null/undefined and also visits inherited
  // enumerable keys; existing options with the same name are overwritten.
  for (const name in value) {
    aggOptions[name] = value[name];
  }

  return this;
};
797
798/**
 * Sets the cursor option for the aggregation query (ignored for < 2.6.0).
800 * Note the different syntax below: .exec() returns a cursor object, and no callback
801 * is necessary.
802 *
803 * ####Example:
804 *
805 * var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
806 * cursor.eachAsync(function(error, doc) {
807 * // use doc
808 * });
809 *
810 * @param {Object} options
811 * @param {Number} options.batchSize set the cursor batch size
812 * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
813 * @return {Aggregate} this
814 * @api public
815 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
816 */
817
Aggregate.prototype.cursor = function(options) {
  // Make sure an options object exists before writing to it.
  if (!this.options) {
    this.options = {};
  }

  // A falsy argument still enables cursor mode, with an empty spec.
  const cursorOptions = options || {};
  this.options.cursor = cursorOptions;
  return this;
};
825
826/**
827 * Sets an option on this aggregation. This function will be deprecated in a
828 * future release. Use the [`cursor()`](./api.html#aggregate_Aggregate-cursor),
829 * [`collation()`](./api.html#aggregate_Aggregate-collation), etc. helpers to
830 * set individual options, or access `agg.options` directly.
831 *
832 * Note that MongoDB aggregations [do **not** support the `noCursorTimeout` flag](https://jira.mongodb.org/browse/SERVER-6036),
833 * if you try setting that flag with this function you will get a "unrecognized field 'noCursorTimeout'" error.
834 *
835 * @param {String} flag
836 * @param {Boolean} value
837 * @return {Aggregate} this
838 * @api public
839 * @deprecated Use [`.option()`](api.html#aggregate_Aggregate-option) instead. Note that MongoDB aggregations do **not** support a `noCursorTimeout` option.
840 */
841
Aggregate.prototype.addCursorFlag = util.deprecate(
  function(flag, value) {
    // Make sure an options object exists before writing to it.
    if (!this.options) {
      this.options = {};
    }

    // The flag name is used directly as the option key.
    const aggOptions = this.options;
    aggOptions[flag] = value;
    return this;
  },
  'Mongoose: `Aggregate#addCursorFlag()` is deprecated, use `option()` instead'
);
849
850/**
851 * Adds a collation
852 *
853 * ####Example:
854 *
855 * Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
856 *
857 * @param {Object} collation options
858 * @return {Aggregate} this
859 * @api public
860 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
861 */
862
Aggregate.prototype.collation = function(collation) {
  // Make sure an options object exists before writing to it.
  if (!this.options) {
    this.options = {};
  }

  // Stored as given, replacing any previous collation.
  const aggOptions = this.options;
  aggOptions.collation = collation;
  return this;
};
870
871/**
872 * Combines multiple aggregation pipelines.
873 *
874 * ####Example:
875 *
876 * Model.aggregate(...)
877 * .facet({
 * books: [{ $group: { _id: '$author' } }],
879 * price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
880 * })
881 * .exec();
882 *
883 * // Output: { books: [...], price: [{...}, {...}] }
884 *
885 * @param {Object} facet options
886 * @return {Aggregate} this
887 * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
888 * @api public
889 */
890
Aggregate.prototype.facet = function(options) {
  // The sub-pipelines in `options` are not validated here; the object becomes
  // the $facet spec as-is.
  const facetStage = { $facet: options };
  return this.append(facetStage);
};
894
895/**
896 * Returns the current pipeline
897 *
898 * ####Example:
899 *
900 * MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
901 *
902 * @return {Array}
903 * @api public
904 */
905
906
Aggregate.prototype.pipeline = function() {
  // Returns the live internal array, not a copy: mutating the result mutates
  // this aggregate's pipeline.
  const { _pipeline: stages } = this;
  return stages;
};
910
911/**
912 * Executes the aggregate pipeline on the currently bound Model.
913 *
914 * ####Example:
915 *
916 * aggregate.exec(callback);
917 *
918 * // Because a promise is returned, the `callback` is optional.
919 * var promise = aggregate.exec();
920 * promise.then(..);
921 *
922 * @see Promise #promise_Promise
923 * @param {Function} [callback]
924 * @return {Promise}
925 * @api public
926 */
927
Aggregate.prototype.exec = function(callback) {
  // An aggregate can only run against a bound model; this failure is thrown
  // synchronously rather than passed to the callback.
  if (!this._model) {
    throw new Error('Aggregate not bound to any Model');
  }
  const model = this._model;
  // Captured by reference: in-place changes made later by
  // `prepareDiscriminatorPipeline()` are visible through this variable.
  const pipeline = this._pipeline;
  const collection = this._model.collection;

  // Helper from ./helpers/query/applyGlobalMaxTimeMS; presumably fills in a
  // default `maxTimeMS` on the options — confirm against the helper.
  applyGlobalMaxTimeMS(this.options, model);

  // With a cursor option set, return an AggregationCursor instead of going
  // through the promise/callback path below; `callback` is unused here.
  if (this.options && this.options.cursor) {
    return new AggregationCursor(this);
  }

  return utils.promiseOrCallback(callback, cb => {
    // An empty pipeline is reported through the callback, not thrown.
    if (!pipeline.length) {
      const err = new Error('Aggregate has empty pipeline');
      return cb(err);
    }

    prepareDiscriminatorPipeline(this);

    // Run 'aggregate' pre hooks with this Aggregate as the context.
    model.hooks.execPre('aggregate', this, error => {
      if (error) {
        // A pre hook failed: run post hooks with the error attached and skip
        // the database call. Whatever the post hooks yield (possibly nothing)
        // is forwarded as the error argument.
        const _opts = { error: error };
        return model.hooks.execPost('aggregate', this, [null], _opts, error => {
          cb(error);
        });
      }

      // Clone so the collection call receives a copy rather than
      // `this.options` itself.
      const options = utils.clone(this.options || {});
      collection.aggregate(pipeline, options, (error, cursor) => {
        if (error) {
          // The aggregate call itself failed: run post hooks with the error.
          // If they clear it, the callback completes with no result.
          const _opts = { error: error };
          return model.hooks.execPost('aggregate', this, [null], _opts, error => {
            if (error) {
              return cb(error);
            }
            return cb(null);
          });
        }
        // Collect all results into an array, then run post hooks on the
        // outcome. A `toArray` error travels via `_opts.error`.
        cursor.toArray((error, result) => {
          const _opts = { error: error };
          model.hooks.execPost('aggregate', this, [result], _opts, (error, result) => {
            if (error) {
              return cb(error);
            }

            cb(null, result);
          });
        });
      });
    });
  }, model.events);
};
983
984/**
985 * Provides promise for aggregate.
986 *
987 * ####Example:
988 *
989 * Model.aggregate(..).then(successCallback, errorCallback);
990 *
991 * @see Promise #promise_Promise
992 * @param {Function} [resolve] successCallback
993 * @param {Function} [reject] errorCallback
994 * @return {Promise}
995 */
Aggregate.prototype.then = function(resolve, reject) {
  // Every call runs `exec()` again, so calling `then()` twice executes the
  // aggregation twice.
  const execution = this.exec();
  return execution.then(resolve, reject);
};
999
1000/**
1001 * Executes the query returning a `Promise` which will be
1002 * resolved with either the doc(s) or rejected with the error.
1003 * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
1004 *
1005 * @param {Function} [reject]
1006 * @return {Promise}
1007 * @api public
1008 */
1009
Aggregate.prototype.catch = function(reject) {
  // Runs `exec()` and attaches only a rejection handler; fulfilled results
  // pass through untouched.
  const execution = this.exec();
  return execution.then(null, reject);
};
1013
1014/**
1015 * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
 * It iterates over the documents produced by this aggregation, using an aggregation cursor.
1017 * You do not need to call this function explicitly, the JavaScript runtime
1018 * will call it for you.
1019 *
1020 * ####Example
1021 *
 * for await (const doc of Model.aggregate([{ $sort: { name: 1 } }])) {
1023 * console.log(doc.name);
1024 * }
1025 *
1026 * Node.js 10.x supports async iterators natively without any flags. You can
1027 * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
1028 *
 * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
 * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
 * support async iterators.
1032 *
1033 * @method Symbol.asyncIterator
1034 * @memberOf Aggregate
1035 * @instance
1036 * @api public
1037 */
1038
// Only defined on runtimes that expose `Symbol.asyncIterator`.
if (Symbol.asyncIterator != null) {
  Aggregate.prototype[Symbol.asyncIterator] = function() {
    // Switches this aggregate into cursor mode (overwriting any existing
    // `options.cursor`) and executes it to obtain the cursor.
    const aggCursor = this.cursor({ useMongooseAggCursor: true }).exec();

    // Map each document to the iterator-result shape: a null/undefined
    // document marks the end of iteration.
    const toIteratorResult = doc => {
      if (doc == null) {
        return { done: true };
      }
      return { value: doc, done: false };
    };

    return aggCursor.transformNull().map(toIteratorResult);
  };
}
1049
1050/*!
1051 * Helpers
1052 */
1053
1054/**
1055 * Checks whether an object is likely a pipeline operator
1056 *
1057 * @param {Object} obj object to check
1058 * @return {Boolean}
1059 * @api private
1060 */
1061
function isOperator(obj) {
  // `typeof null` is 'object', so null must be rejected explicitly; otherwise
  // `Object.keys(null)` throws instead of this check returning false.
  if (typeof obj !== 'object' || obj === null) {
    return false;
  }

  const keys = Object.keys(obj);

  // A stage has exactly one own enumerable key, and it starts with '$'.
  return keys.length === 1 && keys[0][0] === '$';
}
1071
1072/*!
1073 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
 * pipeline, if its model is a non-root discriminator type. This is
1075 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
1076 *
1077 * @param {Aggregate} aggregate Aggregate to prepare
1078 */
1079
// Also exposed as a static on the constructor. The function declaration below
// is hoisted, so it can be referenced here.
Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;

function prepareDiscriminatorPipeline(aggregate) {
  const schema = aggregate._model.schema;
  const mapping = schema && schema.discriminatorMapping;

  // Nothing to do for models without a discriminator mapping, or for the
  // root model of a discriminator hierarchy.
  if (!mapping || mapping.isRoot) {
    return;
  }

  const stages = aggregate._pipeline;
  const key = mapping.key;
  const value = mapping.value;
  const firstStage = stages[0];

  // If the first stage is a $match with no truthy value for the discriminator
  // key, fold the filter into it so the pipeline keeps its shape. The stage
  // object is mutated in place.
  if (firstStage && firstStage.$match && !firstStage.$match[key]) {
    firstStage.$match[key] = value;
    return;
  }

  // For a leading $geoNear, put the filter into its `query`, overwriting any
  // existing value for the key, instead of adding a stage in front of it.
  if (firstStage && firstStage.$geoNear) {
    firstStage.$geoNear.query = firstStage.$geoNear.query || {};
    firstStage.$geoNear.query[key] = value;
    return;
  }

  // Otherwise prepend a dedicated $match stage carrying the filter.
  const filter = {};
  filter[key] = value;
  stages.unshift({ $match: filter });
}
1109
1110/*!
1111 * Exports
1112 */
1113
1114module.exports = Aggregate;