1 | var BaseSampler = require('./base-sampler');
|
2 | var inherits = require('util').inherits;
|
3 | var es = require('event-stream');
|
4 | var Reservoir = require('reservoir');
|
5 | var _defaults = require('lodash.defaults');
|
6 | var _chunk = require('lodash.chunk');
|
7 |
|
8 | var debug = require('debug')('mongodb-collection-sample:reservoir-sampler');
|
9 |
|
// Maximum number of `_id`s read from the collection and offered to the
// reservoir (used as the `limit` of the `_id` query in ReservoirSampler#_read).
const RESERVOIR_SAMPLE_LIMIT = 10000;
// Default number of sampled `_id`s fetched per `$in` query in reservoirStream.
const RESERVOIR_CHUNK_SIZE = 1000;
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
/**
 * Create a through stream that reservoir-samples the `_id` values written to
 * it and, once its input ends, fetches the sampled documents from
 * `collection`.
 *
 * @param {Collection} collection - collection the sampled documents are fetched from.
 * @param {Number} size - capacity of the reservoir (maximum number of sampled ids).
 * @param {Object} [opts] - not mutated.
 * @param {Number} [opts.chunkSize=RESERVOIR_CHUNK_SIZE] - max number of ids per `$in` query.
 * @param {Boolean} [opts.promoteValues=true] - passed through to `collection.find`.
 * @param {Boolean} [opts.raw] - passed through to `collection.find`.
 * @param {Object} [opts.fields] - projection applied to every fetch cursor.
 * @param {Number} [opts.maxTimeMS] - server time limit applied to every fetch cursor.
 * @return {Stream} emits each fetched document as 'data' (order is not
 *   guaranteed), then 'end'; emits 'error' if any fetch cursor fails.
 */
function reservoirStream(collection, size, opts) {
  // Default into a fresh object so the caller's options are left untouched.
  var options = _defaults({}, opts, {
    chunkSize: RESERVOIR_CHUNK_SIZE,
    promoteValues: true
  });
  var reservoir = new Reservoir(size);

  var stream = es.through(
    function write(id) {
      // The `reservoir` package's pushSome decides whether this id enters the
      // sample; the reservoir never holds more than `size` ids.
      reservoir.pushSome(id);
    },
    function end() {
      // Split the sampled ids so each `$in` list stays bounded.
      var chunks = _chunk(reservoir, options.chunkSize);

      if (chunks.length === 0) {
        // Nothing was sampled: finish without issuing any query.
        process.nextTick(function() {
          stream.emit('end');
        });
        return;
      }

      var cursors = chunks.map(function(ids) {
        var cursor = collection.find(
          { _id: { $in: ids } },
          { promoteValues: options.promoteValues, raw: options.raw }
        );
        if (options.fields) {
          cursor.project(options.fields);
        }
        if (options.maxTimeMS) {
          cursor.maxTimeMS(options.maxTimeMS);
        }
        var cursorStream = cursor.stream();
        // Neither pipe() nor es.merge forwards errors; without this listener a
        // failing fetch (e.g. maxTimeMS exceeded) is an unhandled 'error' event.
        cursorStream.on('error', function(err) {
          stream.emit('error', err);
        });
        return cursorStream;
      });

      // Merge all cursors; the order of the sampled documents doesn't matter.
      es.merge(cursors).pipe(es.through(function(doc) {
        stream.emit('data', doc);
      })).on('end', function() {
        stream.emit('end');
      });
    }
  );
  return stream;
}
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
78 |
|
79 |
|
80 |
|
81 |
|
82 |
|
83 |
|
/**
 * Sampler that picks documents by reservoir sampling their `_id`s and then
 * fetching the picked documents.
 *
 * @param {Db} db - passed through to BaseSampler.
 * @param {String} collectionName - passed through to BaseSampler.
 * @param {Object} [opts] - passed through to BaseSampler; `opts.chunkSize` is
 *   also kept as the batch size used when fetching the sampled documents.
 */
function ReservoirSampler(db, collectionName, opts) {
  var settings = opts || {};
  // Set before BaseSampler runs, matching the original initialisation order.
  this.running = false;
  this.chunkSize = settings.chunkSize;
  BaseSampler.call(this, db, collectionName, opts);
}
inherits(ReservoirSampler, BaseSampler);
|
90 |
|
/**
 * Stream `_read` hook (BaseSampler is presumably a `stream.Readable`, since
 * this method uses `this.push`).
 *
 * Only the first call does work; later calls return early while
 * `this.running` is set. The first call:
 *  - counts the documents matching `this.query` (used only for debug logging);
 *  - streams up to RESERVOIR_SAMPLE_LIMIT matching `_id`s, in `this.sort`
 *    order, into a reservoir of `this.size`;
 *  - pushes each fetched sample document, then pushes `null` to end the stream.
 *
 * Errors from the count, the `_id` cursor or the sample fetch are re-emitted
 * as 'error' on this sampler.
 */
ReservoirSampler.prototype._read = function() {
  if (this.running) {
    return;
  }
  this.running = true;

  var onError = this.emit.bind(this, 'error');

  debug('using query `%j`', this.query);
  this.collection.countDocuments(this.query, function(err, count) {
    if (err) {
      return onError(err);
    }

    debug('sampling %d documents from a collection with %d documents',
      this.size, count);

    // Only `_id`s are needed here; full documents are fetched after sampling.
    var idCursorStream = this.collection.find(this.query, {
      projection: { _id: 1 },
      sort: this.sort,
      limit: RESERVOIR_SAMPLE_LIMIT
    }).stream();
    // pipe() does not forward errors, so listen on the cursor stream itself;
    // otherwise a failed `_id` query would be an unhandled 'error' event.
    idCursorStream.on('error', onError);

    idCursorStream
      .pipe(es.map(function(obj, cb) {
        return cb(null, obj._id);
      }))
      .pipe(reservoirStream(this.collection, this.size, {
        chunkSize: this.chunkSize,
        fields: this.fields,
        maxTimeMS: this.maxTimeMS,
        raw: this.raw,
        promoteValues: this.promoteValues
      }))
      .on('error', onError)
      .on('data', this.push.bind(this))
      .on('end', function() {
        this.running = false;
        this.push(null);
      }.bind(this));
  }.bind(this));
};
|
130 |
|
// The module exports the ReservoirSampler constructor: new ReservoirSampler(db, collectionName, opts).
module.exports = ReservoirSampler;
|