1 | import Aggregate from './Aggregate';
|
2 | import {ValidAggregateOps} from './util/AggregateOps';
|
3 | import {extend, inherits} from 'vega-util';
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
/**
 * Constructor for the JoinAggregate operator.
 *
 * All set-up is delegated to the Aggregate constructor, which is invoked with
 * the new instance as its receiver and `params` as its only argument.
 * @constructor
 * @param {*} params - Forwarded unchanged to Aggregate (presumably the
 *   operator's parameter object; confirm against Aggregate).
 */
export default function JoinAggregate(params) {
  Reflect.apply(Aggregate, this, [params]);
}
|
13 |
|
// Static definition attached to the constructor. It declares the transform's
// type name, a metadata object flagging `modifies`, and the list of
// parameters with their declared types.
JoinAggregate.Definition = {
  type: 'JoinAggregate',
  metadata: {modifies: true},
  params: [
    {name: 'groupby', type: 'field', array: true},
    {name: 'fields', type: 'field', null: true, array: true},
    // The permitted enum values are taken from the imported ValidAggregateOps.
    {name: 'ops', type: 'enum', array: true, values: ValidAggregateOps},
    {name: 'as', type: 'string', null: true, array: true},
    {name: 'key', type: 'field'}
  ]
};
|
25 |
|
// Register JoinAggregate as derived from Aggregate, supplying the two methods
// below as its own members.
inherits(JoinAggregate, Aggregate, {
  /**
   * Updates the aggregate cells from the pulse, then copies each cell's
   * `tuple` properties onto every source tuple that maps to that cell.
   * @param {object} _ - Parameter object; `modified()` is queried here and the
   *   object is handed to `init` when cells have to be created.
   * @param {object} pulse - Pulse whose SOURCE / ADD / REM tuples are visited.
   * @return {*} Whatever `pulse.reflow(...).modifies(this._outputs)` returns.
   */
  transform(_, pulse) {
    const paramsChanged = _.modified();
    let cells;

    if (this.value && (paramsChanged || pulse.modified(this._inputs, true))) {
      // Cells already exist but parameters or input fields changed:
      // replace the cell store and feed every source tuple back in.
      cells = paramsChanged ? this.init(_) : {};
      this.value = cells;
      pulse.visit(pulse.SOURCE, t => this.add(t));
    } else {
      // Reuse the current cell store (creating it on first use) and apply
      // only the removed and added tuples carried by this pulse.
      cells = this.value || this.init(_);
      this.value = cells;
      pulse.visit(pulse.REM, t => this.rem(t));
      pulse.visit(pulse.ADD, t => this.add(t));
    }

    // Process the staged cells before their tuples are read below.
    this.changes();

    pulse.visit(pulse.SOURCE, t => {
      const cell = cells[this.cellkey(t)];
      extend(t, cell.tuple);
    });

    const reflowed = pulse.reflow(paramsChanged);
    return reflowed.modifies(this._outputs);
  },

  /**
   * Calls `celltuple` on each entry staged in `_adds` and `_mods`, nulls the
   * processed slots, and resets both staging counters to zero.
   */
  changes() {
    const pendingAdds = this._adds;
    const pendingMods = this._mods;

    // Handle the first `count` entries of `list`; each slot is nulled once
    // processed so the list no longer references the entry.
    const flush = (list, count) => {
      for (let k = 0; k < count; ++k) {
        this.celltuple(list[k]);
        list[k] = null;
      }
    };

    flush(pendingAdds, this._alen);
    flush(pendingMods, this._mlen);

    this._mlen = 0;
    this._alen = 0;
  }
});
|