1 | import {groupkey} from './util/AggregateKeys';
|
2 | import {ValidAggregateOps, compileMeasures, createMeasure, measureName} from './util/AggregateOps';
|
3 | import TupleStore from './util/TupleStore';
|
4 | import {Transform, ingest, replace} from 'vega-dataflow';
|
5 | import {accessorFields, accessorName, array, error, inherits} from 'vega-util';
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
/**
 * Group-by aggregation operator.
 *
 * Calls the Transform constructor with a null initial value and the given
 * parameters, then sets up the bookkeeping fields that the prototype
 * methods read and write.
 *
 * @constructor
 * @param {object} params - The parameters for this operator; forwarded
 *   unchanged to Transform.
 */
export default function Aggregate(params) {
  Transform.call(this, null, params);

  Object.assign(this, {
    // cells queued as added / modified, with explicit logical lengths
    _adds: [],
    _mods: [],
    _alen: 0,
    _mlen: 0,
    // whether cells with a zero count are dropped from the output
    _drop: true,
    _cross: false,

    // group-by accessors and their output names
    _dims: [],
    _dnames: [],

    // aggregate measures and count bookkeeping
    _measures: [],
    _countOnly: false,
    _counts: null,
    // cell map of the previous run, kept while a full re-aggregation is pending
    _prev: null,

    // names of the input fields read and output fields written
    _inputs: null,
    _outputs: null
  });
}
|
41 |
|
// Declarative description of the transform: its type name, metadata flags
// and the list of accepted parameters with their types and defaults.
Aggregate.Definition = {
  type: 'Aggregate',
  metadata: {generates: true, changes: true},
  params: [
    {name: 'groupby', type: 'field', array: true},
    {name: 'ops', type: 'enum', array: true, values: ValidAggregateOps},
    {name: 'fields', type: 'field', null: true, array: true},
    {name: 'as', type: 'string', null: true, array: true},
    {name: 'drop', type: 'boolean', default: true},
    {name: 'cross', type: 'boolean', default: false},
    {name: 'key', type: 'field'}
  ]
};
|
55 |
|
// Set up Aggregate as inheriting from Transform; the value returned by
// inherits() is the object on which the methods below are installed.
var prototype = inherits(Aggregate, Transform);
|
57 |
|
/**
 * Process one pulse and return the forked output pulse holding the
 * aggregate-tuple changes.
 *
 * When cells already exist and either the parameters or any of the
 * registered input fields were modified, the current cells are moved to
 * `_prev` and everything is re-aggregated from the pulse's source tuples.
 * Otherwise removed tuples are subtracted and added tuples are accumulated
 * into the existing cells (which are created via init() on first use).
 *
 * @param {object} _ - Parameter object; `modified()`, `drop` and `cross`
 *   are read here, and it is passed on to init().
 * @param {object} pulse - The input pulse.
 * @returns {object} The output pulse, as returned by changes().
 */
prototype.transform = function(_, pulse) {
  const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
  const paramsChanged = _.modified();

  this.stamp = out.stamp;

  const mustRebuild = this.value
    && (paramsChanged || pulse.modified(this._inputs, true));

  if (mustRebuild) {
    // keep the old cells so changes() can emit their tuples as removals
    this._prev = this.value;
    this.value = paramsChanged ? this.init(_) : {};
    pulse.visit(pulse.SOURCE, t => this.add(t));
  } else {
    this.value = this.value || this.init(_);
    pulse.visit(pulse.REM, t => this.rem(t));
    pulse.visit(pulse.ADD, t => this.add(t));
  }

  // declare which output fields may have changed
  out.modifies(this._outputs);

  // zero-count cells are dropped unless `drop` is explicitly false
  this._drop = _.drop !== false;

  // a requested cross-product (only with more than one dimension) fills in
  // missing cells, so zero-count cells must then be kept
  if (_.cross && this._dims.length > 1) {
    this._drop = false;
    this.cross();
  }

  return this.changes(out);
};
|
90 |
|
/**
 * Ensure that a cell exists for every combination of observed group-by
 * values (the cross-product of the per-dimension value sets).
 *
 * Values are gathered from the tuples of both the previous and the current
 * cells. Each combination is turned into a cell key by joining the
 * stringified values with '|'; combinations that have no cell in the
 * current map are created through this.cell().
 * NOTE(review): this assumes the key function in use produces the same
 * '|'-joined form for multiple dimensions — confirm against groupkey.
 *
 * Fixes relative to the previous implementation:
 * - The separator is now chosen by dimension position instead of by the
 *   truthiness of the key prefix, so an empty-string value in the first
 *   dimension yields '|b' rather than 'b'.
 * - Value sets are prototype-less objects, so a value of "__proto__" is
 *   recorded like any other value.
 * - Existence of a cell is tested with an own-property check, so keys that
 *   coincide with Object.prototype members are not mistaken for cells.
 */
