UNPKG

33.1 kBJavaScriptView Raw
1'use strict';
2
3/*!
4 * Module dependencies
5 */
6
7const AggregationCursor = require('./cursor/AggregationCursor');
8const Query = require('./query');
9const applyGlobalMaxTimeMS = require('./helpers/query/applyGlobalMaxTimeMS');
10const promiseOrCallback = require('./helpers/promiseOrCallback');
11const util = require('util');
12const utils = require('./utils');
13const read = Query.prototype.read;
14const readConcern = Query.prototype.readConcern;
15
16/**
17 * Aggregate constructor used for building aggregation pipelines. Do not
18 * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
19 *
20 * ####Example:
21 *
22 * const aggregate = Model.aggregate([
23 * { $project: { a: 1, b: 1 } },
24 * { $skip: 5 }
25 * ]);
26 *
27 * Model.
28 * aggregate([{ $match: { age: { $gte: 21 }}}]).
29 * unwind('tags').
30 * exec(callback);
31 *
32 * ####Note:
33 *
34 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
35 * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
36 *
37 * ```javascript
38 * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
39 * // Do this instead to cast to an ObjectId
40 * new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
41 * ```
42 *
43 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
44 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
45 * @param {Array} [pipeline] aggregation pipeline as an array of objects
46 * @api public
47 */
48
49function Aggregate(pipeline) {
50 this._pipeline = [];
51 this._model = undefined;
52 this.options = {};
53
54 if (arguments.length === 1 && util.isArray(pipeline)) {
55 this.append.apply(this, pipeline);
56 }
57}
58
59/**
60 * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
61 * Supported options are:
62 *
63 * - `readPreference`
64 * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
65 * - [`explain`](./api.html#aggregate_Aggregate-explain)
66 * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
67 * - `maxTimeMS`
68 * - `bypassDocumentValidation`
69 * - `raw`
70 * - `promoteLongs`
71 * - `promoteValues`
72 * - `promoteBuffers`
73 * - [`collation`](./api.html#aggregate_Aggregate-collation)
74 * - `comment`
75 * - [`session`](./api.html#aggregate_Aggregate-session)
76 *
77 * @property options
78 * @memberOf Aggregate
79 * @api public
80 */
81
82Aggregate.prototype.options;
83
84/**
85 * Get/set the model that this aggregation will execute on.
86 *
87 * ####Example:
88 * const aggregate = MyModel.aggregate([{ $match: { answer: 42 } }]);
89 * aggregate.model() === MyModel; // true
90 *
91 * // Change the model. There's rarely any reason to do this.
92 * aggregate.model(SomeOtherModel);
93 * aggregate.model() === SomeOtherModel; // true
94 *
95 * @param {Model} [model] the model to which the aggregate is to be bound
96 * @return {Aggregate|Model} if model is passed, will return `this`, otherwise will return the model
97 * @api public
98 */
99
100Aggregate.prototype.model = function(model) {
101 if (arguments.length === 0) {
102 return this._model;
103 }
104
105 this._model = model;
106 if (model.schema != null) {
107 if (this.options.readPreference == null &&
108 model.schema.options.read != null) {
109 this.options.readPreference = model.schema.options.read;
110 }
111 if (this.options.collation == null &&
112 model.schema.options.collation != null) {
113 this.options.collation = model.schema.options.collation;
114 }
115 }
116 return this;
117};
118
119/**
120 * Appends new operators to this aggregate pipeline
121 *
122 * ####Examples:
123 *
124 * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
125 *
126 * // or pass an array
127 * var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
128 * aggregate.append(pipeline);
129 *
130 * @param {Object} ops operator(s) to append
131 * @return {Aggregate}
132 * @api public
133 */
134
135Aggregate.prototype.append = function() {
136 const args = (arguments.length === 1 && util.isArray(arguments[0]))
137 ? arguments[0]
138 : utils.args(arguments);
139
140 if (!args.every(isOperator)) {
141 throw new Error('Arguments must be aggregate pipeline operators');
142 }
143
144 this._pipeline = this._pipeline.concat(args);
145
146 return this;
147};
148
149/**
150 * Appends a new $addFields operator to this aggregate pipeline.
151 * Requires MongoDB v3.4+ to work
152 *
153 * ####Examples:
154 *
155 * // adding new fields based on existing fields
156 * aggregate.addFields({
157 * newField: '$b.nested'
158 * , plusTen: { $add: ['$val', 10]}
159 * , sub: {
160 * name: '$a'
161 * }
162 * })
163 *
164 * // etc
165 * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
166 *
167 * @param {Object} arg field specification
168 * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
169 * @return {Aggregate}
170 * @api public
171 */
172Aggregate.prototype.addFields = function(arg) {
173 const fields = {};
174 if (typeof arg === 'object' && !util.isArray(arg)) {
175 Object.keys(arg).forEach(function(field) {
176 fields[field] = arg[field];
177 });
178 } else {
179 throw new Error('Invalid addFields() argument. Must be an object');
180 }
181 return this.append({$addFields: fields});
182};
183
184/**
185 * Appends a new $project operator to this aggregate pipeline.
186 *
187 * Mongoose query [selection syntax](#query_Query-select) is also supported.
188 *
189 * ####Examples:
190 *
191 * // include a, include b, exclude _id
192 * aggregate.project("a b -_id");
193 *
194 * // or you may use object notation, useful when
195 * // you have keys already prefixed with a "-"
196 * aggregate.project({a: 1, b: 1, _id: 0});
197 *
198 * // reshaping documents
199 * aggregate.project({
200 * newField: '$b.nested'
201 * , plusTen: { $add: ['$val', 10]}
202 * , sub: {
203 * name: '$a'
204 * }
205 * })
206 *
207 * // etc
208 * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
209 *
210 * @param {Object|String} arg field specification
211 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
212 * @return {Aggregate}
213 * @api public
214 */
215
216Aggregate.prototype.project = function(arg) {
217 const fields = {};
218
219 if (typeof arg === 'object' && !util.isArray(arg)) {
220 Object.keys(arg).forEach(function(field) {
221 fields[field] = arg[field];
222 });
223 } else if (arguments.length === 1 && typeof arg === 'string') {
224 arg.split(/\s+/).forEach(function(field) {
225 if (!field) {
226 return;
227 }
228 const include = field[0] === '-' ? 0 : 1;
229 if (include === 0) {
230 field = field.substring(1);
231 }
232 fields[field] = include;
233 });
234 } else {
235 throw new Error('Invalid project() argument. Must be string or object');
236 }
237
238 return this.append({$project: fields});
239};
240
241/**
242 * Appends a new custom $group operator to this aggregate pipeline.
243 *
244 * ####Examples:
245 *
246 * aggregate.group({ _id: "$department" });
247 *
248 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
249 * @method group
250 * @memberOf Aggregate
251 * @instance
252 * @param {Object} arg $group operator contents
253 * @return {Aggregate}
254 * @api public
255 */
256
257/**
258 * Appends a new custom $match operator to this aggregate pipeline.
259 *
260 * ####Examples:
261 *
262 * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
263 *
264 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
265 * @method match
266 * @memberOf Aggregate
267 * @instance
268 * @param {Object} arg $match operator contents
269 * @return {Aggregate}
270 * @api public
271 */
272
273/**
274 * Appends a new $skip operator to this aggregate pipeline.
275 *
276 * ####Examples:
277 *
278 * aggregate.skip(10);
279 *
280 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
281 * @method skip
282 * @memberOf Aggregate
283 * @instance
284 * @param {Number} num number of records to skip before next stage
285 * @return {Aggregate}
286 * @api public
287 */
288
289/**
290 * Appends a new $limit operator to this aggregate pipeline.
291 *
292 * ####Examples:
293 *
294 * aggregate.limit(10);
295 *
296 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
297 * @method limit
298 * @memberOf Aggregate
299 * @instance
300 * @param {Number} num maximum number of records to pass to the next stage
301 * @return {Aggregate}
302 * @api public
303 */
304
305/**
306 * Appends a new $geoNear operator to this aggregate pipeline.
307 *
308 * ####NOTE:
309 *
310 * **MUST** be used as the first operator in the pipeline.
311 *
312 * ####Examples:
313 *
314 * aggregate.near({
315 * near: [40.724, -73.997],
316 * distanceField: "dist.calculated", // required
317 * maxDistance: 0.008,
318 * query: { type: "public" },
319 * includeLocs: "dist.location",
320 * uniqueDocs: true,
321 * num: 5
322 * });
323 *
324 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
325 * @method near
326 * @memberOf Aggregate
327 * @instance
328 * @param {Object} arg
329 * @return {Aggregate}
330 * @api public
331 */
332
333Aggregate.prototype.near = function(arg) {
334 const op = {};
335 op.$geoNear = arg;
336 return this.append(op);
337};
338
339/*!
340 * define methods
341 */
342
343'group match skip limit out'.split(' ').forEach(function($operator) {
344 Aggregate.prototype[$operator] = function(arg) {
345 const op = {};
346 op['$' + $operator] = arg;
347 return this.append(op);
348 };
349});
350
351/**
352 * Appends new custom $unwind operator(s) to this aggregate pipeline.
353 *
354 * Note that the `$unwind` operator requires the path name to start with '$'.
355 * Mongoose will prepend '$' if the specified field doesn't start '$'.
356 *
357 * ####Examples:
358 *
359 * aggregate.unwind("tags");
360 * aggregate.unwind("a", "b", "c");
361 * aggregate.unwind({ path: '$tags', preserveNullAndEmptyArrays: true });
362 *
363 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
364 * @param {String|Object} fields the field(s) to unwind, either as field names or as [objects with options](https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/#document-operand-with-options). If passing a string, prefixing the field name with '$' is optional. If passing an object, `path` must start with '$'.
365 * @return {Aggregate}
366 * @api public
367 */
368
369Aggregate.prototype.unwind = function() {
370 const args = utils.args(arguments);
371
372 const res = [];
373 for (let i = 0; i < args.length; ++i) {
374 const arg = args[i];
375 if (arg && typeof arg === 'object') {
376 res.push({ $unwind: arg });
377 } else if (typeof arg === 'string') {
378 res.push({
379 $unwind: (arg && arg.startsWith('$')) ? arg : '$' + arg
380 });
381 } else {
382 throw new Error('Invalid arg "' + arg + '" to unwind(), ' +
383 'must be string or object');
384 }
385 }
386
387 return this.append.apply(this, res);
388};
389
390/**
391 * Appends a new $replaceRoot operator to this aggregate pipeline.
392 *
393 * Note that the `$replaceRoot` operator requires field strings to start with '$'.
394 * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
395 * If you are passing in an object the strings in your expression will not be altered.
396 *
397 * ####Examples:
398 *
399 * aggregate.replaceRoot("user");
400 *
401 * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
402 *
403 * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
404 * @param {String|Object} the field or document which will become the new root document
405 * @return {Aggregate}
406 * @api public
407 */
408
409Aggregate.prototype.replaceRoot = function(newRoot) {
410 let ret;
411
412 if (typeof newRoot === 'string') {
413 ret = newRoot.startsWith('$') ? newRoot : '$' + newRoot;
414 } else {
415 ret = newRoot;
416 }
417
418 return this.append({
419 $replaceRoot: {
420 newRoot: ret
421 }
422 });
423};
424
425/**
426 * Appends a new $count operator to this aggregate pipeline.
427 *
428 * ####Examples:
429 *
430 * aggregate.count("userCount");
431 *
432 * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
433 * @param {String} the name of the count field
434 * @return {Aggregate}
435 * @api public
436 */
437
438Aggregate.prototype.count = function(countName) {
439 return this.append({ $count: countName });
440};
441
442/**
443 * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
444 * or a pipeline object.
445 *
446 * Note that the `$sortByCount` operator requires the new root to start with '$'.
447 * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
448 *
449 * ####Examples:
450 *
451 * aggregate.sortByCount('users');
452 * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
453 *
454 * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
455 * @param {Object|String} arg
456 * @return {Aggregate} this
457 * @api public
458 */
459
460Aggregate.prototype.sortByCount = function(arg) {
461 if (arg && typeof arg === 'object') {
462 return this.append({ $sortByCount: arg });
463 } else if (typeof arg === 'string') {
464 return this.append({
465 $sortByCount: (arg && arg.startsWith('$')) ? arg : '$' + arg
466 });
467 } else {
468 throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
469 'must be string or object');
470 }
471};
472
473/**
474 * Appends new custom $lookup operator(s) to this aggregate pipeline.
475 *
476 * ####Examples:
477 *
478 * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
479 *
480 * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
481 * @param {Object} options to $lookup as described in the above link
482 * @return {Aggregate}
483 * @api public
484 */
485
486Aggregate.prototype.lookup = function(options) {
487 return this.append({$lookup: options});
488};
489
490/**
491 * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
492 *
493 * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
494 *
495 * #### Examples:
496 * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
497 * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
498 *
499 * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
500 * @param {Object} options to $graphLookup as described in the above link
501 * @return {Aggregate}
502 * @api public
503 */
504
505Aggregate.prototype.graphLookup = function(options) {
506 const cloneOptions = {};
507 if (options) {
508 if (!utils.isObject(options)) {
509 throw new TypeError('Invalid graphLookup() argument. Must be an object.');
510 }
511
512 utils.mergeClone(cloneOptions, options);
513 const startWith = cloneOptions.startWith;
514
515 if (startWith && typeof startWith === 'string') {
516 cloneOptions.startWith = cloneOptions.startWith.startsWith('$') ?
517 cloneOptions.startWith :
518 '$' + cloneOptions.startWith;
519 }
520
521 }
522 return this.append({ $graphLookup: cloneOptions });
523};
524
525/**
526 * Appends new custom $sample operator(s) to this aggregate pipeline.
527 *
528 * ####Examples:
529 *
530 * aggregate.sample(3); // Add a pipeline that picks 3 random documents
531 *
532 * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
533 * @param {Number} size number of random documents to pick
534 * @return {Aggregate}
535 * @api public
536 */
537
538Aggregate.prototype.sample = function(size) {
539 return this.append({$sample: {size: size}});
540};
541
542/**
543 * Appends a new $sort operator to this aggregate pipeline.
544 *
545 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
546 *
547 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
548 *
549 * ####Examples:
550 *
551 * // these are equivalent
552 * aggregate.sort({ field: 'asc', test: -1 });
553 * aggregate.sort('field -test');
554 *
555 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
556 * @param {Object|String} arg
557 * @return {Aggregate} this
558 * @api public
559 */
560
561Aggregate.prototype.sort = function(arg) {
562 // TODO refactor to reuse the query builder logic
563
564 const sort = {};
565
566 if (arg.constructor.name === 'Object') {
567 const desc = ['desc', 'descending', -1];
568 Object.keys(arg).forEach(function(field) {
569 // If sorting by text score, skip coercing into 1/-1
570 if (arg[field] instanceof Object && arg[field].$meta) {
571 sort[field] = arg[field];
572 return;
573 }
574 sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
575 });
576 } else if (arguments.length === 1 && typeof arg === 'string') {
577 arg.split(/\s+/).forEach(function(field) {
578 if (!field) {
579 return;
580 }
581 const ascend = field[0] === '-' ? -1 : 1;
582 if (ascend === -1) {
583 field = field.substring(1);
584 }
585 sort[field] = ascend;
586 });
587 } else {
588 throw new TypeError('Invalid sort() argument. Must be a string or object.');
589 }
590
591 return this.append({$sort: sort});
592};
593
594/**
595 * Sets the readPreference option for the aggregation query.
596 *
597 * ####Example:
598 *
599 * Model.aggregate(..).read('primaryPreferred').exec(callback)
600 *
601 * @param {String} pref one of the listed preference options or their aliases
602 * @param {Array} [tags] optional tags for this query
603 * @return {Aggregate} this
604 * @api public
605 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
606 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
607 */
608
609Aggregate.prototype.read = function(pref, tags) {
610 if (!this.options) {
611 this.options = {};
612 }
613 read.call(this, pref, tags);
614 return this;
615};
616
617/**
618 * Sets the readConcern level for the aggregation query.
619 *
620 * ####Example:
621 *
622 * Model.aggregate(..).readConcern('majority').exec(callback)
623 *
624 * @param {String} level one of the listed read concern level or their aliases
625 * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
626 * @return {Aggregate} this
627 * @api public
628 */
629
630Aggregate.prototype.readConcern = function(level) {
631 if (!this.options) {
632 this.options = {};
633 }
634 readConcern.call(this, level);
635 return this;
636};
637
638/**
639 * Appends a new $redact operator to this aggregate pipeline.
640 *
641 * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
642 * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
643 *
644 * ####Example:
645 *
646 * Model.aggregate(...)
647 * .redact({
648 * $cond: {
649 * if: { $eq: [ '$level', 5 ] },
650 * then: '$$PRUNE',
651 * else: '$$DESCEND'
652 * }
653 * })
654 * .exec();
655 *
656 * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
657 * Model.aggregate(...)
658 * .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
659 * .exec();
660 *
661 * @param {Object} expression redact options or conditional expression
662 * @param {String|Object} [thenExpr] true case for the condition
663 * @param {String|Object} [elseExpr] false case for the condition
664 * @return {Aggregate} this
665 * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
666 * @api public
667 */
668
669Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
670 if (arguments.length === 3) {
671 if ((typeof thenExpr === 'string' && !thenExpr.startsWith('$$')) ||
672 (typeof elseExpr === 'string' && !elseExpr.startsWith('$$'))) {
673 throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
674 }
675
676 expression = {
677 $cond: {
678 if: expression,
679 then: thenExpr,
680 else: elseExpr
681 }
682 };
683 } else if (arguments.length !== 1) {
684 throw new TypeError('Invalid arguments');
685 }
686
687 return this.append({$redact: expression});
688};
689
690/**
691 * Execute the aggregation with explain
692 *
693 * ####Example:
694 *
695 * Model.aggregate(..).explain(callback)
696 *
697 * @param {Function} callback
698 * @return {Promise}
699 */
700
701Aggregate.prototype.explain = function(callback) {
702 const model = this._model;
703
704 return promiseOrCallback(callback, cb => {
705 if (!this._pipeline.length) {
706 const err = new Error('Aggregate has empty pipeline');
707 return cb(err);
708 }
709
710 prepareDiscriminatorPipeline(this);
711
712 model.hooks.execPre('aggregate', this, error => {
713 if (error) {
714 const _opts = { error: error };
715 return model.hooks.execPost('aggregate', this, [null], _opts, error => {
716 cb(error);
717 });
718 }
719
720 this.options.explain = true;
721
722 model.collection.
723 aggregate(this._pipeline, this.options || {}).
724 explain((error, result) => {
725 const _opts = { error: error };
726 return model.hooks.execPost('aggregate', this, [result], _opts, error => {
727 if (error) {
728 return cb(error);
729 }
730 return cb(null, result);
731 });
732 });
733 });
734 }, model.events);
735};
736
737/**
738 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
739 *
740 * ####Example:
741 *
742 * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
743 *
744 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
745 * @param {Array} [tags] optional tags for this query
746 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
747 */
748
749Aggregate.prototype.allowDiskUse = function(value) {
750 this.options.allowDiskUse = value;
751 return this;
752};
753
754/**
755 * Sets the hint option for the aggregation query (ignored for < 3.6.0)
756 *
757 * ####Example:
758 *
759 * Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
760 *
761 * @param {Object|String} value a hint object or the index name
762 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
763 */
764
765Aggregate.prototype.hint = function(value) {
766 this.options.hint = value;
767 return this;
768};
769
770/**
771 * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
772 *
773 * ####Example:
774 *
775 * const session = await Model.startSession();
776 * await Model.aggregate(..).session(session);
777 *
778 * @param {ClientSession} session
779 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
780 */
781
782Aggregate.prototype.session = function(session) {
783 if (session == null) {
784 delete this.options.session;
785 } else {
786 this.options.session = session;
787 }
788 return this;
789};
790
791/**
792 * Lets you set arbitrary options, for middleware or plugins.
793 *
794 * ####Example:
795 *
796 * var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
797 * agg.options; // `{ allowDiskUse: true }`
798 *
799 * @param {Object} options keys to merge into current options
800 * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
801 * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
802 * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
803 * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
804 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
805 * @return {Aggregate} this
806 * @api public
807 */
808
809Aggregate.prototype.option = function(value) {
810 for (const key in value) {
811 this.options[key] = value[key];
812 }
813 return this;
814};
815
816/**
817 * Sets the cursor option option for the aggregation query (ignored for < 2.6.0).
818 * Note the different syntax below: .exec() returns a cursor object, and no callback
819 * is necessary.
820 *
821 * ####Example:
822 *
823 * var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
824 * cursor.eachAsync(function(error, doc) {
825 * // use doc
826 * });
827 *
828 * @param {Object} options
829 * @param {Number} options.batchSize set the cursor batch size
830 * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
831 * @return {Aggregate} this
832 * @api public
833 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
834 */
835
836Aggregate.prototype.cursor = function(options) {
837 if (!this.options) {
838 this.options = {};
839 }
840 this.options.cursor = options || {};
841 return this;
842};
843
844/**
845 * Sets an option on this aggregation. This function will be deprecated in a
846 * future release. Use the [`cursor()`](./api.html#aggregate_Aggregate-cursor),
847 * [`collation()`](./api.html#aggregate_Aggregate-collation), etc. helpers to
848 * set individual options, or access `agg.options` directly.
849 *
850 * Note that MongoDB aggregations [do **not** support the `noCursorTimeout` flag](https://jira.mongodb.org/browse/SERVER-6036),
851 * if you try setting that flag with this function you will get a "unrecognized field 'noCursorTimeout'" error.
852 *
853 * @param {String} flag
854 * @param {Boolean} value
855 * @return {Aggregate} this
856 * @api public
857 * @deprecated Use [`.option()`](api.html#aggregate_Aggregate-option) instead. Note that MongoDB aggregations do **not** support a `noCursorTimeout` option.
858 */
859
860Aggregate.prototype.addCursorFlag = util.deprecate(function(flag, value) {
861 if (!this.options) {
862 this.options = {};
863 }
864 this.options[flag] = value;
865 return this;
866}, 'Mongoose: `Aggregate#addCursorFlag()` is deprecated, use `option()` instead');
867
868/**
869 * Adds a collation
870 *
871 * ####Example:
872 *
873 * Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
874 *
875 * @param {Object} collation options
876 * @return {Aggregate} this
877 * @api public
878 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
879 */
880
881Aggregate.prototype.collation = function(collation) {
882 if (!this.options) {
883 this.options = {};
884 }
885 this.options.collation = collation;
886 return this;
887};
888
889/**
890 * Combines multiple aggregation pipelines.
891 *
892 * ####Example:
893 *
894 * Model.aggregate(...)
895 * .facet({
896 * books: [{ groupBy: '$author' }],
897 * price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
898 * })
899 * .exec();
900 *
901 * // Output: { books: [...], price: [{...}, {...}] }
902 *
903 * @param {Object} facet options
904 * @return {Aggregate} this
905 * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
906 * @api public
907 */
908
909Aggregate.prototype.facet = function(options) {
910 return this.append({$facet: options});
911};
912
913/**
914 * Returns the current pipeline
915 *
916 * ####Example:
917 *
918 * MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
919 *
920 * @return {Array}
921 * @api public
922 */
923
924
925Aggregate.prototype.pipeline = function() {
926 return this._pipeline;
927};
928
929/**
930 * Executes the aggregate pipeline on the currently bound Model.
931 *
932 * ####Example:
933 *
934 * aggregate.exec(callback);
935 *
936 * // Because a promise is returned, the `callback` is optional.
937 * var promise = aggregate.exec();
938 * promise.then(..);
939 *
940 * @see Promise #promise_Promise
941 * @param {Function} [callback]
942 * @return {Promise}
943 * @api public
944 */
945
946Aggregate.prototype.exec = function(callback) {
947 if (!this._model) {
948 throw new Error('Aggregate not bound to any Model');
949 }
950 const model = this._model;
951 const collection = this._model.collection;
952
953 applyGlobalMaxTimeMS(this.options, model);
954
955 if (this.options && this.options.cursor) {
956 return new AggregationCursor(this);
957 }
958
959 return promiseOrCallback(callback, cb => {
960
961 prepareDiscriminatorPipeline(this);
962
963 model.hooks.execPre('aggregate', this, error => {
964 if (error) {
965 const _opts = { error: error };
966 return model.hooks.execPost('aggregate', this, [null], _opts, error => {
967 cb(error);
968 });
969 }
970 if (!this._pipeline.length) {
971 return cb(new Error('Aggregate has empty pipeline'));
972 }
973
974 const options = utils.clone(this.options || {});
975 collection.aggregate(this._pipeline, options, (error, cursor) => {
976 if (error) {
977 const _opts = { error: error };
978 return model.hooks.execPost('aggregate', this, [null], _opts, error => {
979 if (error) {
980 return cb(error);
981 }
982 return cb(null);
983 });
984 }
985 cursor.toArray((error, result) => {
986 const _opts = { error: error };
987 model.hooks.execPost('aggregate', this, [result], _opts, (error, result) => {
988 if (error) {
989 return cb(error);
990 }
991
992 cb(null, result);
993 });
994 });
995 });
996 });
997 }, model.events);
998};
999
1000/**
1001 * Provides promise for aggregate.
1002 *
1003 * ####Example:
1004 *
1005 * Model.aggregate(..).then(successCallback, errorCallback);
1006 *
1007 * @see Promise #promise_Promise
1008 * @param {Function} [resolve] successCallback
1009 * @param {Function} [reject] errorCallback
1010 * @return {Promise}
1011 */
1012Aggregate.prototype.then = function(resolve, reject) {
1013 return this.exec().then(resolve, reject);
1014};
1015
1016/**
1017 * Executes the query returning a `Promise` which will be
1018 * resolved with either the doc(s) or rejected with the error.
1019 * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
1020 *
1021 * @param {Function} [reject]
1022 * @return {Promise}
1023 * @api public
1024 */
1025
1026Aggregate.prototype.catch = function(reject) {
1027 return this.exec().then(null, reject);
1028};
1029
1030/**
1031 * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
1032 * You do not need to call this function explicitly, the JavaScript runtime
1033 * will call it for you.
1034 *
1035 * ####Example
1036 *
1037 * const agg = Model.aggregate([{ $match: { age: { $gte: 25 } } }]);
1038 * for await (const doc of agg) {
1039 * console.log(doc.name);
1040 * }
1041 *
1042 * Node.js 10.x supports async iterators natively without any flags. You can
1043 * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
1044 *
1045 * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
1046 * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
1047 * support async iterators.
1048 *
1049 * @method Symbol.asyncIterator
1050 * @memberOf Aggregate
1051 * @instance
1052 * @api public
1053 */
1054
1055if (Symbol.asyncIterator != null) {
1056 Aggregate.prototype[Symbol.asyncIterator] = function() {
1057 return this.cursor({ useMongooseAggCursor: true }).
1058 exec().
1059 transformNull().
1060 map(doc => {
1061 return doc == null ? { done: true } : { value: doc, done: false };
1062 });
1063 };
1064}
1065
1066/*!
1067 * Helpers
1068 */
1069
1070/**
1071 * Checks whether an object is likely a pipeline operator
1072 *
1073 * @param {Object} obj object to check
1074 * @return {Boolean}
1075 * @api private
1076 */
1077
1078function isOperator(obj) {
1079 if (typeof obj !== 'object') {
1080 return false;
1081 }
1082
1083 const k = Object.keys(obj);
1084
1085 return k.length === 1 && k.some(key => { return key[0] === '$'; });
1086}
1087
1088/*!
1089 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
1090 * pipeline, should it's model is a non-root discriminator type. This is
1091 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
1092 *
1093 * @param {Aggregate} aggregate Aggregate to prepare
1094 */
1095
1096Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;
1097
1098function prepareDiscriminatorPipeline(aggregate) {
1099 const schema = aggregate._model.schema;
1100 const discriminatorMapping = schema && schema.discriminatorMapping;
1101
1102 if (discriminatorMapping && !discriminatorMapping.isRoot) {
1103 const originalPipeline = aggregate._pipeline;
1104 const discriminatorKey = discriminatorMapping.key;
1105 const discriminatorValue = discriminatorMapping.value;
1106
1107 // If the first pipeline stage is a match and it doesn't specify a `__t`
1108 // key, add the discriminator key to it. This allows for potential
1109 // aggregation query optimizations not to be disturbed by this feature.
1110 if (originalPipeline[0] && originalPipeline[0].$match && !originalPipeline[0].$match[discriminatorKey]) {
1111 originalPipeline[0].$match[discriminatorKey] = discriminatorValue;
1112 // `originalPipeline` is a ref, so there's no need for
1113 // aggregate._pipeline = originalPipeline
1114 } else if (originalPipeline[0] && originalPipeline[0].$geoNear) {
1115 originalPipeline[0].$geoNear.query =
1116 originalPipeline[0].$geoNear.query || {};
1117 originalPipeline[0].$geoNear.query[discriminatorKey] = discriminatorValue;
1118 } else {
1119 const match = {};
1120 match[discriminatorKey] = discriminatorValue;
1121 aggregate._pipeline.unshift({ $match: match });
1122 }
1123 }
1124}
1125
1126/*!
1127 * Exports
1128 */
1129
1130module.exports = Aggregate;