1 | import {Transform, tupleid} from 'vega-dataflow';
|
2 | import {fastmap, inherits} from 'vega-util';
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
/**
 * Filter transform operator: forwards only the tuples for which the
 * `expr` parameter returns a truthy value.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object, object): *} params.expr - Predicate invoked as
 *   `expr(tuple, params)`; a truthy result lets the tuple pass.
 */
export default function Filter(params) {
  // A fresh fastmap is handed to Transform as the operator's initial value;
  // the transform method reads it back via `this.value` and uses it to
  // remember the ids of tuples that are currently filtered out.
  const filteredIds = fastmap();
  Transform.call(this, filteredIds, params);
}
|
14 |
|
// Static descriptor for the Filter transform: its type name, metadata
// flags, and the single required `expr` parameter.
Filter.Definition = {
  'type': 'Filter',
  'metadata': {
    'changes': true
  },
  'params': [
    {
      'name': 'expr',
      'type': 'expr',
      'required': true
    }
  ]
};
|
22 |
|
inherits(Filter, Transform, {
  /**
   * Applies the filter predicate to the incoming pulse and returns a forked
   * pulse containing only the changes visible downstream.
   *
   * `this.value` is used as a cache keyed by tuple id; an entry exists for
   * every tuple that most recently failed the predicate.
   *
   * @param {object} _ - Operator parameters; `_.expr` is the predicate and
   *   `_.modified()` is consulted to decide whether to re-test reflow tuples.
   * @param {object} pulse - The input pulse.
   * @returns {object} The forked output pulse with add/rem/mod populated.
   */
  transform(_, pulse) {
    const dataflow = pulse.dataflow;
    const filtered = this.value;
    const output = pulse.fork();
    const {add, rem, mod} = output;
    const predicate = _.expr;

    // While true, tuples that passed before and still pass are reported as
    // modified. Switched off before the REFLOW pass below.
    let forwardMods = true;

    // Removed tuples: if the tuple was filtered out, just forget it;
    // otherwise pass the removal along.
    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      if (filtered.has(id)) {
        filtered.delete(id);
      } else {
        rem.push(t);
      }
    });

    // Added tuples: forward those that pass, remember those that fail.
    pulse.visit(pulse.ADD, t => {
      if (predicate(t, _)) {
        add.push(t);
      } else {
        filtered.set(tupleid(t), 1);
      }
    });

    // Re-tests a tuple and emits add/rem/mod according to how its filter
    // status changed relative to the cache.
    function revisit(t) {
      const id = tupleid(t);
      const passes = predicate(t, _);
      const wasFiltered = filtered.get(id);

      if (passes) {
        if (wasFiltered) {
          // Previously filtered out, now passes: surfaces as an add.
          filtered.delete(id);
          add.push(t);
        } else if (forwardMods) {
          // Passed before and still passes: surfaces as a mod.
          mod.push(t);
        }
      } else if (!wasFiltered) {
        // Previously passed, now fails: surfaces as a rem.
        filtered.set(id, 1);
        rem.push(t);
      }
    }

    pulse.visit(pulse.MOD, revisit);

    // If the parameters report a modification, re-test the REFLOW tuples as
    // well, but without emitting mods for tuples whose status is unchanged.
    if (_.modified()) {
      forwardMods = false;
      pulse.visit(pulse.REFLOW, revisit);
    }

    // Schedule a cache cleanup once its `empty` count exceeds the
    // dataflow's clean threshold.
    if (filtered.empty > dataflow.cleanThreshold) {
      dataflow.runAfter(filtered.clean);
    }

    return output;
  }
});
|