1 | import {groupkey} from './util/AggregateKeys';
|
2 | import {ValidAggregateOps, compileMeasures, createMeasure, measureName} from './util/AggregateOps';
|
3 | import TupleStore from './util/TupleStore';
|
4 | import {Transform, ingest, replace} from 'vega-dataflow';
|
5 | import {accessorFields, accessorName, array, error, inherits} from 'vega-util';
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
/**
 * Group-by aggregation operator.
 *
 * Delegates to the Transform constructor and then sets up the bookkeeping
 * state that the prototype methods of this operator read and write.
 *
 * @constructor
 * @param {object} params - Operator parameters, forwarded to Transform.
 */
export default function Aggregate(params) {
  Transform.call(this, null, params);

  Object.assign(this, {
    // Cells queued by cell() as added / modified, drained by changes().
    _adds: [],
    _mods: [],
    // Number of live entries in _adds and _mods respectively.
    _alen: 0,
    _mlen: 0,
    // Whether cells whose count reaches zero are dropped (set in transform()).
    _drop: true,
    // NOTE(review): not read anywhere in the visible code; transform()
    // consults the `cross` parameter directly -- confirm before relying on it.
    _cross: false,

    // Group-by accessors and their output names (populated by init()).
    _dims: [],
    _dnames: [],

    // Compiled measure constructors, one per aggregated field (see init()).
    _measures: [],
    // True when 'count' is the only requested operation.
    _countOnly: false,
    // Output field names that receive the per-cell count.
    _counts: null,
    // Cell map from before a full re-aggregation; cleared by changes().
    _prev: null,

    // Input field names read, and output field names written, by this operator.
    _inputs: null,
    _outputs: null
  });
}
|
41 |
|
/**
 * Static description of the transform: its type name, metadata flags and the
 * parameters it accepts.
 */
