UNPKG

2.75 kBJavaScriptView Raw
1import {Transform, tupleid} from 'vega-dataflow';
2import {random} from 'vega-statistics';
3import {inherits} from 'vega-util';
4
/**
 * Dataflow operator that keeps a bounded sample of the tuples it observes.
 * The sample is maintained with reservoir sampling; the operator value is
 * the reservoir array, and `count` tracks how many tuples have been
 * considered for inclusion so far.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {number} [params.size=1000] - The maximum number of samples.
 */
export default function Sample(params) {
  // the operator value starts out as an empty reservoir
  const reservoir = [];
  Transform.call(this, reservoir, params);

  // number of tuples considered for the reservoir so far
  this.count = 0;
}
16
// Static transform metadata: the registered type name and the parameters
// this operator accepts (a single numeric `size`, default 1000).
Sample.Definition = {
  type: 'Sample',
  metadata: {},
  params: [
    {
      name: 'size',
      type: 'number',
      default: 1000
    }
  ]
};
24
inherits(Sample, Transform, {
  /**
   * Updates the sample reservoir in response to an incoming pulse and
   * reports the resulting changes.
   *
   * Processing order:
   *  1. drop removed tuples from the reservoir;
   *  2. if tuples were removed or `size` changed and the reservoir is below
   *     capacity, replenish it from `pulse.source` (when present);
   *  3. if `size` shrank, evict the oldest entries down to the new capacity;
   *  4. feed added tuples through the reservoir update;
   *  5. forward modified tuples that are still part of the sample.
   *
   * @param {object} _ - Operator parameters. Reads `_.size` (falls back to
   *   1000 when null/undefined) and `_.modified('size')`.
   * @param {object} pulse - Incoming pulse. Uses `add`, `rem`, `mod`,
   *   `source`, `fork` and `visit` with the ADD/REM/MOD/SOURCE/NO_SOURCE flags.
   * @return {object} The forked output pulse: `add`/`rem`/`mod` describe the
   *   reservoir changes and `source` is the updated reservoir.
   */
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE);
    const mod = _.modified('size');

    // fall back to the documented default when no size was supplied;
    // an undefined size would otherwise never admit any tuple
    const num = _.size == null ? 1000 : _.size;

    // membership state per tuple id:
    //   1         -> in the reservoir when this pulse arrived
    //  -1         -> left the reservoir during this pulse
    //  undefined  -> not previously sampled
    const map = {};
    for (const t of this.value) {
      map[tupleid(t)] = 1;
    }

    // ids of tuples already offered to the reservoir while replenishing
    const seen = {};

    let res = this.value;
    let cnt = this.count;
    let cap = 0; // indices below cap are protected from replacement

    // sample reservoir update function
    function update(t) {
      if (res.length < num) {
        res.push(t);
      } else {
        const idx = ~~((cnt + 1) * random());
        if (idx < res.length && idx >= cap) {
          const p = res[idx];
          const pid = tupleid(p);
          if (map[pid] > 0) {
            // eviction of a previously reported tuple: mark it as gone so
            // it is not also forwarded as modified
            map[pid] = -1;
            out.rem.push(p);
          }
          res[idx] = t;
        }
      }
      ++cnt;
    }

    if (pulse.rem.length) {
      // find all tuples that should be removed, add to output
      pulse.visit(pulse.REM, t => {
        const id = tupleid(t);
        if (map[id]) {
          map[id] = -1;
          out.rem.push(t);
        }
        --cnt;
      });

      // filter removed tuples out of the sample reservoir
      res = res.filter(t => map[tupleid(t)] !== -1);
    }

    if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
      // replenish sample if backing data source is available;
      // surviving entries (indices < cap) are kept in place
      cap = cnt = res.length;
      pulse.visit(pulse.SOURCE, t => {
        // update, but skip previously sampled tuples
        const id = tupleid(t);
        if (!map[id]) {
          seen[id] = 1;
          update(t);
        }
      });
      cap = -1; // flags that replenishment ran (see out.add below)
    }

    if (mod && res.length > num) {
      // capacity shrank: evict from the front of the reservoir
      const n = res.length - num;
      for (let i = 0; i < n; ++i) {
        map[tupleid(res[i])] = -1;
        out.rem.push(res[i]);
      }
      res = res.slice(n);
    }

    if (pulse.add.length) {
      // update sample reservoir; tuples already offered during
      // replenishment are skipped so they are neither sampled nor counted
      // twice (NOTE(review): relevant when pulse.source also contains the
      // tuples in pulse.add; harmless otherwise)
      pulse.visit(pulse.ADD, t => {
        if (!seen[tupleid(t)]) update(t);
      });
    }

    if (pulse.mod.length) {
      // propagate modified tuples that are still in the sample reservoir;
      // runs after ADD so tuples evicted above are excluded
      pulse.visit(pulse.MOD, t => {
        if (map[tupleid(t)] > 0) out.mod.push(t);
      });
    }

    if (pulse.add.length || cap < 0) {
      // output newly added tuples
      out.add = res.filter(t => !map[tupleid(t)]);
    }

    this.count = cnt;
    this.value = out.source = res;
    return out;
  }
});