UNPKG

9.74 kBJavaScriptView Raw
1import {groupkey} from './util/AggregateKeys';
2import {ValidAggregateOps, compileMeasures, createMeasure, measureName} from './util/AggregateOps';
3import TupleStore from './util/TupleStore';
4import {Transform, ingest, replace} from 'vega-dataflow';
5import {accessorFields, accessorName, array, error, inherits} from 'vega-util';
6
/**
 * Group-by aggregation operator.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - Accessors whose
 *   values define the aggregation groups.
 * @param {Array<function(object): *>} [params.fields] - Accessors for the
 *   fields to aggregate.
 * @param {Array<string>} [params.ops] - Names of the aggregation operations,
 *   one per entry in fields.
 * @param {Array<string>} [params.as] - Output field names for the aggregated
 *   values.
 * @param {boolean} [params.cross=false] - If true, cells are generated for the
 *   full cross-product of groupby values, including empty cells. The drop
 *   parameter is then ignored and empty cells are retained.
 * @param {boolean} [params.drop=true] - If true, empty cells are removed.
 * @param {function(object): *} [params.key] - Optional accessor used as the
 *   cell key function in place of one derived from groupby.
 */
export default function Aggregate(params) {
  Transform.call(this, null, params);

  Object.assign(this, {
    // bookkeeping for output tuples touched during the current pulse
    _adds: [],          // added output tuples
    _mods: [],          // modified output tuples
    _alen: 0,           // number of active entries in _adds
    _mlen: 0,           // number of active entries in _mods
    _drop: true,        // remove empty aggregation cells?
    _cross: false,      // produce full cross-product of group-by values?

    // group-by configuration
    _dims: [],          // dimension accessors
    _dnames: [],        // dimension names

    // measure configuration and cell state
    _measures: [],      // collection of aggregation monoids
    _countOnly: false,  // true when only count aggregation is requested
    _counts: null,      // collection of count fields
    _prev: null,        // previous aggregation cells

    // field dependencies
    _inputs: null,      // dependent input tuple field names
    _outputs: null      // output tuple field names
  });
}
41
// Declarative description of the transform: its type name, metadata flags,
// and the parameters it accepts (name, type, and optional
// array / null / default / values settings).
Aggregate.Definition = {
  type: 'Aggregate',
  metadata: {
    generates: true,
    changes: true
  },
  params: [
    {name: 'groupby', type: 'field', array: true},
    {name: 'ops', type: 'enum', array: true, values: ValidAggregateOps},
    {name: 'fields', type: 'field', null: true, array: true},
    {name: 'as', type: 'string', null: true, array: true},
    {name: 'drop', type: 'boolean', default: true},
    {name: 'cross', type: 'boolean', default: false},
    {name: 'key', type: 'field'}
  ]
};
55
56inherits(Aggregate, Transform, {
57 transform(_, pulse) {
58 const aggr = this,
59 out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
60 mod = _.modified();
61
62 aggr.stamp = out.stamp;
63
64 if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
65 aggr._prev = aggr.value;
66 aggr.value = mod ? aggr.init(_) : {};
67 pulse.visit(pulse.SOURCE, t => aggr.add(t));
68 } else {
69 aggr.value = aggr.value || aggr.init(_);
70 pulse.visit(pulse.REM, t => aggr.rem(t));
71 pulse.visit(pulse.ADD, t => aggr.add(t));
72 }
73
74 // Indicate output fields and return aggregate tuples.
75 out.modifies(aggr._outputs);
76
77 // Should empty cells be dropped?
78 aggr._drop = _.drop !== false;
79
80 // If domain cross-product requested, generate empty cells as needed
81 // and ensure that empty cells are not dropped
82 if (_.cross && aggr._dims.length > 1) {
83 aggr._drop = false;
84 aggr.cross();
85 }
86
87 if (pulse.clean() && aggr._drop) {
88 out.clean(true).runAfter(() => this.clean());
89 }
90
91 return aggr.changes(out);
92 },
93
94 cross() {
95 const aggr = this,
96 curr = aggr.value,
97 dims = aggr._dnames,
98 vals = dims.map(() => ({})),
99 n = dims.length;
100
101 // collect all group-by domain values
102 function collect(cells) {
103 let key, i, t, v;
104 for (key in cells) {
105 t = cells[key].tuple;
106 for (i=0; i<n; ++i) {
107 vals[i][(v = t[dims[i]])] = v;
108 }
109 }
110 }
111 collect(aggr._prev);
112 collect(curr);
113
114 // iterate over key cross-product, create cells as needed
115 function generate(base, tuple, index) {
116 const name = dims[index],
117 v = vals[index++];
118
119 for (const k in v) {
120 const key = base ? base + '|' + k : k;
121 tuple[name] = v[k];
122 if (index < n) generate(key, tuple, index);
123 else if (!curr[key]) aggr.cell(key, tuple);
124 }
125 }
126 generate('', {}, 0);
127 },
128
129 init(_) {
130 // initialize input and output fields
131 const inputs = (this._inputs = []),
132 outputs = (this._outputs = []),
133 inputMap = {};
134
135 function inputVisit(get) {
136 const fields = array(accessorFields(get)),
137 n = fields.length;
138 let i = 0, f;
139 for (; i<n; ++i) {
140 if (!inputMap[f=fields[i]]) {
141 inputMap[f] = 1;
142 inputs.push(f);
143 }
144 }
145 }
146
147 // initialize group-by dimensions
148 this._dims = array(_.groupby);
149 this._dnames = this._dims.map(d => {
150 const dname = accessorName(d);
151 inputVisit(d);
152 outputs.push(dname);
153 return dname;
154 });
155 this.cellkey = _.key ? _.key : groupkey(this._dims);
156
157 // initialize aggregate measures
158 this._countOnly = true;
159 this._counts = [];
160 this._measures = [];
161
162 const fields = _.fields || [null],
163 ops = _.ops || ['count'],
164 as = _.as || [],
165 n = fields.length,
166 map = {};
167 let field, op, m, mname, outname, i;
168
169 if (n !== ops.length) {
170 error('Unmatched number of fields and aggregate ops.');
171 }
172
173 for (i=0; i<n; ++i) {
174 field = fields[i];
175 op = ops[i];
176
177 if (field == null && op !== 'count') {
178 error('Null aggregate field specified.');
179 }
180 mname = accessorName(field);
181 outname = measureName(op, mname, as[i]);
182 outputs.push(outname);
183
184 if (op === 'count') {
185 this._counts.push(outname);
186 continue;
187 }
188
189 m = map[mname];
190 if (!m) {
191 inputVisit(field);
192 m = (map[mname] = []);
193 m.field = field;
194 this._measures.push(m);
195 }
196
197 if (op !== 'count') this._countOnly = false;
198 m.push(createMeasure(op, outname));
199 }
200
201 this._measures = this._measures.map(m => compileMeasures(m, m.field));
202
203 return {}; // aggregation cells (this.value)
204 },
205
206 // -- Cell Management -----
207
  // Default function mapping an input tuple to its cell key. init() replaces
  // it on the instance with the `key` parameter or groupkey(this._dims).
  cellkey: groupkey(),
209
210 cell(key, t) {
211 let cell = this.value[key];
212 if (!cell) {
213 cell = this.value[key] = this.newcell(key, t);
214 this._adds[this._alen++] = cell;
215 } else if (cell.num === 0 && this._drop && cell.stamp < this.stamp) {
216 cell.stamp = this.stamp;
217 this._adds[this._alen++] = cell;
218 } else if (cell.stamp < this.stamp) {
219 cell.stamp = this.stamp;
220 this._mods[this._mlen++] = cell;
221 }
222 return cell;
223 },
224
225 newcell(key, t) {
226 const cell = {
227 key: key,
228 num: 0,
229 agg: null,
230 tuple: this.newtuple(t, this._prev && this._prev[key]),
231 stamp: this.stamp,
232 store: false
233 };
234
235 if (!this._countOnly) {
236 const measures = this._measures,
237 n = measures.length;
238
239 cell.agg = Array(n);
240 for (let i=0; i<n; ++i) {
241 cell.agg[i] = new measures[i](cell);
242 }
243 }
244
245 if (cell.store) {
246 cell.data = new TupleStore();
247 }
248
249 return cell;
250 },
251
252 newtuple(t, p) {
253 const names = this._dnames,
254 dims = this._dims,
255 n = dims.length,
256 x = {};
257
258 for (let i=0; i<n; ++i) {
259 x[names[i]] = dims[i](t);
260 }
261
262 return p ? replace(p.tuple, x) : ingest(x);
263 },
264
265 clean() {
266 const cells = this.value;
267 for (const key in cells) {
268 if (cells[key].num === 0) {
269 delete cells[key];
270 }
271 }
272 },
273
274 // -- Process Tuples -----
275
276 add(t) {
277 const key = this.cellkey(t),
278 cell = this.cell(key, t);
279
280 cell.num += 1;
281 if (this._countOnly) return;
282
283 if (cell.store) cell.data.add(t);
284
285 const agg = cell.agg;
286 for (let i=0, n=agg.length; i<n; ++i) {
287 agg[i].add(agg[i].get(t), t);
288 }
289 },
290
291 rem(t) {
292 const key = this.cellkey(t),
293 cell = this.cell(key, t);
294
295 cell.num -= 1;
296 if (this._countOnly) return;
297
298 if (cell.store) cell.data.rem(t);
299
300 const agg = cell.agg;
301 for (let i=0, n=agg.length; i<n; ++i) {
302 agg[i].rem(agg[i].get(t), t);
303 }
304 },
305
306 celltuple(cell) {
307 const tuple = cell.tuple,
308 counts = this._counts;
309
310 // consolidate stored values
311 if (cell.store) {
312 cell.data.values();
313 }
314
315 // update tuple properties
316 for (let i=0, n=counts.length; i<n; ++i) {
317 tuple[counts[i]] = cell.num;
318 }
319 if (!this._countOnly) {
320 const agg = cell.agg;
321 for (let i=0, n=agg.length; i<n; ++i) {
322 agg[i].set(tuple);
323 }
324 }
325
326 return tuple;
327 },
328
329 changes(out) {
330 const adds = this._adds,
331 mods = this._mods,
332 prev = this._prev,
333 drop = this._drop,
334 add = out.add,
335 rem = out.rem,
336 mod = out.mod;
337
338 let cell, key, i, n;
339
340 if (prev) for (key in prev) {
341 cell = prev[key];
342 if (!drop || cell.num) rem.push(cell.tuple);
343 }
344
345 for (i=0, n=this._alen; i<n; ++i) {
346 add.push(this.celltuple(adds[i]));
347 adds[i] = null; // for garbage collection
348 }
349
350 for (i=0, n=this._mlen; i<n; ++i) {
351 cell = mods[i];
352 (cell.num === 0 && drop ? rem : mod).push(this.celltuple(cell));
353 mods[i] = null; // for garbage collection
354 }
355
356 this._alen = this._mlen = 0; // reset list of active cells
357 this._prev = null;
358 return out;
359 }
360});