UNPKG

3.04 kBJavaScriptView Raw
1import Subflow from './Subflow';
2import {Transform, tupleid} from 'vega-dataflow';
3import {fastmap, hasOwnProperty, inherits} from 'vega-util';
4
5/**
6 * Facets a dataflow into a set of subflows based on a key.
7 * @constructor
8 * @param {object} params - The parameters for this operator.
9 * @param {function(Dataflow, string): Operator} params.subflow - A function
10 * that generates a subflow of operators and returns its root operator.
11 * @param {function(object): *} params.key - The key field to facet by.
12 */
13export default function Facet(params) {
14 Transform.call(this, {}, params);
15 this._keys = fastmap(); // cache previously calculated key values
16
17 // keep track of active subflows, use as targets array for listeners
18 // this allows us to limit propagation to only updated subflows
19 var a = this._targets = [];
20 a.active = 0;
21 a.forEach = function(f) {
22 for (var i=0, n=a.active; i<n; ++i) f(a[i], i, a);
23 };
24}
25
26var prototype = inherits(Facet, Transform);
27
28prototype.activate = function(flow) {
29 this._targets[this._targets.active++] = flow;
30};
31
32prototype.subflow = function(key, flow, pulse, parent) {
33 var flows = this.value,
34 sf = hasOwnProperty(flows, key) && flows[key],
35 df, p;
36
37 if (!sf) {
38 p = parent || (p = this._group[key]) && p.tuple;
39 df = pulse.dataflow;
40 sf = df.add(new Subflow(pulse.fork(pulse.NO_SOURCE), this))
41 .connect(flow(df, key, p));
42 flows[key] = sf;
43 this.activate(sf);
44 } else if (sf.value.stamp < pulse.stamp) {
45 sf.init(pulse);
46 this.activate(sf);
47 }
48
49 return sf;
50};
51
52prototype.transform = function(_, pulse) {
53 var df = pulse.dataflow,
54 self = this,
55 key = _.key,
56 flow = _.subflow,
57 cache = this._keys,
58 rekey = _.modified('key');
59
60 function subflow(key) {
61 return self.subflow(key, flow, pulse);
62 }
63
64 this._group = _.group || {};
65 this._targets.active = 0; // reset list of active subflows
66
67 pulse.visit(pulse.REM, function(t) {
68 var id = tupleid(t),
69 k = cache.get(id);
70 if (k !== undefined) {
71 cache.delete(id);
72 subflow(k).rem(t);
73 }
74 });
75
76 pulse.visit(pulse.ADD, function(t) {
77 var k = key(t);
78 cache.set(tupleid(t), k);
79 subflow(k).add(t);
80 });
81
82 if (rekey || pulse.modified(key.fields)) {
83 pulse.visit(pulse.MOD, function(t) {
84 var id = tupleid(t),
85 k0 = cache.get(id),
86 k1 = key(t);
87 if (k0 === k1) {
88 subflow(k1).mod(t);
89 } else {
90 cache.set(id, k1);
91 subflow(k0).rem(t);
92 subflow(k1).add(t);
93 }
94 });
95 } else if (pulse.changed(pulse.MOD)) {
96 pulse.visit(pulse.MOD, function(t) {
97 subflow(cache.get(tupleid(t))).mod(t);
98 });
99 }
100
101 if (rekey) {
102 pulse.visit(pulse.REFLOW, function(t) {
103 var id = tupleid(t),
104 k0 = cache.get(id),
105 k1 = key(t);
106 if (k0 !== k1) {
107 cache.set(id, k1);
108 subflow(k0).rem(t);
109 subflow(k1).add(t);
110 }
111 });
112 }
113
114 if (cache.empty > df.cleanThreshold) df.runAfter(cache.clean);
115 return pulse;
116};