1 | var BaseSampler = require('./base-sampler');
|
2 | var rawTransform = require('./raw-transform');
|
3 | var ReservoirSampler = require('./reservoir-sampler');
|
4 | var inherits = require('util').inherits;
|
5 | var debug = require('debug')('mongodb-collection-sample:native-sampler');
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
/**
 * Sampler whose `_read` draws a sample of `collectionName`'s documents,
 * either through a server-side aggregation or by delegating to a
 * ReservoirSampler.
 *
 * @param {Object} db - passed through unchanged to BaseSampler.
 * @param {String} collectionName - passed through unchanged to BaseSampler.
 * @param {Object} opts - passed through unchanged to BaseSampler.
 */
function NativeSampler(db, collectionName, opts) {
  var self = this;
  BaseSampler.call(self, db, collectionName, opts);
  // Flipped to true by _read once sampling has started, so later _read calls do nothing.
  self.running = false;
}
inherits(NativeSampler, BaseSampler);
|
25 |
|
/**
 * Upper bound, as a multiple of the requested sample size, on the number of
 * matching documents for which the client-side ReservoirSampler is used
 * instead of a server-side `$sample` stage.
 *
 * NOTE(review): presumably 20 because MongoDB's `$sample` switches to a full
 * scan plus random sort once the sample exceeds 5% of the documents
 * (1 / 0.05 = 20) — confirm against the server versions in use.
 */
var RESERVOIR_SAMPLING_FACTOR = 20;

/**
 * Build the aggregation pipeline used to draw the sample.
 *
 * @param {Object} query - filter; a `$match` stage is added only when it has keys.
 * @param {Object} [fields] - projection; a `$project` stage is added only when it has keys.
 * @param {Number} size - requested sample size.
 * @param {Number} count - number of documents matching `query`.
 * @returns {Array} the pipeline stages, in `$match`, `$sample`, `$project` order.
 */
function buildSamplePipeline(query, fields, size, count) {
  var pipeline = [];
  if (query && Object.keys(query).length > 0) {
    pipeline.push({
      $match: query
    });
  }
  // When every match fits in the sample, skip $sample and return all matches.
  if (count > size) {
    pipeline.push({
      $sample: {
        size: size
      }
    });
  }
  if (fields && Object.keys(fields).length > 0) {
    pipeline.push({
      $project: fields
    });
  }
  return pipeline;
}

/**
 * Relay `source` into `sampler`. Each 'data' chunk is pushed, 'end' pushes
 * null to finish the stream, and 'error' is re-emitted on `sampler`.
 *
 * NOTE(review): push() return values are ignored, so no backpressure is
 * applied to `source`.
 *
 * @param {Object} sampler - the NativeSampler instance receiving the data.
 * @param {Object} source - event emitter producing documents.
 * @returns {Object} `source`, for chaining.
 */
function forwardStream(sampler, source) {
  return source
    .on('error', sampler.emit.bind(sampler, 'error'))
    .on('data', sampler.push.bind(sampler))
    .on('end', sampler.push.bind(sampler, null));
}

/**
 * Readable `_read` hook. On the first call it counts the documents matching
 * `this.query` and starts producing a sample of `this.size` of them. Later
 * calls return immediately: documents are pushed from the source's 'data'
 * events, not on demand.
 *
 * Strategy, chosen from the match count:
 *  - count <= size: aggregate without `$sample` (every match is returned);
 *  - size < count <= size * RESERVOIR_SAMPLING_FACTOR: delegate to a
 *    ReservoirSampler built from the same db/collection/opts;
 *  - otherwise: aggregate with a `$sample` stage.
 *
 * Errors from the count, the aggregation cursor, the raw transform, or the
 * reservoir sampler are re-emitted as 'error' on this stream.
 */
NativeSampler.prototype._read = function() {
  if (this.running) {
    return;
  }
  this.running = true;

  var countOptions = {
    maxTimeMS: this.maxTimeMS,
    allowDiskUse: true,
    promoteValues: this.promoteValues
  };

  this.collection.countDocuments(this.query, countOptions, function(err, count) {
    if (err) {
      return this.emit('error', err);
    }
    debug('sampling %d documents from a collection with %d documents',
      this.size, count);

    if (count > this.size && count <= this.size * RESERVOIR_SAMPLING_FACTOR) {
      var reservoirSampler = new ReservoirSampler(this.db, this.collectionName, this.opts);
      forwardStream(this, reservoirSampler);
      return;
    }

    this.pipeline = buildSamplePipeline(this.query, this.fields, this.size, count);

    // A fresh object, so the options already handed to countDocuments are not mutated.
    var aggregateOptions = {
      maxTimeMS: this.maxTimeMS,
      allowDiskUse: true,
      promoteValues: this.promoteValues,
      raw: this.raw,
      cursor: this.cursor,
      batchSize: this.size
    };

    var cursor = this.collection.aggregate(this.pipeline, aggregateOptions);

    // pipe() does not propagate errors from the source stream, so errors
    // emitted by the aggregation cursor must be forwarded explicitly.
    cursor.on('error', this.emit.bind(this, 'error'));
    forwardStream(this, cursor.pipe(rawTransform(this.raw)));
  }.bind(this));
};
|
95 |
|
96 | module.exports = NativeSampler;
|