// Page header left over from the UNPKG file viewer (9.22 kB, JavaScript); not part of the module.
1import {groupkey} from './util/AggregateKeys';
2import {ValidAggregateOps, compileMeasures, createMeasure, measureName} from './util/AggregateOps';
3import TupleStore from './util/TupleStore';
4import {Transform, ingest, replace} from 'vega-dataflow';
5import {accessorFields, accessorName, array, error, inherits} from 'vega-util';
6
/**
 * Group-by aggregation operator.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors to groupby.
 * @param {Array<function(object): *>} [params.fields] - An array of accessors to aggregate.
 * @param {Array<string>} [params.ops] - An array of strings indicating aggregation operations.
 * @param {Array<string>} [params.as] - An array of output field names for aggregated values.
 * @param {boolean} [params.cross=false] - A flag indicating that the full
 *   cross-product of groupby values should be generated, including empty cells.
 *   If true, the drop parameter is ignored and empty cells are retained.
 * @param {boolean} [params.drop=true] - A flag indicating if empty cells should be removed.
 * @param {function(object): *} [params.key] - An optional function that maps
 *   a tuple to its aggregation cell key. When omitted, the key is derived
 *   from the groupby accessors.
 */
export default function Aggregate(params) {
  Transform.call(this, null, params);

  this._adds = [];          // array of added output tuples
  this._mods = [];          // array of modified output tuples
  this._alen = 0;           // number of active added tuples
  this._mlen = 0;           // number of active modified tuples
  this._drop = true;        // should empty aggregation cells be removed
  this._cross = false;      // produce full cross-product of group-by values

  this._dims = [];          // group-by dimension accessors
  this._dnames = [];        // group-by dimension names

  this._measures = [];      // collection of aggregation monoids
  this._countOnly = false;  // flag indicating only count aggregation
  this._counts = null;      // collection of count fields
  this._prev = null;        // previous aggregation cells

  this._inputs = null;      // array of dependent input tuple field names
  this._outputs = null;     // array of output tuple field names
}
41
/**
 * Declarative description of the transform: its type name, metadata flags,
 * and the parameters it accepts (name, type, and array / nullable / default
 * settings where applicable).
 */
Aggregate.Definition = {
  'type': 'Aggregate',
  'metadata': {'generates': true, 'changes': true},
  'params': [
    { 'name': 'groupby', 'type': 'field', 'array': true },
    { 'name': 'ops', 'type': 'enum', 'array': true, 'values': ValidAggregateOps },
    { 'name': 'fields', 'type': 'field', 'null': true, 'array': true },
    { 'name': 'as', 'type': 'string', 'null': true, 'array': true },
    { 'name': 'drop', 'type': 'boolean', 'default': true },
    { 'name': 'cross', 'type': 'boolean', 'default': false },
    { 'name': 'key', 'type': 'field' }
  ]
};

// Object returned by inherits(); the operator's methods are attached to it
// below. Presumably this is Aggregate.prototype set up to extend Transform --
// confirm against the inherits() implementation.
var prototype = inherits(Aggregate, Transform);
57
/**
 * Processes an input pulse and returns a forked output pulse carrying the
 * added, removed and modified aggregate tuples.
 *
 * If cells already exist and either the parameters changed or the pulse
 * reports modifications to the aggregated input fields, the current cells
 * are set aside as the previous generation and every source tuple is
 * re-added. Otherwise removed and added tuples are applied incrementally.
 *
 * @param {object} _ - The operator parameters.
 * @param {object} pulse - The input pulse.
 * @returns {object} The output pulse populated by changes().
 */
