UNPKG

31.7 kBJavaScriptView Raw
1'use strict';
2
3/*!
4 * Module dependencies
5 */
6
7const AggregationCursor = require('./cursor/AggregationCursor');
8const Query = require('./query');
9const util = require('util');
10const utils = require('./utils');
11const read = Query.prototype.read;
12const readConcern = Query.prototype.readConcern;
13
14/**
15 * Aggregate constructor used for building aggregation pipelines. Do not
16 * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
17 *
18 * ####Example:
19 *
20 * const aggregate = Model.aggregate([
21 * { $project: { a: 1, b: 1 } },
22 * { $skip: 5 }
23 * ]);
24 *
25 * Model.
26 * aggregate([{ $match: { age: { $gte: 21 }}}]).
27 * unwind('tags').
28 * exec(callback);
29 *
30 * ####Note:
31 *
32 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
33 * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
34 *
35 * ```javascript
36 * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
37 * // Do this instead to cast to an ObjectId
38 * new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
39 * ```
40 *
41 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
42 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
43 * @param {Array} [pipeline] aggregation pipeline as an array of objects
44 * @api public
45 */
46
47function Aggregate(pipeline) {
48 this._pipeline = [];
49 this._model = undefined;
50 this.options = {};
51
52 if (arguments.length === 1 && util.isArray(pipeline)) {
53 this.append.apply(this, pipeline);
54 }
55}
56
57/**
58 * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
59 * Supported options are:
60 *
61 * - `readPreference`
62 * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
63 * - [`explain`](./api.html#aggregate_Aggregate-explain)
64 * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
65 * - `maxTimeMS`
66 * - `bypassDocumentValidation`
67 * - `raw`
68 * - `promoteLongs`
69 * - `promoteValues`
70 * - `promoteBuffers`
71 * - [`collation`](./api.html#aggregate_Aggregate-collation)
72 * - `comment`
73 * - [`session`](./api.html#aggregate_Aggregate-session)
74 *
75 * @property options
76 * @memberOf Aggregate
77 * @api public
78 */
79
80Aggregate.prototype.options;
81
82/**
83 * Binds this aggregate to a model.
84 *
85 * @param {Model} model the model to which the aggregate is to be bound
86 * @return {Aggregate}
87 * @api public
88 */
89
90Aggregate.prototype.model = function(model) {
91 this._model = model;
92 if (model.schema != null) {
93 if (this.options.readPreference == null &&
94 model.schema.options.read != null) {
95 this.options.readPreference = model.schema.options.read;
96 }
97 if (this.options.collation == null &&
98 model.schema.options.collation != null) {
99 this.options.collation = model.schema.options.collation;
100 }
101 }
102 return this;
103};
104
105/**
106 * Appends new operators to this aggregate pipeline
107 *
108 * ####Examples:
109 *
110 * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
111 *
112 * // or pass an array
113 * var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
114 * aggregate.append(pipeline);
115 *
116 * @param {Object} ops operator(s) to append
117 * @return {Aggregate}
118 * @api public
119 */
120
121Aggregate.prototype.append = function() {
122 const args = (arguments.length === 1 && util.isArray(arguments[0]))
123 ? arguments[0]
124 : utils.args(arguments);
125
126 if (!args.every(isOperator)) {
127 throw new Error('Arguments must be aggregate pipeline operators');
128 }
129
130 this._pipeline = this._pipeline.concat(args);
131
132 return this;
133};
134
135/**
136 * Appends a new $addFields operator to this aggregate pipeline.
137 * Requires MongoDB v3.4+ to work
138 *
139 * ####Examples:
140 *
141 * // adding new fields based on existing fields
142 * aggregate.addFields({
143 * newField: '$b.nested'
144 * , plusTen: { $add: ['$val', 10]}
145 * , sub: {
146 * name: '$a'
147 * }
148 * })
149 *
150 * // etc
151 * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
152 *
153 * @param {Object} arg field specification
154 * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
155 * @return {Aggregate}
156 * @api public
157 */
158Aggregate.prototype.addFields = function(arg) {
159 const fields = {};
160 if (typeof arg === 'object' && !util.isArray(arg)) {
161 Object.keys(arg).forEach(function(field) {
162 fields[field] = arg[field];
163 });
164 } else {
165 throw new Error('Invalid addFields() argument. Must be an object');
166 }
167 return this.append({$addFields: fields});
168};
169
170/**
171 * Appends a new $project operator to this aggregate pipeline.
172 *
173 * Mongoose query [selection syntax](#query_Query-select) is also supported.
174 *
175 * ####Examples:
176 *
177 * // include a, include b, exclude _id
178 * aggregate.project("a b -_id");
179 *
180 * // or you may use object notation, useful when
181 * // you have keys already prefixed with a "-"
182 * aggregate.project({a: 1, b: 1, _id: 0});
183 *
184 * // reshaping documents
185 * aggregate.project({
186 * newField: '$b.nested'
187 * , plusTen: { $add: ['$val', 10]}
188 * , sub: {
189 * name: '$a'
190 * }
191 * })
192 *
193 * // etc
194 * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
195 *
196 * @param {Object|String} arg field specification
197 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
198 * @return {Aggregate}
199 * @api public
200 */
201
202Aggregate.prototype.project = function(arg) {
203 const fields = {};
204
205 if (typeof arg === 'object' && !util.isArray(arg)) {
206 Object.keys(arg).forEach(function(field) {
207 fields[field] = arg[field];
208 });
209 } else if (arguments.length === 1 && typeof arg === 'string') {
210 arg.split(/\s+/).forEach(function(field) {
211 if (!field) {
212 return;
213 }
214 const include = field[0] === '-' ? 0 : 1;
215 if (include === 0) {
216 field = field.substring(1);
217 }
218 fields[field] = include;
219 });
220 } else {
221 throw new Error('Invalid project() argument. Must be string or object');
222 }
223
224 return this.append({$project: fields});
225};
226
227/**
228 * Appends a new custom $group operator to this aggregate pipeline.
229 *
230 * ####Examples:
231 *
232 * aggregate.group({ _id: "$department" });
233 *
234 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
235 * @method group
236 * @memberOf Aggregate
237 * @instance
238 * @param {Object} arg $group operator contents
239 * @return {Aggregate}
240 * @api public
241 */
242
243/**
244 * Appends a new custom $match operator to this aggregate pipeline.
245 *
246 * ####Examples:
247 *
248 * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
249 *
250 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
251 * @method match
252 * @memberOf Aggregate
253 * @instance
254 * @param {Object} arg $match operator contents
255 * @return {Aggregate}
256 * @api public
257 */
258
259/**
260 * Appends a new $skip operator to this aggregate pipeline.
261 *
262 * ####Examples:
263 *
264 * aggregate.skip(10);
265 *
266 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
267 * @method skip
268 * @memberOf Aggregate
269 * @instance
270 * @param {Number} num number of records to skip before next stage
271 * @return {Aggregate}
272 * @api public
273 */
274
275/**
276 * Appends a new $limit operator to this aggregate pipeline.
277 *
278 * ####Examples:
279 *
280 * aggregate.limit(10);
281 *
282 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
283 * @method limit
284 * @memberOf Aggregate
285 * @instance
286 * @param {Number} num maximum number of records to pass to the next stage
287 * @return {Aggregate}
288 * @api public
289 */
290
291/**
292 * Appends a new $geoNear operator to this aggregate pipeline.
293 *
294 * ####NOTE:
295 *
296 * **MUST** be used as the first operator in the pipeline.
297 *
298 * ####Examples:
299 *
300 * aggregate.near({
301 * near: [40.724, -73.997],
302 * distanceField: "dist.calculated", // required
303 * maxDistance: 0.008,
304 * query: { type: "public" },
305 * includeLocs: "dist.location",
306 * uniqueDocs: true,
307 * num: 5
308 * });
309 *
310 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
311 * @method near
312 * @memberOf Aggregate
313 * @instance
314 * @param {Object} arg
315 * @return {Aggregate}
316 * @api public
317 */
318
319Aggregate.prototype.near = function(arg) {
320 const op = {};
321 op.$geoNear = arg;
322 return this.append(op);
323};
324
325/*!
326 * define methods
327 */
328
329'group match skip limit out'.split(' ').forEach(function($operator) {
330 Aggregate.prototype[$operator] = function(arg) {
331 const op = {};
332 op['$' + $operator] = arg;
333 return this.append(op);
334 };
335});
336
337/**
338 * Appends new custom $unwind operator(s) to this aggregate pipeline.
339 *
340 * Note that the `$unwind` operator requires the path name to start with '$'.
341 * Mongoose will prepend '$' if the specified field doesn't start '$'.
342 *
343 * ####Examples:
344 *
345 * aggregate.unwind("tags");
346 * aggregate.unwind("a", "b", "c");
347 *
348 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
349 * @param {String} fields the field(s) to unwind
350 * @return {Aggregate}
351 * @api public
352 */
353
354Aggregate.prototype.unwind = function() {
355 const args = utils.args(arguments);
356
357 const res = [];
358 for (let i = 0; i < args.length; ++i) {
359 const arg = args[i];
360 if (arg && typeof arg === 'object') {
361 res.push({ $unwind: arg });
362 } else if (typeof arg === 'string') {
363 res.push({
364 $unwind: (arg && arg.charAt(0) === '$') ? arg : '$' + arg
365 });
366 } else {
367 throw new Error('Invalid arg "' + arg + '" to unwind(), ' +
368 'must be string or object');
369 }
370 }
371
372 return this.append.apply(this, res);
373};
374
375/**
376 * Appends a new $replaceRoot operator to this aggregate pipeline.
377 *
378 * Note that the `$replaceRoot` operator requires field strings to start with '$'.
379 * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
380 * If you are passing in an object the strings in your expression will not be altered.
381 *
382 * ####Examples:
383 *
384 * aggregate.replaceRoot("user");
385 *
386 * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
387 *
388 * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
389 * @param {String|Object} the field or document which will become the new root document
390 * @return {Aggregate}
391 * @api public
392 */
393
394Aggregate.prototype.replaceRoot = function(newRoot) {
395 let ret;
396
397 if (typeof newRoot === 'string') {
398 ret = newRoot.startsWith('$') ? newRoot : '$' + newRoot;
399 } else {
400 ret = newRoot;
401 }
402
403 return this.append({
404 $replaceRoot: {
405 newRoot: ret
406 }
407 });
408};
409
410/**
411 * Appends a new $count operator to this aggregate pipeline.
412 *
413 * ####Examples:
414 *
415 * aggregate.count("userCount");
416 *
417 * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
418 * @param {String} the name of the count field
419 * @return {Aggregate}
420 * @api public
421 */
422
423Aggregate.prototype.count = function(countName) {
424 return this.append({ $count: countName });
425};
426
427/**
428 * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
429 * or a pipeline object.
430 *
431 * Note that the `$sortByCount` operator requires the new root to start with '$'.
432 * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
433 *
434 * ####Examples:
435 *
436 * aggregate.sortByCount('users');
437 * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
438 *
439 * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
440 * @param {Object|String} arg
441 * @return {Aggregate} this
442 * @api public
443 */
444
445Aggregate.prototype.sortByCount = function(arg) {
446 if (arg && typeof arg === 'object') {
447 return this.append({ $sortByCount: arg });
448 } else if (typeof arg === 'string') {
449 return this.append({
450 $sortByCount: (arg && arg.charAt(0) === '$') ? arg : '$' + arg
451 });
452 } else {
453 throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
454 'must be string or object');
455 }
456};
457
458/**
459 * Appends new custom $lookup operator(s) to this aggregate pipeline.
460 *
461 * ####Examples:
462 *
463 * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
464 *
465 * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
466 * @param {Object} options to $lookup as described in the above link
467 * @return {Aggregate}
468 * @api public
469 */
470
471Aggregate.prototype.lookup = function(options) {
472 return this.append({$lookup: options});
473};
474
475/**
476 * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
477 *
478 * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
479 *
480 * #### Examples:
481 * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
482 * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
483 *
484 * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
485 * @param {Object} options to $graphLookup as described in the above link
486 * @return {Aggregate}
487 * @api public
488 */
489
490Aggregate.prototype.graphLookup = function(options) {
491 const cloneOptions = {};
492 if (options) {
493 if (!utils.isObject(options)) {
494 throw new TypeError('Invalid graphLookup() argument. Must be an object.');
495 }
496
497 utils.mergeClone(cloneOptions, options);
498 const startWith = cloneOptions.startWith;
499
500 if (startWith && typeof startWith === 'string') {
501 cloneOptions.startWith = cloneOptions.startWith.charAt(0) === '$' ?
502 cloneOptions.startWith :
503 '$' + cloneOptions.startWith;
504 }
505
506 }
507 return this.append({ $graphLookup: cloneOptions });
508};
509
510/**
511 * Appends new custom $sample operator(s) to this aggregate pipeline.
512 *
513 * ####Examples:
514 *
515 * aggregate.sample(3); // Add a pipeline that picks 3 random documents
516 *
517 * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
518 * @param {Number} size number of random documents to pick
519 * @return {Aggregate}
520 * @api public
521 */
522
523Aggregate.prototype.sample = function(size) {
524 return this.append({$sample: {size: size}});
525};
526
527/**
528 * Appends a new $sort operator to this aggregate pipeline.
529 *
530 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
531 *
532 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
533 *
534 * ####Examples:
535 *
536 * // these are equivalent
537 * aggregate.sort({ field: 'asc', test: -1 });
538 * aggregate.sort('field -test');
539 *
540 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
541 * @param {Object|String} arg
542 * @return {Aggregate} this
543 * @api public
544 */
545
546Aggregate.prototype.sort = function(arg) {
547 // TODO refactor to reuse the query builder logic
548
549 const sort = {};
550
551 if (arg.constructor.name === 'Object') {
552 const desc = ['desc', 'descending', -1];
553 Object.keys(arg).forEach(function(field) {
554 // If sorting by text score, skip coercing into 1/-1
555 if (arg[field] instanceof Object && arg[field].$meta) {
556 sort[field] = arg[field];
557 return;
558 }
559 sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
560 });
561 } else if (arguments.length === 1 && typeof arg === 'string') {
562 arg.split(/\s+/).forEach(function(field) {
563 if (!field) {
564 return;
565 }
566 const ascend = field[0] === '-' ? -1 : 1;
567 if (ascend === -1) {
568 field = field.substring(1);
569 }
570 sort[field] = ascend;
571 });
572 } else {
573 throw new TypeError('Invalid sort() argument. Must be a string or object.');
574 }
575
576 return this.append({$sort: sort});
577};
578
579/**
580 * Sets the readPreference option for the aggregation query.
581 *
582 * ####Example:
583 *
584 * Model.aggregate(..).read('primaryPreferred').exec(callback)
585 *
586 * @param {String} pref one of the listed preference options or their aliases
587 * @param {Array} [tags] optional tags for this query
588 * @return {Aggregate} this
589 * @api public
590 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
591 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
592 */
593
594Aggregate.prototype.read = function(pref, tags) {
595 if (!this.options) {
596 this.options = {};
597 }
598 read.call(this, pref, tags);
599 return this;
600};
601
602/**
603 * Sets the readConcern level for the aggregation query.
604 *
605 * ####Example:
606 *
607 * Model.aggregate(..).readConcern('majority').exec(callback)
608 *
609 * @param {String} level one of the listed read concern level or their aliases
610 * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
611 * @return {Aggregate} this
612 * @api public
613 */
614
615Aggregate.prototype.readConcern = function(level) {
616 if (!this.options) {
617 this.options = {};
618 }
619 readConcern.call(this, level);
620 return this;
621};
622
623/**
624 * Appends a new $redact operator to this aggregate pipeline.
625 *
626 * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
627 * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
628 *
629 * ####Example:
630 *
631 * Model.aggregate(...)
632 * .redact({
633 * $cond: {
634 * if: { $eq: [ '$level', 5 ] },
635 * then: '$$PRUNE',
636 * else: '$$DESCEND'
637 * }
638 * })
639 * .exec();
640 *
641 * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
642 * Model.aggregate(...)
643 * .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
644 * .exec();
645 *
646 * @param {Object} expression redact options or conditional expression
647 * @param {String|Object} [thenExpr] true case for the condition
648 * @param {String|Object} [elseExpr] false case for the condition
649 * @return {Aggregate} this
650 * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
651 * @api public
652 */
653
654Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
655 if (arguments.length === 3) {
656 if ((typeof thenExpr === 'string' && !thenExpr.startsWith('$$')) ||
657 (typeof elseExpr === 'string' && !elseExpr.startsWith('$$'))) {
658 throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
659 }
660
661 expression = {
662 $cond: {
663 if: expression,
664 then: thenExpr,
665 else: elseExpr
666 }
667 };
668 } else if (arguments.length !== 1) {
669 throw new TypeError('Invalid arguments');
670 }
671
672 return this.append({$redact: expression});
673};
674
675/**
676 * Execute the aggregation with explain
677 *
678 * ####Example:
679 *
680 * Model.aggregate(..).explain(callback)
681 *
682 * @param {Function} callback
683 * @return {Promise}
684 */
685
686Aggregate.prototype.explain = function(callback) {
687 return utils.promiseOrCallback(callback, cb => {
688 if (!this._pipeline.length) {
689 const err = new Error('Aggregate has empty pipeline');
690 return cb(err);
691 }
692
693 prepareDiscriminatorPipeline(this);
694
695 this._model.collection.
696 aggregate(this._pipeline, this.options || {}).
697 explain(function(error, result) {
698 if (error) {
699 return cb(error);
700 }
701 cb(null, result);
702 });
703 }, this._model.events);
704};
705
706/**
707 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
708 *
709 * ####Example:
710 *
711 * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
712 *
713 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
714 * @param {Array} [tags] optional tags for this query
715 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
716 */
717
718Aggregate.prototype.allowDiskUse = function(value) {
719 this.options.allowDiskUse = value;
720 return this;
721};
722
723/**
724 * Sets the hint option for the aggregation query (ignored for < 3.6.0)
725 *
726 * ####Example:
727 *
728 * Model.aggregate(..).hint({ qty: 1, category: 1 } }).exec(callback)
729 *
730 * @param {Object|String} value a hint object or the index name
731 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
732 */
733
734Aggregate.prototype.hint = function(value) {
735 this.options.hint = value;
736 return this;
737};
738
739/**
740 * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
741 *
742 * ####Example:
743 *
744 * const session = await Model.startSession();
745 * await Model.aggregate(..).session(session);
746 *
747 * @param {ClientSession} session
748 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
749 */
750
751Aggregate.prototype.session = function(session) {
752 if (session == null) {
753 delete this.options.session;
754 } else {
755 this.options.session = session;
756 }
757 return this;
758};
759
760/**
761 * Lets you set arbitrary options, for middleware or plugins.
762 *
763 * ####Example:
764 *
765 * var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
766 * agg.options; // `{ allowDiskUse: true }`
767 *
768 * @param {Object} options keys to merge into current options
769 * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
770 * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
771 * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
772 * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
773 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
774 * @return {Aggregate} this
775 * @api public
776 */
777
778Aggregate.prototype.option = function(value) {
779 for (const key in value) {
780 this.options[key] = value[key];
781 }
782 return this;
783};
784
785/**
786 * Sets the cursor option option for the aggregation query (ignored for < 2.6.0).
787 * Note the different syntax below: .exec() returns a cursor object, and no callback
788 * is necessary.
789 *
790 * ####Example:
791 *
792 * var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
793 * cursor.each(function(error, doc) {
794 * // use doc
795 * });
796 *
797 * @param {Object} options
798 * @param {Number} options.batchSize set the cursor batch size
799 * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
800 * @return {Aggregate} this
801 * @api public
802 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
803 */
804
805Aggregate.prototype.cursor = function(options) {
806 if (!this.options) {
807 this.options = {};
808 }
809 this.options.cursor = options || {};
810 return this;
811};
812
813/**
814 * Sets an option on this aggregation. This function will be deprecated in a
815 * future release. Use the [`cursor()`](./api.html#aggregate_Aggregate-cursor),
816 * [`collation()`](./api.html#aggregate_Aggregate-collation), etc. helpers to
817 * set individual options, or access `agg.options` directly.
818 *
819 * Note that MongoDB aggregations [do **not** support the `noCursorTimeout` flag](https://jira.mongodb.org/browse/SERVER-6036),
820 * if you try setting that flag with this function you will get a "unrecognized field 'noCursorTimeout'" error.
821 *
822 * @param {String} flag
823 * @param {Boolean} value
824 * @return {Aggregate} this
825 * @api public
826 * @deprecated Use [`.option()`](api.html#aggregate_Aggregate-option) instead. Note that MongoDB aggregations do **not** support a `noCursorTimeout` option.
827 */
828
829Aggregate.prototype.addCursorFlag = util.deprecate(function(flag, value) {
830 if (!this.options) {
831 this.options = {};
832 }
833 this.options[flag] = value;
834 return this;
835}, 'Mongoose: `Aggregate#addCursorFlag()` is deprecated, use `option()` instead');
836
837/**
838 * Adds a collation
839 *
840 * ####Example:
841 *
842 * Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
843 *
844 * @param {Object} collation options
845 * @return {Aggregate} this
846 * @api public
847 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
848 */
849
850Aggregate.prototype.collation = function(collation) {
851 if (!this.options) {
852 this.options = {};
853 }
854 this.options.collation = collation;
855 return this;
856};
857
858/**
859 * Combines multiple aggregation pipelines.
860 *
861 * ####Example:
862 *
863 * Model.aggregate(...)
864 * .facet({
865 * books: [{ groupBy: '$author' }],
866 * price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
867 * })
868 * .exec();
869 *
870 * // Output: { books: [...], price: [{...}, {...}] }
871 *
872 * @param {Object} facet options
873 * @return {Aggregate} this
874 * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
875 * @api public
876 */
877
878Aggregate.prototype.facet = function(options) {
879 return this.append({$facet: options});
880};
881
882/**
883 * Returns the current pipeline
884 *
885 * ####Example:
886 *
887 * MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
888 *
889 * @return {Array}
890 * @api public
891 */
892
893
894Aggregate.prototype.pipeline = function() {
895 return this._pipeline;
896};
897
898/**
899 * Executes the aggregate pipeline on the currently bound Model.
900 *
901 * ####Example:
902 *
903 * aggregate.exec(callback);
904 *
905 * // Because a promise is returned, the `callback` is optional.
906 * var promise = aggregate.exec();
907 * promise.then(..);
908 *
909 * @see Promise #promise_Promise
910 * @param {Function} [callback]
911 * @return {Promise}
912 * @api public
913 */
914
915Aggregate.prototype.exec = function(callback) {
916 if (!this._model) {
917 throw new Error('Aggregate not bound to any Model');
918 }
919 const model = this._model;
920 const options = utils.clone(this.options || {});
921 const pipeline = this._pipeline;
922 const collection = this._model.collection;
923
924 if (options && options.cursor) {
925 return new AggregationCursor(this);
926 }
927
928 return utils.promiseOrCallback(callback, cb => {
929 if (!pipeline.length) {
930 const err = new Error('Aggregate has empty pipeline');
931 return cb(err);
932 }
933
934 prepareDiscriminatorPipeline(this);
935
936 model.hooks.execPre('aggregate', this, error => {
937 if (error) {
938 const _opts = { error: error };
939 return model.hooks.execPost('aggregate', this, [null], _opts, error => {
940 cb(error);
941 });
942 }
943
944 collection.aggregate(pipeline, options, (error, cursor) => {
945 if (error) {
946 const _opts = { error: error };
947 return model.hooks.execPost('aggregate', this, [null], _opts, error => {
948 if (error) {
949 return cb(error);
950 }
951 return cb(null);
952 });
953 }
954 cursor.toArray((error, result) => {
955 const _opts = { error: error };
956 model.hooks.execPost('aggregate', this, [result], _opts, (error, result) => {
957 if (error) {
958 return cb(error);
959 }
960
961 cb(null, result);
962 });
963 });
964 });
965 });
966 }, model.events);
967};
968
969/**
970 * Provides promise for aggregate.
971 *
972 * ####Example:
973 *
974 * Model.aggregate(..).then(successCallback, errorCallback);
975 *
976 * @see Promise #promise_Promise
977 * @param {Function} [resolve] successCallback
978 * @param {Function} [reject] errorCallback
979 * @return {Promise}
980 */
981Aggregate.prototype.then = function(resolve, reject) {
982 return this.exec().then(resolve, reject);
983};
984
985/**
986 * Executes the query returning a `Promise` which will be
987 * resolved with either the doc(s) or rejected with the error.
988 * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
989 *
990 * @param {Function} [reject]
991 * @return {Promise}
992 * @api public
993 */
994
995Aggregate.prototype.catch = function(reject) {
996 return this.exec().then(null, reject);
997};
998
999/**
1000 * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
1001 * This function *only* works for `find()` queries.
1002 * You do not need to call this function explicitly, the JavaScript runtime
1003 * will call it for you.
1004 *
1005 * ####Example
1006 *
1007 * for await (const doc of Model.find().sort({ name: 1 })) {
1008 * console.log(doc.name);
1009 * }
1010 *
1011 * Node.js 10.x supports async iterators natively without any flags. You can
1012 * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
1013 *
1014 * **Note:** This function is not if `Symbol.asyncIterator` is undefined. If
1015 * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
1016 * support async iterators.
1017 *
1018 * @method Symbol.asyncIterator
1019 * @memberOf Aggregate
1020 * @instance
1021 * @api public
1022 */
1023
1024if (Symbol.asyncIterator != null) {
1025 Aggregate.prototype[Symbol.asyncIterator] = function() {
1026 return this.cursor({ useMongooseAggCursor: true }).
1027 exec().
1028 transformNull().
1029 map(doc => {
1030 return doc == null ? { done: true } : { value: doc, done: false };
1031 });
1032 };
1033}
1034
1035/*!
1036 * Helpers
1037 */
1038
1039/**
1040 * Checks whether an object is likely a pipeline operator
1041 *
1042 * @param {Object} obj object to check
1043 * @return {Boolean}
1044 * @api private
1045 */
1046
1047function isOperator(obj) {
1048 if (typeof obj !== 'object') {
1049 return false;
1050 }
1051
1052 const k = Object.keys(obj);
1053
1054 return k.length === 1 && k.some(key => { return key[0] === '$'; });
1055}
1056
1057/*!
1058 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
1059 * pipeline, should it's model is a non-root discriminator type. This is
1060 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
1061 *
1062 * @param {Aggregate} aggregate Aggregate to prepare
1063 */
1064
1065Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;
1066
1067function prepareDiscriminatorPipeline(aggregate) {
1068 const schema = aggregate._model.schema;
1069 const discriminatorMapping = schema && schema.discriminatorMapping;
1070
1071 if (discriminatorMapping && !discriminatorMapping.isRoot) {
1072 const originalPipeline = aggregate._pipeline;
1073 const discriminatorKey = discriminatorMapping.key;
1074 const discriminatorValue = discriminatorMapping.value;
1075
1076 // If the first pipeline stage is a match and it doesn't specify a `__t`
1077 // key, add the discriminator key to it. This allows for potential
1078 // aggregation query optimizations not to be disturbed by this feature.
1079 if (originalPipeline[0] && originalPipeline[0].$match && !originalPipeline[0].$match[discriminatorKey]) {
1080 originalPipeline[0].$match[discriminatorKey] = discriminatorValue;
1081 // `originalPipeline` is a ref, so there's no need for
1082 // aggregate._pipeline = originalPipeline
1083 } else if (originalPipeline[0] && originalPipeline[0].$geoNear) {
1084 originalPipeline[0].$geoNear.query =
1085 originalPipeline[0].$geoNear.query || {};
1086 originalPipeline[0].$geoNear.query[discriminatorKey] = discriminatorValue;
1087 } else {
1088 const match = {};
1089 match[discriminatorKey] = discriminatorValue;
1090 aggregate._pipeline.unshift({ $match: match });
1091 }
1092 }
1093}
1094
1095/*!
1096 * Exports
1097 */
1098
1099module.exports = Aggregate;