UNPKG

4.01 kBJavaScriptView Raw
1import Subflow from './Subflow';
2import {Transform, tupleid} from 'vega-dataflow';
3import {fastmap, hasOwnProperty, inherits} from 'vega-util';
4
/**
 * Facets a dataflow into a set of subflows based on a key.
 *
 * Besides the usual Transform state, each instance keeps:
 *  - `_keys`: a fastmap caching the key value computed for each tuple id;
 *  - `_targets`: an array of subflows whose `active` property counts how
 *    many leading entries are currently active, and whose `forEach` visits
 *    only those active entries.
 *
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Dataflow, string): Operator} params.subflow - A function
 *   that generates a subflow of operators and returns its root operator.
 * @param {function(object): *} params.key - The key field to facet by.
 */
export default function Facet(params) {
  Transform.call(this, {}, params);

  // cache of previously calculated key values
  this._keys = fastmap();

  // The targets array tracks the active subflows and is used as the
  // targets array for listeners, which lets propagation be limited to
  // only the subflows that were updated.
  const targets = [];
  targets.active = 0;
  targets.forEach = visitor => {
    // Snapshot the active count up front so the number of visits is fixed
    // when iteration starts.
    const count = targets.active;
    let index = 0;
    while (index < count) {
      visitor(targets[index], index, targets);
      ++index;
    }
  };
  this._targets = targets;
}
27
28inherits(Facet, Transform, {
29 activate(flow) {
30 this._targets[this._targets.active++] = flow;
31 },
32
33 // parent argument provided by PreFacet subclass
34 subflow(key, flow, pulse, parent) {
35 const flows = this.value;
36 let sf = hasOwnProperty(flows, key) && flows[key],
37 df, p;
38
39 if (!sf) {
40 p = parent || (p = this._group[key]) && p.tuple;
41 df = pulse.dataflow;
42 sf = new Subflow(pulse.fork(pulse.NO_SOURCE), this);
43 df.add(sf).connect(flow(df, key, p));
44 flows[key] = sf;
45 this.activate(sf);
46 } else if (sf.value.stamp < pulse.stamp) {
47 sf.init(pulse);
48 this.activate(sf);
49 }
50
51 return sf;
52 },
53
54 clean() {
55 const flows = this.value;
56 let detached = 0;
57 for (const key in flows) {
58 if (flows[key].count === 0) {
59 const detach = flows[key].detachSubflow;
60 if (detach) detach();
61 delete flows[key];
62 ++detached;
63 }
64 }
65
66 // remove inactive targets from the active targets array
67 if (detached) {
68 const active = this._targets.filter(sf => sf && sf.count > 0);
69 this.initTargets(active);
70 }
71 },
72
73 initTargets(act) {
74 const a = this._targets,
75 n = a.length,
76 m = act ? act.length : 0;
77 let i = 0;
78
79 for (; i<m; ++i) {
80 a[i] = act[i];
81 }
82 for (; i<n && a[i] != null; ++i) {
83 a[i] = null; // ensure old flows can be garbage collected
84 }
85 a.active = m;
86 },
87
88 transform(_, pulse) {
89 const df = pulse.dataflow,
90 key = _.key,
91 flow = _.subflow,
92 cache = this._keys,
93 rekey = _.modified('key'),
94 subflow = key => this.subflow(key, flow, pulse);
95
96 this._group = _.group || {};
97 this.initTargets(); // reset list of active subflows
98
99 pulse.visit(pulse.REM, t => {
100 const id = tupleid(t),
101 k = cache.get(id);
102 if (k !== undefined) {
103 cache.delete(id);
104 subflow(k).rem(t);
105 }
106 });
107
108 pulse.visit(pulse.ADD, t => {
109 const k = key(t);
110 cache.set(tupleid(t), k);
111 subflow(k).add(t);
112 });
113
114 if (rekey || pulse.modified(key.fields)) {
115 pulse.visit(pulse.MOD, t => {
116 const id = tupleid(t),
117 k0 = cache.get(id),
118 k1 = key(t);
119 if (k0 === k1) {
120 subflow(k1).mod(t);
121 } else {
122 cache.set(id, k1);
123 subflow(k0).rem(t);
124 subflow(k1).add(t);
125 }
126 });
127 } else if (pulse.changed(pulse.MOD)) {
128 pulse.visit(pulse.MOD, t => {
129 subflow(cache.get(tupleid(t))).mod(t);
130 });
131 }
132
133 if (rekey) {
134 pulse.visit(pulse.REFLOW, t => {
135 const id = tupleid(t),
136 k0 = cache.get(id),
137 k1 = key(t);
138 if (k0 !== k1) {
139 cache.set(id, k1);
140 subflow(k0).rem(t);
141 subflow(k1).add(t);
142 }
143 });
144 }
145
146 if (pulse.clean()) {
147 df.runAfter(() => { this.clean(); cache.clean(); });
148 } else if (cache.empty > df.cleanThreshold) {
149 df.runAfter(cache.clean);
150 }
151
152 return pulse;
153 }
154});