Aggregate.Definition = {
  type: 'Aggregate',
  metadata: {generates: true, changes: true},
  params: [
    {name: 'groupby', type: 'field', array: true},
    {name: 'ops', type: 'enum', array: true, values: ValidAggregateOps},
    {name: 'fields', type: 'field', null: true, array: true},
    {name: 'as', type: 'string', null: true, array: true},
    {name: 'drop', type: 'boolean', default: true},
    {name: 'cross', type: 'boolean', default: false},
    {name: 'key', type: 'field'}
  ]
};
|
55 |
|
56 | inherits(Aggregate, Transform, {
|
57 | transform(_, pulse) {
|
58 | const aggr = this,
|
59 | out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
|
60 | mod = _.modified();
|
61 |
|
62 | aggr.stamp = out.stamp;
|
63 |
|
64 | if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
|
65 | aggr._prev = aggr.value;
|
66 | aggr.value = mod ? aggr.init(_) : {};
|
67 | pulse.visit(pulse.SOURCE, t => aggr.add(t));
|
68 | } else {
|
69 | aggr.value = aggr.value || aggr.init(_);
|
70 | pulse.visit(pulse.REM, t => aggr.rem(t));
|
71 | pulse.visit(pulse.ADD, t => aggr.add(t));
|
72 | }
|
73 |
|
74 |
|
75 | out.modifies(aggr._outputs);
|
76 |
|
77 |
|
78 | aggr._drop = _.drop !== false;
|
79 |
|
80 |
|
81 |
|
82 | if (_.cross && aggr._dims.length > 1) {
|
83 | aggr._drop = false;
|
84 | aggr.cross();
|
85 | }
|
86 |
|
87 | if (pulse.clean() && aggr._drop) {
|
88 | out.clean(true).runAfter(() => this.clean());
|
89 | }
|
90 |
|
91 | return aggr.changes(out);
|
92 | },
|
93 |
|
94 | cross() {
|
95 | const aggr = this,
|
96 | curr = aggr.value,
|
97 | dims = aggr._dnames,
|
98 | vals = dims.map(() => ({})),
|
99 | n = dims.length;
|
100 |
|
101 |
|
102 | function collect(cells) {
|
103 | let key, i, t, v;
|
104 | for (key in cells) {
|
105 | t = cells[key].tuple;
|
106 | for (i=0; i<n; ++i) {
|
107 | vals[i][(v = t[dims[i]])] = v;
|
108 | }
|
109 | }
|
110 | }
|
111 | collect(aggr._prev);
|
112 | collect(curr);
|
113 |
|
114 |
|
115 | function generate(base, tuple, index) {
|
116 | const name = dims[index],
|
117 | v = vals[index++];
|
118 |
|
119 | for (const k in v) {
|
120 | const key = base ? base + '|' + k : k;
|
121 | tuple[name] = v[k];
|
122 | if (index < n) generate(key, tuple, index);
|
123 | else if (!curr[key]) aggr.cell(key, tuple);
|
124 | }
|
125 | }
|
126 | generate('', {}, 0);
|
127 | },
|
128 |
|
129 | init(_) {
|
130 |
|
131 | const inputs = (this._inputs = []),
|
132 | outputs = (this._outputs = []),
|
133 | inputMap = {};
|
134 |
|
135 | function inputVisit(get) {
|
136 | const fields = array(accessorFields(get)),
|
137 | n = fields.length;
|
138 | let i = 0, f;
|
139 | for (; i<n; ++i) {
|
140 | if (!inputMap[f=fields[i]]) {
|
141 | inputMap[f] = 1;
|
142 | inputs.push(f);
|
143 | }
|
144 | }
|
145 | }
|
146 |
|
147 |
|
148 | this._dims = array(_.groupby);
|
149 | this._dnames = this._dims.map(d => {
|
150 | const dname = accessorName(d);
|
151 | inputVisit(d);
|
152 | outputs.push(dname);
|
153 | return dname;
|
154 | });
|
155 | this.cellkey = _.key ? _.key : groupkey(this._dims);
|
156 |
|
157 |
|
158 | this._countOnly = true;
|
159 | this._counts = [];
|
160 | this._measures = [];
|
161 |
|
162 | const fields = _.fields || [null],
|
163 | ops = _.ops || ['count'],
|
164 | as = _.as || [],
|
165 | n = fields.length,
|
166 | map = {};
|
167 | let field, op, m, mname, outname, i;
|
168 |
|
169 | if (n !== ops.length) {
|
170 | error('Unmatched number of fields and aggregate ops.');
|
171 | }
|
172 |
|
173 | for (i=0; i<n; ++i) {
|
174 | field = fields[i];
|
175 | op = ops[i];
|
176 |
|
177 | if (field == null && op !== 'count') {
|
178 | error('Null aggregate field specified.');
|
179 | }
|
180 | mname = accessorName(field);
|
181 | outname = measureName(op, mname, as[i]);
|
182 | outputs.push(outname);
|
183 |
|
184 | if (op === 'count') {
|
185 | this._counts.push(outname);
|
186 | continue;
|
187 | }
|
188 |
|
189 | m = map[mname];
|
190 | if (!m) {
|
191 | inputVisit(field);
|
192 | m = (map[mname] = []);
|
193 | m.field = field;
|
194 | this._measures.push(m);
|
195 | }
|
196 |
|
197 | if (op !== 'count') this._countOnly = false;
|
198 | m.push(createMeasure(op, outname));
|
199 | }
|
200 |
|
201 | this._measures = this._measures.map(m => compileMeasures(m, m.field));
|
202 |
|
203 | return {};
|
204 | },
|
205 |
|
206 |
|
207 |
|
208 | cellkey: groupkey(),
|
209 |
|
210 | cell(key, t) {
|
211 | let cell = this.value[key];
|
212 | if (!cell) {
|
213 | cell = this.value[key] = this.newcell(key, t);
|
214 | this._adds[this._alen++] = cell;
|
215 | } else if (cell.num === 0 && this._drop && cell.stamp < this.stamp) {
|
216 | cell.stamp = this.stamp;
|
217 | this._adds[this._alen++] = cell;
|
218 | } else if (cell.stamp < this.stamp) {
|
219 | cell.stamp = this.stamp;
|
220 | this._mods[this._mlen++] = cell;
|
221 | }
|
222 | return cell;
|
223 | },
|
224 |
|
225 | newcell(key, t) {
|
226 | const cell = {
|
227 | key: key,
|
228 | num: 0,
|
229 | agg: null,
|
230 | tuple: this.newtuple(t, this._prev && this._prev[key]),
|
231 | stamp: this.stamp,
|
232 | store: false
|
233 | };
|
234 |
|
235 | if (!this._countOnly) {
|
236 | const measures = this._measures,
|
237 | n = measures.length;
|
238 |
|
239 | cell.agg = Array(n);
|
240 | for (let i=0; i<n; ++i) {
|
241 | cell.agg[i] = new measures[i](cell);
|
242 | }
|
243 | }
|
244 |
|
245 | if (cell.store) {
|
246 | cell.data = new TupleStore();
|
247 | }
|
248 |
|
249 | return cell;
|
250 | },
|
251 |
|
252 | newtuple(t, p) {
|
253 | const names = this._dnames,
|
254 | dims = this._dims,
|
255 | n = dims.length,
|
256 | x = {};
|
257 |
|
258 | for (let i=0; i<n; ++i) {
|
259 | x[names[i]] = dims[i](t);
|
260 | }
|
261 |
|
262 | return p ? replace(p.tuple, x) : ingest(x);
|
263 | },
|
264 |
|
265 | clean() {
|
266 | const cells = this.value;
|
267 | for (const key in cells) {
|
268 | if (cells[key].num === 0) {
|
269 | delete cells[key];
|
270 | }
|
271 | }
|
272 | },
|
273 |
|
274 |
|
275 |
|
276 | add(t) {
|
277 | const key = this.cellkey(t),
|
278 | cell = this.cell(key, t);
|
279 |
|
280 | cell.num += 1;
|
281 | if (this._countOnly) return;
|
282 |
|
283 | if (cell.store) cell.data.add(t);
|
284 |
|
285 | const agg = cell.agg;
|
286 | for (let i=0, n=agg.length; i<n; ++i) {
|
287 | agg[i].add(agg[i].get(t), t);
|
288 | }
|
289 | },
|
290 |
|
291 | rem(t) {
|
292 | const key = this.cellkey(t),
|
293 | cell = this.cell(key, t);
|
294 |
|
295 | cell.num -= 1;
|
296 | if (this._countOnly) return;
|
297 |
|
298 | if (cell.store) cell.data.rem(t);
|
299 |
|
300 | const agg = cell.agg;
|
301 | for (let i=0, n=agg.length; i<n; ++i) {
|
302 | agg[i].rem(agg[i].get(t), t);
|
303 | }
|
304 | },
|
305 |
|
306 | celltuple(cell) {
|
307 | const tuple = cell.tuple,
|
308 | counts = this._counts;
|
309 |
|
310 |
|
311 | if (cell.store) {
|
312 | cell.data.values();
|
313 | }
|
314 |
|
315 |
|
316 | for (let i=0, n=counts.length; i<n; ++i) {
|
317 | tuple[counts[i]] = cell.num;
|
318 | }
|
319 | if (!this._countOnly) {
|
320 | const agg = cell.agg;
|
321 | for (let i=0, n=agg.length; i<n; ++i) {
|
322 | agg[i].set(tuple);
|
323 | }
|
324 | }
|
325 |
|
326 | return tuple;
|
327 | },
|
328 |
|
329 | changes(out) {
|
330 | const adds = this._adds,
|
331 | mods = this._mods,
|
332 | prev = this._prev,
|
333 | drop = this._drop,
|
334 | add = out.add,
|
335 | rem = out.rem,
|
336 | mod = out.mod;
|
337 |
|
338 | let cell, key, i, n;
|
339 |
|
340 | if (prev) for (key in prev) {
|
341 | cell = prev[key];
|
342 | if (!drop || cell.num) rem.push(cell.tuple);
|
343 | }
|
344 |
|
345 | for (i=0, n=this._alen; i<n; ++i) {
|
346 | add.push(this.celltuple(adds[i]));
|
347 | adds[i] = null;
|
348 | }
|
349 |
|
350 | for (i=0, n=this._mlen; i<n; ++i) {
|
351 | cell = mods[i];
|
352 | (cell.num === 0 && drop ? rem : mod).push(this.celltuple(cell));
|
353 | mods[i] = null;
|
354 | }
|
355 |
|
356 | this._alen = this._mlen = 0;
|
357 | this._prev = null;
|
358 | return out;
|
359 | }
|
360 | });
|