// Source listing retrieved from UNPKG (1.71 kB, JavaScript).
import {Transform, tupleid} from 'vega-dataflow';
import {fastmap, inherits} from 'vega-util';
3
/**
 * Filters data tuples according to a predicate function.
 *
 * A fresh `fastmap()` is passed to the `Transform` base constructor as the
 * operator's initial value; `transform` later reads it back via `this.value`
 * and uses it as the cache of ids of filtered-out tuples.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - The predicate expression function
 *   that determines a tuple's filter status. Truthy values pass the filter.
 */
export default function Filter(params) {
  Transform.call(this, fastmap(), params);
}
14
// Declarative description of this transform: its type name, a metadata
// object, and the single required `expr` parameter (the filter predicate).
Filter.Definition = {
  'type': 'Filter',
  'metadata': {'changes': true},
  'params': [
    { 'name': 'expr', 'type': 'expr', 'required': true }
  ]
};
22
var prototype = inherits(Filter, Transform);

/**
 * Applies the filter predicate to the tuples of an incoming pulse and
 * returns a forked pulse whose add/rem/mod arrays reflect the filtered view.
 *
 * The operator value (`this.value`) is a map keyed by tuple id that holds an
 * entry for every tuple currently filtered out. It is used to translate
 * upstream changes into the right downstream change:
 *  - a tuple that starts passing is emitted as an add,
 *  - a tuple that stops passing is emitted as a rem,
 *  - a tuple that keeps passing is emitted as a mod (MOD pass only),
 *  - a tuple that stays filtered out produces no output.
 *
 * @param {object} _ - The operator parameters. `_.expr(tuple, _)` is the
 *   predicate; `_.modified()` is consulted to decide whether REFLOW tuples
 *   must be re-tested (presumably it reports a parameter change).
 * @param {object} pulse - The input pulse. Must provide `dataflow`, `fork()`,
 *   `visit(flag, fn)` and the `ADD`, `REM`, `MOD`, `REFLOW` flags.
 * @return {object} The forked output pulse.
 */
prototype.transform = function(_, pulse) {
  var df = pulse.dataflow,
      cache = this.value, // cache ids of filtered tuples
      output = pulse.fork(),
      add = output.add,
      rem = output.rem,
      mod = output.mod,
      test = _.expr,
      isMod = true;

  // Removed tuples: forward the removal only if the tuple was passing the
  // filter. A cached (filtered-out) tuple was never emitted as an add or was
  // already emitted as a rem, so just drop its cache entry.
  pulse.visit(pulse.REM, function(t) {
    var id = tupleid(t);
    if (!cache.has(id)) rem.push(t);
    else cache.delete(id);
  });

  // Added tuples: pass them through, or remember them as filtered out.
  pulse.visit(pulse.ADD, function(t) {
    if (test(t, _)) add.push(t);
    else cache.set(tupleid(t), 1);
  });

  // Re-tests an existing tuple and emits the change implied by comparing
  // its new predicate result (b) with its previous cached state (s).
  function revisit(t) {
    var id = tupleid(t),
        b = test(t, _),
        s = cache.get(id);
    if (b && s) {
      // was filtered out, passes now
      cache.delete(id);
      add.push(t);
    } else if (!b && !s) {
      // was passing, fails now
      cache.set(id, 1);
      rem.push(t);
    } else if (isMod && b && !s) {
      // still passing; only reported as modified during the MOD pass
      mod.push(t);
    }
    // remaining case (!b && s): still filtered out, nothing to emit
  }

  pulse.visit(pulse.MOD, revisit);

  // When `_.modified()` is truthy, re-test the REFLOW tuples as well.
  // isMod is cleared first so that tuples which simply keep passing are not
  // reported as modified.
  if (_.modified()) {
    isMod = false;
    pulse.visit(pulse.REFLOW, revisit);
  }

  // Schedule cache.clean once cache.empty exceeds the dataflow's threshold
  // (presumably `empty` counts deleted entries — confirm against fastmap).
  if (cache.empty > df.cleanThreshold) df.runAfter(cache.clean);
  return output;
};