prototype.transform = function(_, pulse) {
  const aggr = this;
  const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
  const mod = _.modified();

  aggr.stamp = out.stamp;

  if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
    aggr._prev = aggr.value;
    // The cell map is keyed by arbitrary data values. A prototype-free object
    // keeps keys such as "constructor" or "toString" from resolving to
    // inherited Object.prototype members.
    aggr.value = mod ? aggr.init(_) : Object.create(null);
    pulse.visit(pulse.SOURCE, t => aggr.add(t));
  } else {
    aggr.value = aggr.value || aggr.init(_);
    pulse.visit(pulse.REM, t => aggr.rem(t));
    pulse.visit(pulse.ADD, t => aggr.add(t));
  }

  // Indicate output fields and return aggregate tuples.
  out.modifies(aggr._outputs);

  // Should empty cells be dropped?
  aggr._drop = _.drop !== false;

  // If domain cross-product requested, generate empty cells as needed
  // and ensure that empty cells are not dropped
  if (_.cross && aggr._dims.length > 1) {
    aggr._drop = false;
    aggr.cross();
  }

  return aggr.changes(out);
};
90
/**
 * Generates a cell for every combination of observed group-by values that
 * does not already have one, so the output covers the full cross-product.
 * Values are collected from both the previous and the current cells.
 *
 * Keys are built by joining the string forms of the dimension values with
 * '|'. This assumes the cell key function produces keys of the same shape --
 * confirm against groupkey(), which is defined outside this file.
 */
prototype.cross = function() {
  const aggr = this;
  const curr = aggr.value;
  const dims = aggr._dnames;
  const n = dims.length;
  // One lookup of observed values per dimension. The lookups have no
  // prototype so that any value, including "__proto__", is stored as an
  // ordinary own key.
  const vals = dims.map(() => Object.create(null));

  // collect all group-by domain values
  function collect(cells) {
    for (const key in cells) {
      const t = cells[key].tuple;
      for (let i = 0; i < n; ++i) {
        const v = t[dims[i]];
        vals[i][v] = v;
      }
    }
  }
  collect(aggr._prev);
  collect(curr);

  // iterate over key cross-product, create cells as needed
  function generate(base, tuple, index) {
    const name = dims[index];
    const domain = vals[index];
    const next = index + 1;

    for (const k in domain) {
      tuple[name] = domain[k];
      // Decide on the dimension index, not on the truthiness of `base`: an
      // empty-string value in an earlier dimension yields an empty base that
      // must still be followed by the separator.
      const key = index > 0 ? base + '|' + k : k;
      if (next < n) generate(key, tuple, next);
      else if (!curr[key]) aggr.cell(key, tuple);
    }
  }
  generate('', {}, 0);
};
126
/**
 * Configures the operator from its parameters: records the input and output
 * field names, the group-by accessors and their names, the cell key
 * function, the count output fields, and one compiled measure per distinct
 * aggregated field.
 *
 * @param {object} _ - The operator parameters (groupby, fields, ops, as, key).
 * @returns {object} A new, empty collection of aggregation cells.
 */
