UNPKG

2.71 kB · JavaScript · View Raw
1import {Transform, tupleid} from 'vega-dataflow';
2import {random} from 'vega-statistics';
3import {inherits} from 'vega-util';
4
/**
 * Sample operator: retains a bounded, representative subset of the tuples
 * flowing through it, maintained incrementally via reservoir sampling.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {number} [params.size=1000] - Upper bound on the number of tuples
 *   kept in the sample.
 */
export default function Sample(params) {
  // The reservoir starts empty; it is handed to Transform as the initial
  // value (presumably stored as this.value, which transform() reads back).
  const emptyReservoir = [];
  Transform.call(this, emptyReservoir, params);

  // Running tuple count consulted and updated by the reservoir step in
  // transform().
  this.count = 0;
}

// Static descriptor for this operator: its type name, (empty) metadata, and
// the parameters it accepts -- a single numeric `size` defaulting to 1000.
Sample.Definition = {
  'type': 'Sample',
  'metadata': {},
  'params': [
    {
      'name': 'size',
      'type': 'number',
      'default': 1000
    }
  ]
};

// Make Sample inherit from Transform; the object returned by inherits() is
// used as the prototype on which transform() is defined.
const prototype = inherits(Sample, Transform);

/**
 * Process one pulse: maintain the reservoir sample and emit the changes to
 * the sample as a new pulse.
 *
 * Steps, in order:
 *   1. drop removed tuples from the sample;
 *   2. refill the sample from the backing source when it has room and
 *      tuples were removed or `size` changed;
 *   3. trim the oldest entries when `size` changed and the sample is over
 *      capacity;
 *   4. run the reservoir step on newly added tuples;
 *   5. forward modifications only for tuples that are still sampled.
 *
 * @param {object} _ - Operator parameters. `_.size` is the sample capacity;
 *   `_.modified('size')` reports whether it changed.
 * @param {object} pulse - The incoming pulse.
 * @returns {object} A pulse forked from the input whose add/rem/mod arrays
 *   describe changes to the sample and whose `source` is the sample array.
 */
prototype.transform = function(_, pulse) {
  const out = pulse.fork(pulse.NO_SOURCE);
  const mod = _.modified('size');
  const num = _.size;

  // Status of tuples that were in the sample before this pulse:
  //   1         -> still sampled (already known downstream)
  //   -1        -> dropped during this pulse (already pushed to out.rem)
  //   undefined -> not part of the prior sample
  // Comparisons use `=== 1`, because -1 is truthy.
  const map = this.value.reduce((m, t) => {
    m[tupleid(t)] = 1;
    return m;
  }, {});

  // NOTE: res aliases this.value until reassigned; update() pushes in place.
  let res = this.value;
  let cnt = this.count;
  // Reservoir slots with index < cap are protected from replacement.
  // During replenishment this shields the surviving prior sample.
  // -1 afterwards signals that replenished tuples must be emitted as adds.
  let cap = 0;

  // Reservoir sampling step for a single candidate tuple.
  function update(t) {
    if (res.length < num) {
      res.push(t);
    } else {
      // Math.floor instead of ~~: ~~ truncates to int32 and would produce
      // negative indices once the count exceeds 2^31.
      const idx = Math.floor((cnt + 1) * random());
      if (idx < res.length && idx >= cap) {
        const evicted = res[idx];
        const id = tupleid(evicted);
        if (map[id] === 1) {
          // Evicting a tuple downstream already knows about: report it, and
          // mark it so modification propagation below skips it.
          map[id] = -1;
          out.rem.push(evicted);
        }
        res[idx] = t;
      }
    }
    ++cnt;
  }

  if (pulse.rem.length) {
    // Find sampled tuples that were removed and add them to the output.
    pulse.visit(pulse.REM, (t) => {
      const id = tupleid(t);
      if (map[id] === 1) {
        map[id] = -1;
        out.rem.push(t);
      }
      --cnt;
    });

    // Filter removed tuples out of the sample reservoir.
    res = res.filter((t) => map[tupleid(t)] !== -1);
  }

  if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
    // Replenish the sample if a backing data source is available.
    cnt = res.length;
    cap = cnt;
    pulse.visit(pulse.SOURCE, (t) => {
      // Skip tuples that were part of the prior sample.
      if (!map[tupleid(t)]) update(t);
    });
    cap = -1;
  }

  if (mod && res.length > num) {
    // The size shrank: evict the oldest entries until within capacity.
    const n = res.length - num;
    for (let i = 0; i < n; ++i) {
      map[tupleid(res[i])] = -1;
      out.rem.push(res[i]);
    }
    res = res.slice(n);
  }

  if (pulse.add.length) {
    // Feed newly added tuples through the reservoir.
    pulse.visit(pulse.ADD, update);
  }

  if (pulse.mod.length) {
    // Runs after ADD processing so that tuples evicted this pulse (now -1)
    // are not reported as both removed and modified.
    pulse.visit(pulse.MOD, (t) => {
      if (map[tupleid(t)] === 1) out.mod.push(t);
    });
  }

  if (pulse.add.length || cap < 0) {
    // Emit sampled tuples that were not part of the prior sample.
    out.add = res.filter((t) => !map[tupleid(t)]);
  }

  this.count = cnt;
  this.value = out.source = res;
  return out;
};