prototype.cross = function() {
  const aggr = this;
  const curr = aggr.value;
  const dims = aggr._dnames;
  const n = dims.length;
  const hasOwn = Object.prototype.hasOwnProperty;

  // one value set per dimension, keyed by the stringified value
  const vals = dims.map(() => Object.create(null));

  // record every dimension value found on the tuples of the given cells
  function collect(cells) {
    for (const key in cells) {
      const t = cells[key].tuple;
      for (let i = 0; i < n; ++i) {
        const v = t[dims[i]];
        vals[i][v] = v;
      }
    }
  }
  collect(aggr._prev);
  collect(curr);

  // walk the cross-product depth-first; `tuple` is a scratch object that is
  // overwritten in place and handed to cell() for each missing combination
  function generate(base, tuple, index) {
    const name = dims[index];
    const domain = vals[index];
    const next = index + 1;

    for (const k in domain) {
      tuple[name] = domain[k];
      const key = index > 0 ? base + '|' + k : k;
      if (next < n) {
        generate(key, tuple, next);
      } else if (!hasOwn.call(curr, key) || !curr[key]) {
        aggr.cell(key, tuple);
      }
    }
  }
  generate('', {}, 0);
};
|
126 |
|
/**
 * (Re)configure the operator from its parameters and return a fresh, empty
 * cell map.
 *
 * Sets `_inputs` / `_outputs` (field names read / written), `_dims` and
 * `_dnames` (group-by accessors and their names), `cellkey`, `_counts`
 * (output names of 'count' measures), `_countOnly` and `_measures`
 * (one compiled measure per distinct field name).
 *
 * Lookup tables keyed by field names are prototype-less objects, so that a
 * field whose name matches an Object.prototype member (e.g. "constructor"
 * or "toString") is handled like any other field instead of being treated
 * as already registered.
 *
 * @param {object} _ - Parameter object; reads `groupby`, `key`, `fields`,
 *   `ops` and `as`. `fields` defaults to [null] and `ops` to ['count'].
 * @returns {object} An empty object to be used as the cell map.
 * Calls error() when `fields` and `ops` differ in length, or when a null
 * field is paired with an op other than 'count' (error() presumably
 * throws — confirm against vega-util).
 */
prototype.init = function(_) {
  const inputs = (this._inputs = []);
  const outputs = (this._outputs = []);
  const inputMap = Object.create(null);

  // register each field the accessor depends on, once
  function inputVisit(get) {
    const deps = array(accessorFields(get));
    for (let j = 0, k = deps.length; j < k; ++j) {
      const f = deps[j];
      if (!inputMap[f]) {
        inputMap[f] = 1;
        inputs.push(f);
      }
    }
  }

  // group-by dimensions: each is both an input and an output field
  this._dims = array(_.groupby);
  this._dnames = this._dims.map(function(d) {
    const dname = accessorName(d);
    inputVisit(d);
    outputs.push(dname);
    return dname;
  });
  this.cellkey = _.key ? _.key : groupkey(this._dims);

  // aggregate measures
  this._countOnly = true;
  this._counts = [];
  this._measures = [];

  const fields = _.fields || [null];
  const ops = _.ops || ['count'];
  const as = _.as || [];
  const n = fields.length;
  const map = Object.create(null); // field name -> list of measures

  if (n !== ops.length) {
    error('Unmatched number of fields and aggregate ops.');
  }

  for (let i = 0; i < n; ++i) {
    const field = fields[i];
    const op = ops[i];

    if (field == null && op !== 'count') {
      error('Null aggregate field specified.');
    }
    const mname = accessorName(field);
    const outname = measureName(op, mname, as[i]);
    outputs.push(outname);

    // counts are served from cell.num and need no measure object
    if (op === 'count') {
      this._counts.push(outname);
      continue;
    }

    let m = map[mname];
    if (!m) {
      inputVisit(field);
      m = (map[mname] = []);
      m.field = field;
      this._measures.push(m);
    }

    // reaching this point means a non-count op is present
    this._countOnly = false;
    m.push(createMeasure(op, outname));
  }

  this._measures = this._measures.map(function(m) {
    return compileMeasures(m, m.field);
  });

  return {};
};
|
204 |
|
205 |
|
206 |
|
// Default cell-key function, built by groupkey() with no fields; init()
// replaces it on the instance once the parameters are known.
prototype.cellkey = groupkey();
|
208 |
|
/**
 * Look up, or create, the aggregation cell for a key and queue it as
 * added or modified for the current stamp.
 *
 * - No cell yet: one is created with newcell() and queued as added.
 * - Existing cell not yet touched at this stamp: its stamp is updated and
 *   it is queued as added when it is empty (num === 0) and empty cells are
 *   being dropped, otherwise as modified.
 * - Existing cell already touched at this stamp: returned unchanged.
 *
 * The lookup only considers own properties of the cell map. Keys are
 * derived from data, and a key such as "constructor" or "__proto__" would
 * otherwise resolve to a member inherited from Object.prototype, which the
 * callers would then mutate as if it were a cell.
 *
 * @param {*} key - The cell key.
 * @param {object} t - A tuple belonging to the cell; passed to newcell().
 * @returns {object} The cell for the key.
 */