prototype.init = function(_) {
  // initialize input and output fields
  const inputs = (this._inputs = []);
  const outputs = (this._outputs = []);
  // Field names are arbitrary strings. Lookups without a prototype keep a
  // name such as "constructor" from matching an inherited member.
  const inputMap = Object.create(null);

  function inputVisit(get) {
    const fields = array(accessorFields(get));
    for (let i = 0, n = fields.length; i < n; ++i) {
      const f = fields[i];
      if (!inputMap[f]) {
        inputMap[f] = 1;
        inputs.push(f);
      }
    }
  }

  // initialize group-by dimensions
  this._dims = array(_.groupby);
  this._dnames = this._dims.map(function(d) {
    const dname = accessorName(d);
    inputVisit(d);
    outputs.push(dname);
    return dname;
  });
  this.cellkey = _.key ? _.key : groupkey(this._dims);

  // initialize aggregate measures
  this._countOnly = true;
  this._counts = [];
  this._measures = [];

  const fields = _.fields || [null];
  const ops = _.ops || ['count'];
  const as = _.as || [];
  const n = fields.length;
  const map = Object.create(null); // measure lists keyed by field name

  if (n !== ops.length) {
    error('Unmatched number of fields and aggregate ops.');
  }

  for (let i = 0; i < n; ++i) {
    const field = fields[i];
    const op = ops[i];

    if (field == null && op !== 'count') {
      error('Null aggregate field specified.');
    }
    const mname = accessorName(field);
    const outname = measureName(op, mname, as[i]);
    outputs.push(outname);

    // Counts are read from the cell itself and need no measure.
    if (op === 'count') {
      this._counts.push(outname);
      continue;
    }

    // Measures over the same field share one list, so the field is
    // registered as an input once.
    let m = map[mname];
    if (!m) {
      inputVisit(field);
      m = (map[mname] = []);
      m.field = field;
      this._measures.push(m);
    }

    // Reaching this point means a non-count op was requested.
    this._countOnly = false;
    m.push(createMeasure(op, outname));
  }

  this._measures = this._measures.map(function(m) {
    return compileMeasures(m, m.field);
  });

  // aggregation cells (this.value); prototype-free because cell keys are
  // derived from data values
  return Object.create(null);
};
204
// -- Cell Management -----

// Default cell key function, created with no group-by accessors. init()
// replaces it on the instance with the `key` parameter, or with a key
// function built from the group-by accessors.
prototype.cellkey = groupkey();
208
/**
 * Returns the aggregation cell for a key, creating it if needed, and queues
 * the cell for output at most once per pulse (tracked via the cell stamp).
 *
 * - A new cell is queued as an add.
 * - An existing cell that is empty while empty cells are being dropped is
 *   queued as an add again.
 * - Any other existing cell not yet touched in this pulse is queued as a mod.
 *
 * @param {string} key - The cell key.
 * @param {object} t - The tuple used to initialize a new cell.
 * @returns {object} The aggregation cell.
 */
prototype.cell = function(key, t) {
  const cells = this.value;
  // Keys come from data values, so only own entries count as cells. A key
  // such as "constructor" must not resolve to an inherited member.
  let cell = Object.prototype.hasOwnProperty.call(cells, key) ? cells[key] : undefined;

  if (!cell) {
    cell = cells[key] = this.newcell(key, t);
    this._adds[this._alen++] = cell;
  } else if (cell.stamp < this.stamp) {
    cell.stamp = this.stamp;
    if (cell.num === 0 && this._drop) {
      this._adds[this._alen++] = cell;
    } else {
      this._mods[this._mlen++] = cell;
    }
  }
  return cell;
};
223
/**
 * Creates a new aggregation cell for a key.
 *
 * @param {string} key - The cell key.
 * @param {object} t - The tuple from which the cell's output tuple is derived.
 * @returns {object} The cell, with fields key, num, agg, tuple, stamp and
 *   store, plus data when a tuple store is needed.
 */
prototype.newcell = function(key, t) {
  const cell = {
    key: key,
    num: 0,
    agg: null,
    // pass any cell stored under the same key in the previous generation
    tuple: this.newtuple(t, this._prev && this._prev[key]),
    stamp: this.stamp,
    store: false
  };

  if (!this._countOnly) {
    const measures = this._measures;
    const n = measures.length;

    cell.agg = Array(n);
    for (let i = 0; i < n; ++i) {
      // Each measure is constructed with the cell. Presumably a measure that
      // needs the raw tuples sets cell.store here -- confirm against
      // compileMeasures().
      cell.agg[i] = new measures[i](cell);
    }
  }

  if (cell.store) {
    cell.data = new TupleStore();
  }

  return cell;
};
250
/**
 * Builds the output tuple for a cell by evaluating every group-by accessor
 * on the given tuple.
 *
 * @param {object} t - The tuple to read group-by values from.
 * @param {object} [p] - A previous cell for the same key, if any. When given,
 *   the new values are passed to replace() together with that cell's tuple;
 *   otherwise they are passed to ingest().
 * @returns {object} The result of replace() or ingest().
 */
