UNPKG

2.98 kBJavaScriptView Raw
1var BaseSampler = require('./base-sampler');
2var rawTransform = require('./raw-transform');
3var ReservoirSampler = require('./reservoir-sampler');
4var inherits = require('util').inherits;
5var debug = require('debug')('mongodb-collection-sample:native-sampler');
6
/**
 * Readable stream yielding a random sample of documents from a collection.
 * Sampling uses the `$sample` aggregation stage, or a ReservoirSampler when
 * the requested size is a large fraction of the matching documents.
 *
 * @param {mongodb.Db} db - database handle passed through to BaseSampler.
 * @param {String} collectionName - name of the collection to sample from.
 * @param {Object} opts
 * @option {Object} query - filter restricting which documents may be sampled [default: `{}`].
 * @option {Array} fields - projection limiting the returned fields [default: null].
 *   NOTE(review): `_read` uses this directly as a `$project` stage, which
 *   expects an object; presumably BaseSampler normalises it — TODO confirm.
 * @option {Number} size - number of documents to sample [default: `5`].
 * @option {Boolean} raw - return documents as raw BSON buffers [default: `false`].
 * @api public
 */
function NativeSampler(db, collectionName, opts) {
  BaseSampler.apply(this, [db, collectionName, opts]);
  // Flag checked by `_read` so that sampling is only kicked off once.
  this.running = false;
}
inherits(NativeSampler, BaseSampler);
25
/**
 * Readable-stream `_read` hook. Sampling starts on the first call only; any
 * later call returns immediately because `running` is never reset.
 *
 * Flow:
 *  1. Count the documents matching `this.query`.
 *  2. If the requested `size` is at least 5% of that count (but less than all
 *     of it), delegate to a ReservoirSampler built from `this.opts`.
 *  3. Otherwise run an aggregation with an optional `$match` (non-empty query),
 *     a `$sample` (only when there are more matches than `size`) and an
 *     optional `$project` (non-empty `fields`). Then pipe the cursor through
 *     `rawTransform(this.raw)`.
 *
 * Every resulting document is `push`ed onto this stream and `push(null)` ends
 * it. Errors from the count, the delegate sampler, the aggregation cursor or
 * the transform are re-emitted as `'error'` on this stream.
 */
NativeSampler.prototype._read = function() {
  if (this.running) {
    return;
  }

  this.running = true;

  // Shared by countDocuments and (after being extended below) aggregate.
  var options = {
    maxTimeMS: this.maxTimeMS,
    allowDiskUse: true,
    promoteValues: this.promoteValues
  };

  this.collection.countDocuments(this.query, options, function(err, count) {
    if (err) {
      return this.emit('error', err);
    }
    debug('sampling %d documents from a collection with %d documents',
      this.size, count);

    // if we need more than 5% of all docs (but not all of them), use
    // ReservoirSampler to avoid the blocking sort stage (SERVER-22815).
    // NOTE(review): a previous comment here claimed raw output always uses
    // native sampling, but this condition does not check `this.raw`; confirm
    // that ReservoirSampler honours the `raw` option carried in `this.opts`.
    if (count > this.size && count <= this.size * 20) {
      var reservoirSampler = new ReservoirSampler(this.db, this.collectionName, this.opts);
      return reservoirSampler
        .on('error', this.emit.bind(this, 'error'))
        .on('data', this.push.bind(this))
        .on('end', this.push.bind(this, null));
    }
    // else, use native sampler

    // add $match stage if a query was specified
    this.pipeline = [];
    if (Object.keys(this.query).length > 0) {
      this.pipeline.push({
        $match: this.query
      });
    }

    // only add $sample stage if the result set contains more
    // documents than requested
    if (count > this.size) {
      this.pipeline.push({
        $sample: {
          size: this.size
        }
      });
    }

    // add $project stage if projection (fields) was specified
    if (this.fields && Object.keys(this.fields).length > 0) {
      this.pipeline.push({
        $project: this.fields
      });
    }

    options.raw = this.raw;
    options.cursor = this.cursor;
    options.batchSize = this.size;

    var cursor = this.collection.aggregate(this.pipeline, options);

    // `pipe()` only wires errors on the destination, not the source. Listen on
    // the cursor directly so a failing aggregation is re-emitted on this
    // stream instead of surfacing as an unhandled 'error' event.
    cursor.on('error', this.emit.bind(this, 'error'));

    cursor.pipe(rawTransform(this.raw))
      .on('error', this.emit.bind(this, 'error'))
      .on('data', this.push.bind(this))
      .on('end', this.push.bind(this, null));
  }.bind(this));
};
95
96module.exports = NativeSampler;