1 | import Subflow from './Subflow';
|
2 | import {Transform, tupleid} from 'vega-dataflow';
|
3 | import {fastmap, hasOwnProperty, inherits} from 'vega-util';
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
/**
 * Facet transform constructor. Routes tuples into per-key subflows; the
 * routing itself lives in the `subflow` and `transform` prototype methods.
 * @constructor
 * @param {object} params - Operator parameters, forwarded unchanged to the
 *   Transform constructor. The transform method later reads `key`,
 *   `subflow` and (optionally) `group` from its parameter object —
 *   presumably these same parameters once resolved; confirm against Transform.
 */
export default function Facet(params) {
  // The empty object is passed as the initial value; the subflow method
  // uses `this.value` as its key -> subflow lookup (presumably this object).
  Transform.call(this, {}, params);

  // Lookup of tuple id -> facet key, populated and consulted in transform.
  this._keys = fastmap();

  // Array of subflows activated during a transform run. Only the first
  // `active` entries are considered live, so forEach is overridden to stop
  // at that boundary instead of at the array's length.
  const targets = this._targets = [];
  targets.active = 0;
  targets.forEach = (visit) => {
    // Snapshot the live count once so the bound is fixed for the whole pass.
    const limit = targets.active;
    for (let i = 0; i < limit; ++i) {
      visit(targets[i], i, targets);
    }
  };
}
|
25 |
|
// Wire Facet up as a subtype of Transform. The returned object receives the
// methods defined below (presumably it is Facet.prototype — see vega-util
// `inherits`).
var prototype = inherits(Facet, Transform);
|
27 |
|
/**
 * Marks a subflow as active by writing it into the next free slot of the
 * targets array and advancing the `active` counter.
 * @param {object} flow - The subflow to activate.
 */
prototype.activate = function(flow) {
  const targets = this._targets;
  targets[targets.active++] = flow;
};
|
31 |
|
/**
 * Looks up the subflow registered under a key, creating it on first use.
 * @param {*} key - Facet key identifying the subflow.
 * @param {function} flow - Factory invoked as flow(dataflow, key, parent);
 *   its result is connected to a newly created Subflow.
 * @param {object} pulse - The current pulse.
 * @param {object} [parent] - Optional parent value handed to the factory.
 *   When falsy, `this._group[key].tuple` is used instead (if that group
 *   entry exists).
 * @returns {object} The existing or newly created subflow.
 */
prototype.subflow = function(key, flow, pulse, parent) {
  const flows = this.value;
  const existing = hasOwnProperty(flows, key) && flows[key];

  if (existing) {
    // Re-initialize and activate only when the subflow's stamp is older
    // than the pulse's, so a subflow already prepared for this pulse is
    // returned untouched.
    if (existing.value.stamp < pulse.stamp) {
      existing.init(pulse);
      this.activate(existing);
    }
    return existing;
  }

  // Resolve the parent argument: an explicit parent wins, otherwise fall
  // back to the tuple of the matching group entry.
  let p = parent;
  if (!p) {
    p = this._group[key];
    p = p && p.tuple;
  }

  // Build the subflow: add its root operator to the dataflow first, then
  // generate the downstream target and connect it.
  const df = pulse.dataflow;
  const root = df.add(new Subflow(pulse.fork(pulse.NO_SOURCE), this));
  const created = root.connect(flow(df, key, p));

  flows[key] = created;
  this.activate(created);
  return created;
};
|
51 |
|
/**
 * Distributes the tuples of a pulse to the subflows selected by the key
 * function, keeping the tuple id -> key cache up to date.
 * @param {object} _ - Parameter object. Reads `key` (a function of a tuple
 *   that also exposes `fields`), `subflow` (the subflow factory), the
 *   optional `group` lookup, and calls `modified('key')`.
 * @param {object} pulse - The input pulse.
 * @returns {object} The same pulse that was passed in.
 */
prototype.transform = function(_, pulse) {
  const df = pulse.dataflow;
  const key = _.key;
  const flow = _.subflow;
  const cache = this._keys;
  const rekey = _.modified('key');

  // Fetch (or lazily create) the subflow for a key under the current pulse.
  const subflow = (k) => this.subflow(k, flow, pulse);

  // Move a tuple from the subflow of its old key to that of its new key.
  const reassign = (t, id, from, to) => {
    cache.set(id, to);
    subflow(from).rem(t);
    subflow(to).add(t);
  };

  this._group = _.group || {};
  // Reset the active-target count; subflows re-register as they are touched.
  this._targets.active = 0;

  // Removed tuples: drop the cached key and forward the removal. Tuples
  // with no cached key are ignored.
  pulse.visit(pulse.REM, (t) => {
    const id = tupleid(t);
    const k = cache.get(id);
    if (k === undefined) {
      return;
    }
    cache.delete(id);
    subflow(k).rem(t);
  });

  // Added tuples: compute the key, remember it, forward the addition.
  pulse.visit(pulse.ADD, (t) => {
    const k = key(t);
    cache.set(tupleid(t), k);
    subflow(k).add(t);
  });

  if (rekey || pulse.modified(key.fields)) {
    // Keys may have changed: compare the cached key with a fresh one and
    // either forward the modification or move the tuple across subflows.
    pulse.visit(pulse.MOD, (t) => {
      const id = tupleid(t);
      const before = cache.get(id);
      const after = key(t);
      if (before === after) {
        subflow(after).mod(t);
      } else {
        reassign(t, id, before, after);
      }
    });
  } else if (pulse.changed(pulse.MOD)) {
    // Keys are unaffected: forward modifications using the cached key.
    pulse.visit(pulse.MOD, (t) => {
      subflow(cache.get(tupleid(t))).mod(t);
    });
  }

  if (rekey) {
    // The key parameter itself was modified: also check the tuples visited
    // under pulse.REFLOW and move any whose key no longer matches the cache.
    pulse.visit(pulse.REFLOW, (t) => {
      const id = tupleid(t);
      const before = cache.get(id);
      const after = key(t);
      if (before !== after) {
        reassign(t, id, before, after);
      }
    });
  }

  // Once the cache's `empty` count exceeds the dataflow's threshold, hand
  // its clean routine to the dataflow to run afterwards.
  if (cache.empty > df.cleanThreshold) {
    df.runAfter(cache.clean);
  }
  return pulse;
};
|