prototype.newtuple = function(t, p) {
  const names = this._dnames;
  const dims = this._dims;
  const x = {};

  for (let i = 0, n = dims.length; i < n; ++i) {
    x[names[i]] = dims[i](t);
  }

  return p ? replace(p.tuple, x) : ingest(x);
};
262
// -- Process Tuples -----

/**
 * Adds a tuple to its aggregation cell: increments the cell count and,
 * unless only counts are aggregated, stores the tuple (if the cell keeps a
 * tuple store) and feeds its value to every measure of the cell.
 *
 * @param {object} t - The tuple to add.
 */
prototype.add = function(t) {
  const key = this.cellkey(t);
  const cell = this.cell(key, t);

  cell.num += 1;
  if (this._countOnly) return;

  if (cell.store) cell.data.add(t);

  const agg = cell.agg;
  for (let i = 0, n = agg.length; i < n; ++i) {
    agg[i].add(agg[i].get(t), t);
  }
};
280
/**
 * Removes a tuple from its aggregation cell: decrements the cell count and,
 * unless only counts are aggregated, removes the tuple from the cell's tuple
 * store (if it keeps one) and withdraws its value from every measure.
 *
 * @param {object} t - The tuple to remove.
 */
prototype.rem = function(t) {
  const key = this.cellkey(t);
  const cell = this.cell(key, t);

  cell.num -= 1;
  if (this._countOnly) return;

  if (cell.store) cell.data.rem(t);

  const agg = cell.agg;
  for (let i = 0, n = agg.length; i < n; ++i) {
    agg[i].rem(agg[i].get(t), t);
  }
};
296
/**
 * Writes the current aggregate values of a cell onto its output tuple: the
 * cell count under every count field name, then each measure's result.
 *
 * @param {object} cell - The aggregation cell.
 * @returns {object} The cell's output tuple, updated in place.
 */
prototype.celltuple = function(cell) {
  const tuple = cell.tuple;
  const counts = this._counts;

  // consolidate stored values
  if (cell.store) {
    cell.data.values();
  }

  // update tuple properties
  for (let i = 0, n = counts.length; i < n; ++i) {
    tuple[counts[i]] = cell.num;
  }
  if (!this._countOnly) {
    const agg = cell.agg;
    for (let i = 0, n = agg.length; i < n; ++i) {
      agg[i].set(tuple);
    }
  }

  return tuple;
};
320
/**
 * Moves the queued cell changes onto the output pulse and resets the queues.
 *
 * - Tuples of previous-generation cells are removed; empty ones are skipped
 *   while empty cells are being dropped.
 * - Queued adds are emitted as adds.
 * - Queued mods are emitted as mods, except that a cell that became empty is
 *   emitted as a removal while empty cells are being dropped.
 *
 * @param {object} out - The output pulse; its add, rem and mod arrays are appended to.
 * @returns {object} The same output pulse.
 */
prototype.changes = function(out) {
  const adds = this._adds;
  const mods = this._mods;
  const prev = this._prev;
  const drop = this._drop;
  const add = out.add;
  const rem = out.rem;
  const mod = out.mod;

  if (prev) {
    for (const key in prev) {
      const cell = prev[key];
      if (!drop || cell.num) rem.push(cell.tuple);
    }
  }

  for (let i = 0, n = this._alen; i < n; ++i) {
    add.push(this.celltuple(adds[i]));
    adds[i] = null; // for garbage collection
  }

  for (let i = 0, n = this._mlen; i < n; ++i) {
    const cell = mods[i];
    (cell.num === 0 && drop ? rem : mod).push(this.celltuple(cell));
    mods[i] = null; // for garbage collection
  }

  this._alen = this._mlen = 0; // reset list of active cells
  this._prev = null;
  return out;
};