1 | import {Transform, tupleid} from 'vega-dataflow';
|
2 | import {fastmap, inherits} from 'vega-util';
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
/**
 * Filter transform operator.
 *
 * Delegates construction to Transform, passing a freshly created fastmap
 * as the initial operator value together with the supplied parameters.
 * NOTE(review): the transform method reads this map back via `this.value`;
 * presumably Transform stores its second argument there - confirm.
 *
 * @constructor
 * @param {object} params - The parameters for this operator. The
 *   Definition below declares a single required `expr` parameter.
 */
export default function Filter(params) {
  const filteredIds = fastmap();
  Transform.call(this, filteredIds, params);
}
|
14 |
|
// Static description of the transform: its type name, metadata flags,
// and the parameters it accepts (a single required expression, `expr`).
Filter.Definition = {
  type: 'Filter',
  metadata: {
    changes: true
  },
  params: [
    {
      name: 'expr',
      type: 'expr',
      required: true
    }
  ]
};

// Set up inheritance from Transform; `inherits` returns the object on which
// Filter's instance methods are defined below.
const prototype = inherits(Filter, Transform);
|
24 |
|
/**
 * Applies the predicate `_.expr` to the tuples of the incoming pulse and
 * returns a forked pulse containing only the resulting changes.
 *
 * The operator value (`this.value`) is used as a map keyed by tuple id; an
 * entry is present for every tuple that most recently failed the predicate.
 *
 * @param {object} _ - Operator parameters; `_.expr` is the predicate (called
 *   as `expr(tuple, _)`) and `_.modified()` signals whether previously seen
 *   tuples must be re-tested (presumably true when a parameter changed -
 *   confirm against the Parameters implementation).
 * @param {object} pulse - The input pulse.
 * @return {object} The forked output pulse with populated add/rem/mod arrays.
 */
prototype.transform = function(_, pulse) {
  const dataflow = pulse.dataflow;
  const filtered = this.value; // ids of tuples that currently fail the test
  const out = pulse.fork();
  const added = out.add;
  const removed = out.rem;
  const modified = out.mod;
  const predicate = _.expr;
  let reportMods = true;

  // Removals: a tuple recorded as filtered is only dropped from the map;
  // any other removed tuple is forwarded on the output pulse.
  pulse.visit(pulse.REM, (tuple) => {
    const id = tupleid(tuple);
    if (filtered.has(id)) {
      filtered.delete(id);
    } else {
      removed.push(tuple);
    }
  });

  // Additions: forward passing tuples, remember the ids of failing ones.
  pulse.visit(pulse.ADD, (tuple) => {
    if (predicate(tuple, _)) {
      added.push(tuple);
    } else {
      filtered.set(tupleid(tuple), 1);
    }
  });

  // Re-evaluates a previously seen tuple and emits the matching change:
  //   now passes, was filtered     -> add
  //   now fails,  was not filtered -> rem
  //   now passes, was not filtered -> mod (only while reportMods is set)
  //   now fails,  was filtered     -> nothing
  const revisit = (tuple) => {
    const id = tupleid(tuple);
    const passes = predicate(tuple, _);
    const wasFiltered = filtered.get(id);

    if (passes) {
      if (wasFiltered) {
        filtered.delete(id);
        added.push(tuple);
      } else if (reportMods) {
        modified.push(tuple);
      }
    } else if (!wasFiltered) {
      filtered.set(id, 1);
      removed.push(tuple);
    }
  };

  pulse.visit(pulse.MOD, revisit);

  if (_.modified()) {
    // During the REFLOW pass only add/rem transitions are emitted, never mods.
    reportMods = false;
    pulse.visit(pulse.REFLOW, revisit);
  }

  // Schedule a cleanup of the id map once its count of empty entries
  // exceeds the dataflow's threshold.
  if (filtered.empty > dataflow.cleanThreshold) {
    dataflow.runAfter(filtered.clean);
  }

  return out;
};
|