prototype.cell = function(key, t) {
  const cells = this.value;
  const stamp = this.stamp;
  let cell = Object.prototype.hasOwnProperty.call(cells, key)
    ? cells[key]
    : undefined;

  if (!cell) {
    cell = this.newcell(key, t);
    if (key === '__proto__') {
      // plain assignment would replace the map's prototype instead of
      // adding an entry
      Object.defineProperty(cells, key, {
        value: cell,
        writable: true,
        enumerable: true,
        configurable: true
      });
    } else {
      cells[key] = cell;
    }
    this._adds[this._alen++] = cell;
  } else if (cell.stamp < stamp) {
    cell.stamp = stamp;
    if (cell.num === 0 && this._drop) {
      // an empty cell was dropped from the output earlier, so it re-enters
      this._adds[this._alen++] = cell;
    } else {
      this._mods[this._mlen++] = cell;
    }
  }
  return cell;
};
|
223 |
|
/**
 * Create a new aggregation cell.
 *
 * The cell's output tuple comes from newtuple(), which also receives the
 * cell stored under the same key in the previous cell map, if there is
 * one. Unless only counts are computed, one instance of every compiled
 * measure is constructed for the cell. A TupleStore is attached when
 * `cell.store` is set after the measures have been constructed.
 *
 * The previous cell is looked up as an own property only: keys are derived
 * from data, and a key such as "toString" would otherwise resolve to a
 * function inherited from Object.prototype and be passed on as if it were
 * a cell.
 *
 * @param {*} key - The cell key.
 * @param {object} t - A tuple belonging to the cell.
 * @returns {object} The new cell with `key`, `num`, `agg`, `tuple`,
 *   `stamp` and `store` (plus `data` when storing).
 */
prototype.newcell = function(key, t) {
  const prev = this._prev;
  const prior = prev
    && (Object.prototype.hasOwnProperty.call(prev, key) ? prev[key] : undefined);

  const cell = {
    key: key,
    num: 0,
    agg: null,
    tuple: this.newtuple(t, prior),
    stamp: this.stamp,
    store: false
  };

  if (!this._countOnly) {
    const measures = this._measures;
    const n = measures.length;

    // the array is attached before the measures are constructed
    cell.agg = new Array(n);
    for (let i = 0; i < n; ++i) {
      cell.agg[i] = new measures[i](cell);
    }
  }

  if (cell.store) {
    cell.data = new TupleStore();
  }

  return cell;
};
|
250 |
|
/**
 * Build the output tuple for a cell from the group-by values of a tuple.
 *
 * Each group-by accessor is applied to `t` and the result stored under the
 * matching dimension name. When a previous cell is supplied, the result is
 * produced by replace() against that cell's tuple; otherwise by ingest().
 *
 * @param {object} t - Tuple from which the group-by values are read.
 * @param {object} [p] - Previous cell for the same key, if any.
 * @returns {object} The value returned by replace() or ingest().
 */
