1 | import {Transform, tupleid} from 'vega-dataflow';
|
2 | import {random} from 'vega-statistics';
|
3 | import {inherits} from 'vega-util';
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
/**
 * Transform operator that maintains a bounded sample of the tuples
 * passing through it (see the transform method for the update logic).
 * @constructor
 * @param {object} params - The parameters for this operator; forwarded
 *   unchanged to the Transform constructor.
 */
export default function Sample(params) {
  // Initial operator value handed to Transform. The transform method reads
  // this.value as the sample array (presumably this same array is stored
  // there by Transform -- confirm against vega-dataflow).
  var initialSample = [];
  Transform.call(this, initialSample, params);

  // Running tuple tally, read and written back by the transform method.
  this.count = 0;
}
|
16 |
|
// Declarative description of the transform: its type name, an empty
// metadata record, and a single numeric `size` parameter whose declared
// default is 1000.
Sample.Definition = {
  type: 'Sample',
  metadata: {},
  params: [
    {name: 'size', type: 'number', default: 1000}
  ]
};
|
24 |
|
// Set up inheritance from Transform. The object returned by inherits() is
// used below as the place to attach Sample's methods (presumably it is
// Sample.prototype -- confirm against vega-util).
var prototype = inherits(Sample, Transform);

/**
 * Updates the sample for the current pulse and reports the resulting
 * changes on a forked output pulse.
 *
 * The sample is kept in this.value and the running tuple tally in
 * this.count. New tuples are admitted reservoir-style: while the sample
 * holds fewer than `size` tuples they are appended; afterwards each new
 * tuple replaces a randomly chosen slot only when the drawn index falls
 * inside the sample (random() is assumed to return a value in [0, 1)).
 *
 * @param {object} _ - Operator parameters. Reads `_.size` (maximum sample
 *   size; 1000 is used when it is null or undefined, matching the default
 *   declared in Sample.Definition) and `_.modified('size')`.
 * @param {object} pulse - The input pulse. Its add/rem/mod arrays and,
 *   when present, its source are visited.
 * @return {object} The forked output pulse. `rem` lists tuples that left
 *   the sample, `add` lists tuples that newly entered it, `mod` lists
 *   modified tuples that remain sampled, and `source` is the sample array.
 */
prototype.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE),
      mod = _.modified('size'),
      // Honor the declared default: with an undefined size every
      // `res.length < num` test is false and nothing would ever be sampled.
      num = _.size == null ? 1000 : _.size,
      res = this.value,
      cnt = this.count,
      cap = 0,
      // tuple id -> 1 if sampled before this pulse, -1 once it has left the
      // sample during this pulse; ids of newly admitted tuples are absent.
      map = res.reduce(function(m, t) {
        m[tupleid(t)] = 1;
        return m;
      }, {});

  // sample reservoir update function
  function update(t) {
    var p, pid, idx;

    if (res.length < num) {
      res.push(t);
    } else {
      idx = ~~((cnt + 1) * random());
      // `cap` protects slots below it: during replenishment only slots
      // filled by the replenishment itself may be overwritten.
      if (idx < res.length && idx >= cap) {
        p = res[idx];
        pid = tupleid(p);
        if (map[pid]) {
          // eviction of a previously sampled tuple: report it as removed
          // and mark it so it is not also reported as modified below
          map[pid] = -1;
          out.rem.push(p);
        }
        res[idx] = t;
      }
    }
    ++cnt;
  }

  if (pulse.rem.length) {
    // find all sampled tuples that were removed upstream, add to output
    pulse.visit(pulse.REM, function(t) {
      var id = tupleid(t);
      if (map[id]) {
        map[id] = -1;
        out.rem.push(t);
      }
      --cnt;
    });

    // filter removed tuples out of the sample reservoir
    res = res.filter(function(t) { return map[tupleid(t)] !== -1; });
  }

  if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
    // replenish the sample from the backing source, if one is available
    cap = cnt = res.length;
    pulse.visit(pulse.SOURCE, function(t) {
      // update, but skip tuples that already have a map entry
      if (!map[tupleid(t)]) update(t);
    });
    // a negative cap records that replenishment ran (see out.add below)
    cap = -1;
  }

  if (mod && res.length > num) {
    // size shrank: drop the leading surplus tuples from the sample
    for (var i=0, n=res.length-num; i<n; ++i) {
      map[tupleid(res[i])] = -1;
      out.rem.push(res[i]);
    }
    res = res.slice(n);
  }

  if (pulse.add.length) {
    // update sample reservoir with newly added tuples
    pulse.visit(pulse.ADD, update);
  }

  if (pulse.mod.length) {
    // Propagate modified tuples that are still in the sample. This runs
    // after every removal/eviction step and requires an exact `1`, so a
    // tuple marked -1 (already pushed to out.rem) is never also emitted
    // as modified.
    pulse.visit(pulse.MOD, function(t) {
      if (map[tupleid(t)] === 1) out.mod.push(t);
    });
  }

  if (pulse.add.length || cap < 0) {
    // output tuples that are in the sample now but have no map entry,
    // i.e. those admitted during this pulse
    out.add = res.filter(function(t) { return !map[tupleid(t)]; });
  }

  this.count = cnt;
  this.value = out.source = res;
  return out;
};
|