1 | import {Transform, tupleid} from 'vega-dataflow';
|
2 | import {random} from 'vega-statistics';
|
3 | import {inherits} from 'vega-util';
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
/**
 * Transform that keeps a bounded random sample (a reservoir) of the tuples
 * that flow through it.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {number} [params.size=1000] - Maximum number of tuples to retain.
 */
export default function Sample(params) {
  // The operator value starts out as an empty reservoir.
  const reservoir = [];
  Transform.call(this, reservoir, params);
  // Tally of tuples observed by the sampler so far.
  this.count = 0;
}
|
16 |
|
/**
 * Declarative definition of the Sample transform: its type name, (empty)
 * metadata, and its single `size` parameter (a number defaulting to 1000).
 */
Sample.Definition = {
  'type': 'Sample',
  'metadata': {},
  'params': [
    {
      'name': 'size',
      'type': 'number',
      'default': 1000
    }
  ]
};
|
24 |
|
inherits(Sample, Transform, {
  /**
   * Update the sample reservoir in response to an incoming pulse.
   *
   * New tuples are sampled with reservoir sampling: while the reservoir holds
   * fewer than `size` tuples every new tuple is kept; after that, the n-th
   * observed tuple overwrites a random slot with probability size / n
   * (assuming `random()` is uniform on [0, 1)).
   *
   * The local `map` tracks tuples of the previous sample by tuple id:
   *   1          still in the sample
   *   -1         dropped during this pulse (removed upstream, trimmed after a
   *              size decrease, or evicted by a newly added tuple)
   *   undefined  not part of the previous sample
   * Only tuples with value 1 are forwarded as modified, so a dropped tuple is
   * never reported in both `out.rem` and `out.mod`.
   *
   * @param {object} _ - Operator parameters; reads `size` and whether it changed.
   * @param {object} pulse - The input pulse.
   * @returns {object} A forked pulse whose add/rem/mod describe changes to the
   *   sample and whose source is the current sample array.
   */
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE);
    const mod = _.modified('size');
    const num = _.size;

    // Index the previous sample by tuple id.
    const map = {};
    for (const t of this.value) map[tupleid(t)] = 1;

    // The reservoir array is updated in place unless tuples are filtered out.
    let res = this.value;
    // Number of tuples observed so far by the sampler.
    let cnt = this.count;
    // Lowest reservoir slot a new tuple may overwrite; -1 once a replenish
    // pass has run (which also forces recomputation of out.add below).
    let cap = 0;

    // Reservoir sampling step for a newly observed tuple.
    function update(t) {
      if (res.length < num) {
        res.push(t);
      } else {
        const idx = ~~((cnt + 1) * random());
        if (idx < res.length && idx >= cap) {
          const evicted = res[idx];
          const eid = tupleid(evicted);
          if (map[eid] > 0) {
            // A previously emitted tuple leaves the sample: report it as
            // removed and mark it so it is not also forwarded as modified.
            map[eid] = -1;
            out.rem.push(evicted);
          }
          res[idx] = t;
        }
      }
      ++cnt;
    }

    if (pulse.rem.length) {
      // Forward removals of sampled tuples; every removal shrinks the count.
      pulse.visit(pulse.REM, t => {
        const id = tupleid(t);
        if (map[id] > 0) {
          map[id] = -1;
          out.rem.push(t);
        }
        --cnt;
      });

      // Filter removed tuples out of the reservoir.
      res = res.filter(t => map[tupleid(t)] !== -1);
    }

    if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
      // Replenish from the backing source, skipping tuples of the previous
      // sample and only overwriting slots filled during this pass.
      cnt = res.length;
      cap = cnt;
      pulse.visit(pulse.SOURCE, t => {
        if (!map[tupleid(t)]) update(t);
      });
      cap = -1;
    }

    if (mod && res.length > num) {
      // Size decreased: drop the leading reservoir entries.
      const n = res.length - num;
      for (let i = 0; i < n; ++i) {
        map[tupleid(res[i])] = -1;
        out.rem.push(res[i]);
      }
      res = res.slice(n);
    }

    if (pulse.add.length) {
      // Feed newly added tuples through the reservoir.
      pulse.visit(pulse.ADD, update);
    }

    if (pulse.mod.length) {
      // Visited after additions so that tuples evicted above are excluded;
      // dropped tuples carry -1 and must not be reported as modified.
      pulse.visit(pulse.MOD, t => {
        if (map[tupleid(t)] > 0) out.mod.push(t);
      });
    }

    if (pulse.add.length || cap < 0) {
      // Sampled tuples absent from the previous sample are additions.
      out.add = res.filter(t => !map[tupleid(t)]);
    }

    this.count = cnt;
    this.value = out.source = res;
    return out;
  }
});
|