prototype.newtuple = function(t, p) {
  const dims = this._dims;
  const names = this._dnames;
  const count = dims.length;
  const values = {};

  let i = 0;
  while (i < count) {
    values[names[i]] = dims[i](t);
    ++i;
  }

  if (p) {
    return replace(p.tuple, values);
  }
  return ingest(values);
};
|
262 |
|
263 |
|
264 |
|
/**
 * Accumulate a tuple into its aggregation cell.
 *
 * Increments the cell's tuple count. Unless only counts are computed, the
 * tuple is also added to the cell's tuple store (when the cell has one)
 * and fed to every measure of the cell.
 *
 * @param {object} t - The tuple to add.
 */
prototype.add = function(t) {
  const cell = this.cell(this.cellkey(t), t);

  cell.num += 1;

  if (!this._countOnly) {
    if (cell.store) {
      cell.data.add(t);
    }

    const measures = cell.agg;
    const total = measures.length;
    for (let j = 0; j < total; ++j) {
      const m = measures[j];
      m.add(m.get(t), t);
    }
  }
};
|
280 |
|
/**
 * Remove a tuple from its aggregation cell.
 *
 * Decrements the cell's tuple count. Unless only counts are computed, the
 * tuple is also removed from the cell's tuple store (when the cell has
 * one) and withdrawn from every measure of the cell.
 *
 * @param {object} t - The tuple to remove.
 */
prototype.rem = function(t) {
  const cell = this.cell(this.cellkey(t), t);

  cell.num -= 1;

  if (!this._countOnly) {
    if (cell.store) {
      cell.data.rem(t);
    }

    const measures = cell.agg;
    const total = measures.length;
    for (let j = 0; j < total; ++j) {
      const m = measures[j];
      m.rem(m.get(t), t);
    }
  }
};
|
296 |
|
/**
 * Write a cell's current aggregate values onto its output tuple.
 *
 * If the cell has a tuple store, its values() method is invoked first (the
 * result is not used here). Every count output is then set to the cell's
 * tuple count and, unless only counts are computed, each measure writes
 * its own outputs through set().
 *
 * @param {object} cell - The aggregation cell.
 * @returns {object} The cell's output tuple (cell.tuple), updated in place.
 */
prototype.celltuple = function(cell) {
  const result = cell.tuple;
  const countNames = this._counts;

  if (cell.store) {
    cell.data.values();
  }

  for (let c = 0, total = countNames.length; c < total; ++c) {
    result[countNames[c]] = cell.num;
  }

  if (this._countOnly) {
    return result;
  }

  const measures = cell.agg;
  for (let m = 0, total = measures.length; m < total; ++m) {
    measures[m].set(result);
  }
  return result;
};
|
320 |
|
/**
 * Flush the queued cell changes into the output pulse.
 *
 * - If a previous cell map is pending, the tuple of each of its cells is
 *   pushed to `out.rem` (zero-count cells only when empty cells are not
 *   being dropped).
 * - Every queued added cell has its tuple refreshed and pushed to `out.add`.
 * - Every queued modified cell has its tuple refreshed and pushed to
 *   `out.rem` when it is empty and empty cells are dropped, otherwise to
 *   `out.mod`.
 *
 * Afterwards the queues are cleared and the previous cell map is released.
 *
 * @param {object} out - Output pulse with `add`, `rem` and `mod` arrays.
 * @returns {object} The same `out` object.
 */
prototype.changes = function(out) {
  const {_adds: adds, _mods: mods, _prev: prev, _drop: drop} = this;
  const {add, rem, mod} = out;

  if (prev) {
    for (const key in prev) {
      const old = prev[key];
      if (!drop || old.num) {
        rem.push(old.tuple);
      }
    }
  }

  const addCount = this._alen;
  for (let i = 0; i < addCount; ++i) {
    add.push(this.celltuple(adds[i]));
    adds[i] = null; // drop the reference held by the queue slot
  }

  const modCount = this._mlen;
  for (let i = 0; i < modCount; ++i) {
    const cell = mods[i];
    const target = cell.num === 0 && drop ? rem : mod;
    target.push(this.celltuple(cell));
    mods[i] = null; // drop the reference held by the queue slot
  }

  // reset the logical queue lengths; the arrays themselves are reused
  this._alen = this._mlen = 0;
  this._prev = null;
  return out;
};
|