UNPKG

1.98 kBJavaScriptView Raw
1import Aggregate from './Aggregate';
2import {ValidAggregateOps} from './util/AggregateOps';
3import {extend, inherits} from 'vega-util';
4
/**
 * Extend input tuples with aggregate values.
 * Calculates aggregate values and joins them with the input stream.
 * @constructor
 * @param {object} params - The parameters for this operator; passed
 *   through unchanged to the Aggregate constructor.
 */
export default function JoinAggregate(params) {
  // All instance state is set up by the parent Aggregate constructor.
  Aggregate.call(this, params);
}
13
/**
 * Static definition object for the JoinAggregate transform: its type name,
 * metadata (this transform is flagged as modifying tuples), and the list of
 * parameters it accepts. The permitted values for `ops` are taken from
 * ValidAggregateOps.
 */
JoinAggregate.Definition = {
  'type': 'JoinAggregate',
  'metadata': {'modifies': true},
  'params': [
    { 'name': 'groupby', 'type': 'field', 'array': true },
    { 'name': 'fields', 'type': 'field', 'null': true, 'array': true },
    { 'name': 'ops', 'type': 'enum', 'array': true, 'values': ValidAggregateOps },
    { 'name': 'as', 'type': 'string', 'null': true, 'array': true },
    { 'name': 'key', 'type': 'field' }
  ]
};
25
inherits(JoinAggregate, Aggregate, {
  /**
   * Update the aggregation cells from the pulse, then copy each cell's
   * aggregate tuple onto every source tuple belonging to that cell.
   * @param {object} _ - The operator parameters; `_.modified()` is consulted
   *   to decide whether the aggregation must be rebuilt.
   * @param {object} pulse - The input pulse whose tuples are visited.
   * @return {object} The result of `pulse.reflow(mod).modifies(outputs)`.
   */
  transform(_, pulse) {
    const aggr = this,
          mod = _.modified();
    let cells;

    // process all input tuples to calculate aggregates
    if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
      // Existing state is stale (parameters or input fields changed):
      // start from fresh cells and re-add every source tuple.
      cells = aggr.value = mod ? aggr.init(_) : {};
      pulse.visit(pulse.SOURCE, t => aggr.add(t));
    } else {
      // First run (no cells yet) or incremental update: initialize lazily,
      // then apply only the removed and added tuples.
      cells = aggr.value = aggr.value || aggr.init(_);
      pulse.visit(pulse.REM, t => aggr.rem(t));
      pulse.visit(pulse.ADD, t => aggr.add(t));
    }

    // update aggregation cells
    aggr.changes();

    // write aggregate values to input tuples
    pulse.visit(pulse.SOURCE, t => {
      extend(t, cells[aggr.cellkey(t)].tuple);
    });

    return pulse.reflow(mod).modifies(aggr._outputs);
  },

  /**
   * Process the pending lists of added and modified cells: call
   * `celltuple` on each active entry, drop the reference, and reset
   * both list lengths to zero.
   */
  changes() {
    const adds = this._adds,
          mods = this._mods;
    let i, n;

    // Only the first _alen / _mlen entries are active; the backing arrays
    // may be longer than that.
    for (i = 0, n = this._alen; i < n; ++i) {
      this.celltuple(adds[i]);
      adds[i] = null; // for garbage collection
    }

    for (i = 0, n = this._mlen; i < n; ++i) {
      this.celltuple(mods[i]);
      mods[i] = null; // for garbage collection
    }

    this._alen = this._mlen = 0; // reset list of active cells
  }
});