1 | import Subflow from './Subflow';
|
2 | import {Transform, tupleid} from 'vega-dataflow';
|
3 | import {fastmap, hasOwnProperty, inherits} from 'vega-util';
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
/**
 * Facet transform constructor.
 *
 * Initializes the underlying Transform with an empty object as its value
 * (the `subflow` method uses that object as a key -> subflow lookup) and
 * prepares the bookkeeping that the prototype methods rely on.
 *
 * @constructor
 * @param {object} params - Operator parameters, forwarded unchanged to
 *   Transform. The `transform` method of this operator reads `key`,
 *   `subflow` and (optionally) `group` from its parameter object.
 */
export default function Facet(params) {
  Transform.call(this, {}, params);

  // Map from tuple id to the key last computed for that tuple.
  this._keys = fastmap();

  // Target list. Only the first `active` entries are considered live;
  // the `activate` and `initTargets` methods maintain that counter.
  const targets = [];
  this._targets = targets;
  targets.active = 0;

  // Shadow Array.prototype.forEach so iteration stops at `active` rather
  // than at `length`. The bound is read once, before the first callback.
  targets.forEach = callback => {
    const limit = targets.active;
    let index = 0;
    while (index < limit) {
      callback(targets[index], index, targets);
      index += 1;
    }
  };
}
|
27 |
|
inherits(Facet, Transform, {
  /**
   * Marks a subflow as active by writing it into the next free slot of
   * the target list and advancing the `active` counter.
   * @param {object} flow - The subflow to append.
   */
  activate(flow) {
    const targets = this._targets;
    const slot = targets.active++;
    targets[slot] = flow;
  },

  /**
   * Returns the subflow registered under `key`, creating it on first use.
   *
   * An existing subflow whose `value.stamp` is older than `pulse.stamp`
   * is re-initialized with the pulse and re-activated; one that is
   * already current is returned untouched.
   *
   * @param {*} key - Lookup key (own property of `this.value`).
   * @param {function} flow - Called as `flow(dataflow, key, parentTuple)`;
   *   its result is what the new subflow is connected to.
   * @param {object} pulse - The current pulse.
   * @param {*} [parent] - Optional parent tuple. `transform` in this file
   *   never passes it; when falsy, `this._group[key].tuple` is used.
   * @return {object} The subflow for `key`.
   */
  subflow(key, flow, pulse, parent) {
    const registry = this.value;
    const existing = hasOwnProperty(registry, key) && registry[key];

    if (existing) {
      if (existing.value.stamp < pulse.stamp) {
        existing.init(pulse);
        this.activate(existing);
      }
      return existing;
    }

    // Resolve the tuple handed to the flow generator.
    let parentTuple = parent;
    if (!parentTuple) {
      const group = this._group[key];
      parentTuple = group && group.tuple;
    }

    const dataflow = pulse.dataflow;
    const created = new Subflow(pulse.fork(pulse.NO_SOURCE), this);
    dataflow.add(created).connect(flow(dataflow, key, parentTuple));
    registry[key] = created;
    this.activate(created);

    return created;
  },

  /**
   * Drops every registered subflow whose `count` is exactly 0, invoking
   * its `detachSubflow` hook first when one is set. If anything was
   * dropped, the target list is rebuilt from the entries that still have
   * a positive count.
   */
  clean() {
    const registry = this.value;
    let removedAny = false;

    for (const key in registry) {
      const sf = registry[key];
      if (sf.count !== 0) continue;

      // The hook is invoked as a plain function (no receiver).
      const detach = sf.detachSubflow;
      if (detach) detach();
      delete registry[key];
      removedAny = true;
    }

    if (!removedAny) return;

    const survivors = this._targets.filter(sf => sf && sf.count > 0);
    this.initTargets(survivors);
  },

  /**
   * Resets the target list so that it starts with the entries of `act`
   * (or is empty when `act` is omitted).
   *
   * Entries following the copied ones are set to null, stopping at the
   * first entry that is already null/undefined or at the list's previous
   * length -- presumably so stale subflows are not kept alive by this
   * array (TODO confirm intent).
   *
   * @param {Array} [act] - Subflows that should be active afterwards.
   */
  initTargets(act) {
    const targets = this._targets;
    const priorLength = targets.length;
    const keep = act ? act.length : 0;
    let index = 0;

    while (index < keep) {
      targets[index] = act[index];
      ++index;
    }
    while (index < priorLength && targets[index] != null) {
      targets[index] = null;
      ++index;
    }

    targets.active = keep;
  },

  /**
   * Routes the tuples of a pulse to per-key subflows.
   *
   * Processing order is: removed tuples, added tuples, modified tuples,
   * then (only when the `key` parameter itself changed) reflowed tuples.
   * Finally a cleanup callback is scheduled on the dataflow if the pulse
   * requests cleaning or the key cache reports too many empty slots.
   *
   * @param {object} _ - Parameters: `key` (accessor with a `fields`
   *   property), `subflow` (flow generator), optional `group`, and a
   *   `modified(name)` method.
   * @param {object} pulse - The input pulse.
   * @return {object} The same `pulse` that was passed in.
   */
  transform(_, pulse) {
    const dataflow = pulse.dataflow;
    const keyOf = _.key;
    const generator = _.subflow;
    const keyCache = this._keys;
    const keyChanged = _.modified('key');
    const target = k => this.subflow(k, generator, pulse);

    // Re-key a tuple: record the new key, then move the tuple from the
    // subflow of its previous key to the subflow of its new key.
    const relocate = (t, id, from, to) => {
      keyCache.set(id, to);
      target(from).rem(t);
      target(to).add(t);
    };

    this._group = _.group || {};
    this.initTargets(); // start this pulse with no active subflows

    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      const k = keyCache.get(id);
      if (k === undefined) return; // tuple has no cached key; nothing to remove
      keyCache.delete(id);
      target(k).rem(t);
    });

    pulse.visit(pulse.ADD, t => {
      const k = keyOf(t);
      keyCache.set(tupleid(t), k);
      target(k).add(t);
    });

    if (keyChanged || pulse.modified(keyOf.fields)) {
      // Keys may differ from the cached ones: compare per tuple.
      pulse.visit(pulse.MOD, t => {
        const id = tupleid(t);
        const before = keyCache.get(id);
        const after = keyOf(t);
        if (before !== after) {
          relocate(t, id, before, after);
        } else {
          target(after).mod(t);
        }
      });
    } else if (pulse.changed(pulse.MOD)) {
      // Keys are unaffected: forward using the cached key.
      pulse.visit(pulse.MOD, t => {
        target(keyCache.get(tupleid(t))).mod(t);
      });
    }

    if (keyChanged) {
      pulse.visit(pulse.REFLOW, t => {
        const id = tupleid(t);
        const before = keyCache.get(id);
        const after = keyOf(t);
        if (before !== after) {
          relocate(t, id, before, after);
        }
      });
    }

    if (pulse.clean()) {
      dataflow.runAfter(() => { this.clean(); keyCache.clean(); });
    } else if (keyCache.empty > dataflow.cleanThreshold) {
      dataflow.runAfter(keyCache.clean);
    }

    return pulse;
  }
});
|