UNPKG

1.76 kBJavaScriptView Raw
1import {Transform, tupleid} from 'vega-dataflow';
2import {fastmap, inherits} from 'vega-util';
3
/**
 * Operator that filters data tuples with a predicate function.
 * The operator is initialised through the Transform base constructor with a
 * fresh, empty fastmap as its starting value.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - Predicate expression function
 *   deciding whether a tuple passes. Truthy results pass the filter.
 */
export default function Filter(params) {
  // Empty map handed to the base constructor as the operator's initial value.
  const filteredIds = fastmap();
  Transform.call(this, filteredIds, params);
}
14
// Static definition of the Filter transform: its type name, a metadata
// flag, and the single required `expr` parameter (of type `expr`).
// NOTE(review): `metadata.changes` presumably marks that the transform alters
// the set of tuples flowing downstream — confirm against the consumer.
Filter.Definition = {
  type: 'Filter',
  metadata: {changes: true},
  params: [
    {name: 'expr', type: 'expr', required: true}
  ]
};
22
inherits(Filter, Transform, {
  /**
   * Sorts the tuples of the incoming pulse into the add/rem/mod arrays of a
   * forked output pulse according to the predicate, keeping `this.value`
   * current as the cache of ids of tuples that fail the predicate.
   * @param {object} _ - Operator parameters. `_.expr` is the predicate
   *   (invoked as `expr(tuple, _)`); `_.modified()` decides whether the
   *   REFLOW tuples are re-tested as well.
   * @param {object} pulse - The incoming pulse.
   * @return {object} The pulse obtained from `pulse.fork()`.
   */
  transform(_, pulse) {
    const df = pulse.dataflow;
    const filtered = this.value; // cache of ids of filtered-out tuples
    const output = pulse.fork();
    const added = output.add;
    const removed = output.rem;
    const modified = output.mod;
    const predicate = _.expr;

    // Removed tuples: if the tuple was already filtered out, just forget its
    // id; otherwise forward the removal.
    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      if (filtered.has(id)) {
        filtered.delete(id);
      } else {
        removed.push(t);
      }
    });

    // Added tuples: forward those that pass, remember the ids of the rest.
    pulse.visit(pulse.ADD, t => {
      if (predicate(t, _)) {
        added.push(t);
      } else {
        filtered.set(tupleid(t), 1);
      }
    });

    // Builds a visitor that re-tests a tuple against the predicate and
    // compares the result with its cached filter status. `trackMods`
    // controls whether tuples that pass both before and after are pushed
    // onto the mod array.
    const retest = trackMods => t => {
      const id = tupleid(t);
      const passes = predicate(t, _);
      const wasFiltered = filtered.get(id);

      if (passes) {
        if (wasFiltered) {
          // previously filtered, now passes: surfaces as an add
          filtered.delete(id);
          added.push(t);
        } else if (trackMods) {
          // passed before and still passes: forward as a modification
          modified.push(t);
        }
      } else if (!wasFiltered) {
        // previously passed, now fails: surfaces as a remove
        filtered.set(id, 1);
        removed.push(t);
      }
      // fails now and was already filtered: nothing to emit
    };

    pulse.visit(pulse.MOD, retest(true));

    // When `_.modified()` is truthy, also re-test the REFLOW tuples, without
    // emitting them as mods.
    if (_.modified()) {
      pulse.visit(pulse.REFLOW, retest(false));
    }

    // Schedule a cache clean-up once its `empty` count exceeds the
    // dataflow's clean threshold.
    if (filtered.empty > df.cleanThreshold) {
      df.runAfter(filtered.clean);
    }
    return output;
  }
});