UNPKG

4.39 kBJavaScriptView Raw
1var BaseSampler = require('./base-sampler');
2var inherits = require('util').inherits;
3var es = require('event-stream');
4var Reservoir = require('reservoir');
5var _defaults = require('lodash.defaults');
6var _chunk = require('lodash.chunk');
7
8var debug = require('debug')('mongodb-collection-sample:reservoir-sampler');
9
10var RESERVOIR_SAMPLE_LIMIT = 10000;
11var RESERVOIR_CHUNK_SIZE = 1000;
12
/**
 * Does reservoir sampling and fetches the resulting documents from the
 * collection.
 *
 * The returned stream is written `_id` values (the caller is responsible for
 * running the query that produces them). Every written `_id` is offered to a
 * reservoir of capacity `size`. When the input ends, the `_id`s kept by the
 * reservoir are grouped into chunks of at most `opts.chunkSize`. For each
 * chunk, a cursor is opened with an `$in` query to fetch the actual
 * documents. The cursor streams are merged and all the resulting documents
 * are emitted downstream as `data` events, followed by `end`.
 *
 * Errors raised by any of the chunk cursors are re-emitted as `error` events
 * on the returned stream, so a single listener on it covers the whole fetch
 * phase.
 *
 * @param {mongodb.Collection} collection The collection to sample from.
 * @param {Number} size How many documents should be returned at most.
 * @param {Object} [opts] Never modified; defaults are applied to a copy.
 * @option {Number} chunkSize For chunked $in queries [default: `1000`].
 * @option {Boolean} promoteValues Passed through to `find` [default: `true`].
 * @option {Object} fields Projection applied to each cursor [default: none].
 * @option {Number} maxTimeMS Time limit applied to each cursor [default: none].
 * @option {Boolean} raw Passed through to `find` [default: `undefined`].
 * @return {stream.Readable}
 * @api private
 */
function reservoirStream(collection, size, opts) {
  // Apply the defaults onto a fresh object so the caller's `opts` object is
  // left untouched.
  opts = _defaults({}, opts, {
    chunkSize: RESERVOIR_CHUNK_SIZE,
    promoteValues: true
  });
  var reservoir = new Reservoir(size);

  var stream = es.through(
    function write(data) {
      // fill reservoir with ids
      reservoir.pushSome(data);
    },
    function end() {
      // `pipe()` does not forward errors, so every cursor stream needs its
      // own listener that relays failures to the stream handed to the caller.
      var forwardError = function(err) {
        stream.emit('error', err);
      };
      // split the reservoir of ids into smaller chunks
      var chunks = _chunk(reservoir, opts.chunkSize);
      // create cursors for chunks
      var cursors = chunks.map(function(ids) {
        var cursor = collection.find({ _id: { $in: ids }}, { promoteValues: opts.promoteValues, raw: opts.raw });
        if (opts.fields) {
          cursor.project(opts.fields);
        }
        if (opts.maxTimeMS) {
          cursor.maxTimeMS(opts.maxTimeMS);
        }
        return cursor.stream().on('error', forwardError);
      });
      // merge all cursors (order arbitrary) and emit docs
      es.merge(cursors).pipe(es.through(function(doc) {
        stream.emit('data', doc);
      })).on('end', function() {
        stream.emit('end');
      });
    }
  );
  return stream;
}
70
/**
 * Readable stream that yields a sample of a collection's documents, picked via
 * [reservoir sampling](https://en.wikipedia.org/wiki/Reservoir_sampling).
 *
 * Only `chunkSize` is read here; all arguments are handed on unchanged to
 * `BaseSampler`, which is defined in another file (presumably it interprets
 * the remaining options and applies their defaults; verify there).
 *
 * @param {mongodb.DB} db
 * @param {String} collectionName to source from.
 * @param {Object} opts
 * @option {Object} query to refine possible samples [default: `{}`].
 * @option {Number} size of the sample to capture [default: `5`].
 * @option {Object} fields to return for each document [default: `null`].
 * @option {Number} chunkSize for chunked $in queries [default: `1000`].
 * @api public
 */
function ReservoirSampler(db, collectionName, opts) {
  // Guards `_read` against starting the sampling pipeline more than once.
  this.running = false;

  // Stays `undefined` when no options (or no chunk size) were supplied.
  var requestedChunkSize;
  if (opts) {
    requestedChunkSize = opts.chunkSize;
  }
  this.chunkSize = requestedChunkSize;

  BaseSampler.call(this, db, collectionName, opts);
}
inherits(ReservoirSampler, BaseSampler);
90
/**
 * Starts the sampling pipeline the first time it is called; calls made while
 * the pipeline is running return immediately.
 *
 * The matching documents are counted (used for debug output only), then up
 * to `RESERVOIR_SAMPLE_LIMIT` `_id`s matching `this.query` are streamed into
 * `reservoirStream`, whose resulting documents are pushed to this stream.
 * When the pipeline ends, `running` is reset and `null` is pushed.
 *
 * An error from the count or from any stage of the pipeline is re-emitted as
 * an `error` event on this sampler.
 *
 * @api private
 */
ReservoirSampler.prototype._read = function() {
  if (this.running) {
    return;
  }
  this.running = true;

  debug('using query `%j`', this.query);
  this.collection.countDocuments(this.query, function(err, count) {
    if (err) {
      return this.emit('error', err);
    }

    debug('sampling %d documents from a collection with %d documents',
      this.size, count);

    // `pipe()` does not propagate errors between stages, so the same
    // forwarder is attached to every stage of the chain, not just the last.
    var forwardError = this.emit.bind(this, 'error');

    this.collection.find(this.query, {
      projection: { _id: 1 },
      sort: this.sort,
      limit: RESERVOIR_SAMPLE_LIMIT
    })
      .stream()
      .on('error', forwardError)
      .pipe(es.map(function(obj, cb) {
        return cb(null, obj._id);
      }))
      .on('error', forwardError)
      .pipe(reservoirStream(this.collection, this.size, {
        chunkSize: this.chunkSize,
        fields: this.fields,
        maxTimeMS: this.maxTimeMS,
        raw: this.raw,
        promoteValues: this.promoteValues
      }))
      .on('error', forwardError)
      .on('data', this.push.bind(this))
      .on('end', function() {
        this.running = false;
        this.push(null);
      }.bind(this));
  }.bind(this));
};
130
131module.exports = ReservoirSampler;