UNPKG

101 kBJavaScriptView Raw
1import { extend, identity, field, hasOwnProperty, extentIndex, inherits, array, accessorName, error, accessorFields, accessor, toNumber, merge, compare, truthy, extent, span, fastmap, isArray, key, ascending, peek, zero, constant } from 'vega-util';
2import { tupleid, Transform, replace, ingest, stableCompare, Operator, derive, rederive } from 'vega-dataflow';
3import { quartiles, bootstrapCI, bin, randomKDE, randomMixture, randomNormal, randomLogNormal, randomUniform, sampleCurve, dotbin, quantiles, random } from 'vega-statistics';
4import { median, mean, min, max, range, bisector } from 'd3-array';
5import { TIME_UNITS, utcInterval, timeInterval, timeBin, timeUnits, utcFloor, timeFloor } from 'vega-time';
6
/**
 * Build a composite key function from several accessor functions.
 * The returned function evaluates each accessor against its argument and
 * joins the results with a '|' separator.
 * @param {Array<function(object): *>} f - Non-empty array of accessors.
 * @return {function(object): string} The composite key function.
 */
function multikey(f) {
  return x => {
    const count = f.length;
    let key = String(f[0](x));
    let idx = 1;
    while (idx < count) {
      key += '|' + f[idx](x);
      idx += 1;
    }
    return key;
  };
}
/**
 * Return a key function for the given group-by accessors:
 * - no accessors: every tuple maps to the empty-string key;
 * - one accessor: the accessor itself is used as the key function;
 * - several accessors: a composite '|'-joined key built by multikey.
 * @param {Array<function(object): *>} [fields] - Group-by accessors.
 * @return {function(object): *} The cell key function.
 */
function groupkey(fields) {
  if (fields && fields.length) {
    return fields.length === 1 ? fields[0] : multikey(fields);
  }
  return function () {
    return '';
  };
}
23
/**
 * Compute the output field name for an aggregate measure.
 * An explicit `as` name wins; otherwise the name is the op name, suffixed
 * with `_<field>` when a field name is available.
 * @param {string} op - Aggregate op name.
 * @param {string} [field] - Input field name.
 * @param {string} [as] - Explicit output name.
 * @return {string} The output field name.
 */
function measureName(op, field, as) {
  if (as) return as;
  return field ? op + '_' + field : op + '';
}
// Shared do-nothing hook for measure lifecycle steps that need no work.
function noop() {}

// Defaults merged into every measure: inert init/add/rem hooks and a
// processing index of 0 (measures are sorted by idx before use).
const base_op = {
  init: noop,
  add: noop,
  rem: noop,
  idx: 0
};
/**
 * Aggregate op definitions, keyed by op name. Each entry may provide:
 *   init(m)        - initialize state on measure instance `m`
 *   value(m)       - compute the output value from that state
 *   add(m, v, t)   - fold in a valid value `v` from tuple `t`
 *   rem(m, v, t)   - remove a previously added value
 *   req            - names of other ops whose state this op relies on
 *   idx            - processing order; ops run in ascending idx order
 * Missing hooks fall back to base_op. The key order here determines the
 * order of ValidAggregateOps. Note that m.valid / m.missing are updated
 * by the measure's add/rem before the per-op hooks run.
 */
const AggregateOps = {
  // Keep raw tuples in the cell's TupleStore; idx -1 so it runs first.
  values: {
    init: m => m.cell.store = true,
    value: m => m.cell.data.values(),
    idx: -1
  },
  // Number of tuples in the cell (including invalid/missing values).
  count: {
    value: m => m.cell.num
  },
  // Internal: number of missing plus valid values.
  __count__: {
    value: m => m.missing + m.valid
  },
  missing: {
    value: m => m.missing
  },
  valid: {
    value: m => m.valid
  },
  sum: {
    init: m => m.sum = 0,
    value: m => m.sum,
    add: (m, v) => m.sum += +v,
    rem: (m, v) => m.sum -= v
  },
  // Undefined when there are no valid values.
  product: {
    init: m => m.product = 1,
    value: m => m.valid ? m.product : undefined,
    add: (m, v) => m.product *= v,
    rem: (m, v) => m.product /= v
  },
  // Running mean; m.mean_d keeps the last delta (v - old mean) so that
  // variance (idx 1, processed afterwards) can apply Welford's update.
  mean: {
    init: m => m.mean = 0,
    value: m => m.valid ? m.mean : undefined,
    add: (m, v) => (m.mean_d = v - m.mean, m.mean += m.mean_d / m.valid),
    rem: (m, v) => (m.mean_d = v - m.mean, m.mean -= m.valid ? m.mean_d / m.valid : m.mean)
  },
  // Alias of mean that reuses the mean op's state.
  average: {
    value: m => m.valid ? m.mean : undefined,
    req: ['mean'],
    idx: 1
  },
  // m.dev accumulates the sum of squared deviations from the mean;
  // sample variance divides by (valid - 1).
  variance: {
    init: m => m.dev = 0,
    value: m => m.valid > 1 ? m.dev / (m.valid - 1) : undefined,
    add: (m, v) => m.dev += m.mean_d * (v - m.mean),
    rem: (m, v) => m.dev -= m.mean_d * (v - m.mean),
    req: ['mean'],
    idx: 1
  },
  // Population variance (divides by valid).
  variancep: {
    value: m => m.valid > 1 ? m.dev / m.valid : undefined,
    req: ['variance'],
    idx: 2
  },
  stdev: {
    value: m => m.valid > 1 ? Math.sqrt(m.dev / (m.valid - 1)) : undefined,
    req: ['variance'],
    idx: 2
  },
  stdevp: {
    value: m => m.valid > 1 ? Math.sqrt(m.dev / m.valid) : undefined,
    req: ['variance'],
    idx: 2
  },
  // Standard error of the mean.
  stderr: {
    value: m => m.valid > 1 ? Math.sqrt(m.dev / (m.valid * (m.valid - 1))) : undefined,
    req: ['variance'],
    idx: 2
  },
  // The following ops compute their results from the stored tuples.
  distinct: {
    value: m => m.cell.data.distinct(m.get),
    req: ['values'],
    idx: 3
  },
  ci0: {
    value: m => m.cell.data.ci0(m.get),
    req: ['values'],
    idx: 3
  },
  ci1: {
    value: m => m.cell.data.ci1(m.get),
    req: ['values'],
    idx: 3
  },
  median: {
    value: m => m.cell.data.q2(m.get),
    req: ['values'],
    idx: 3
  },
  q1: {
    value: m => m.cell.data.q1(m.get),
    req: ['values'],
    idx: 3
  },
  q3: {
    value: m => m.cell.data.q3(m.get),
    req: ['values'],
    idx: 3
  },
  // Incremental min. Removing a value <= the current min marks it unknown
  // (NaN); value() then recomputes it from the stored tuples and caches it.
  min: {
    init: m => m.min = undefined,
    value: m => m.min = Number.isNaN(m.min) ? m.cell.data.min(m.get) : m.min,
    add: (m, v) => {
      if (v < m.min || m.min === undefined) m.min = v;
    },
    rem: (m, v) => {
      if (v <= m.min) m.min = NaN;
    },
    req: ['values'],
    idx: 4
  },
  // Incremental max; same invalidation scheme as min.
  max: {
    init: m => m.max = undefined,
    value: m => m.max = Number.isNaN(m.max) ? m.cell.data.max(m.get) : m.max,
    add: (m, v) => {
      if (v > m.max || m.max === undefined) m.max = v;
    },
    rem: (m, v) => {
      if (v >= m.max) m.max = NaN;
    },
    req: ['values'],
    idx: 4
  },
  // idx 3 sorts before min (idx 4), so add() compares v against the min
  // from before this value was folded in. While m.min is undefined the
  // comparison is false, argmin stays undefined, and value() falls back
  // to a lookup over the stored tuples.
  argmin: {
    init: m => m.argmin = undefined,
    value: m => m.argmin || m.cell.data.argmin(m.get),
    add: (m, v, t) => {
      if (v < m.min) m.argmin = t;
    },
    rem: (m, v) => {
      if (v <= m.min) m.argmin = undefined;
    },
    req: ['min', 'values'],
    idx: 3
  },
  // Mirror of argmin against the max op.
  argmax: {
    init: m => m.argmax = undefined,
    value: m => m.argmax || m.cell.data.argmax(m.get),
    add: (m, v, t) => {
      if (v > m.max) m.argmax = t;
    },
    rem: (m, v) => {
      if (v >= m.max) m.argmax = undefined;
    },
    req: ['max', 'values'],
    idx: 3
  }
};
// Public aggregate op names, in definition order; '__count__' is internal.
const ValidAggregateOps = Object.keys(AggregateOps).filter(name => name !== '__count__');

/**
 * Wrap a raw op definition into a measure factory. Calling the factory with
 * an optional output name yields a measure object combining the op name,
 * the output field name (defaulting to the op name), the base_op defaults
 * and the op-specific overrides.
 * @param {string} key - The op name.
 * @param {object} value - The raw op definition.
 * @return {function(string=): object} The measure factory.
 */
function measure(key, value) {
  return out => extend({
    name: key,
    out: out || key
  }, base_op, value);
}

// Replace every op definition (including the internal '__count__') with
// its measure factory.
for (const opName of ValidAggregateOps.concat('__count__')) {
  AggregateOps[opName] = measure(opName, AggregateOps[opName]);
}
/**
 * Instantiate the measure for aggregate op `op`, writing its result to the
 * output field `name` (the factory defaults this to the op name).
 * @param {string} op - Aggregate op name.
 * @param {string} [name] - Output field name.
 * @return {object} The measure object.
 */
function createMeasure(op, name) {
  const factory = AggregateOps[op];
  return factory(name);
}
// Comparator ordering measures by ascending processing index `idx`.
function compareIndex({idx: left}, {idx: right}) {
  return left - right;
}
/**
 * Expand a list of measures with every measure they (transitively) require
 * via their `req` lists. Missing requirements are instantiated from
 * AggregateOps with default output names.
 * @param {Array<object>} agg - Requested measures.
 * @return {Array<object>} All needed measures, sorted by `idx`.
 */
function resolve(agg) {
  const byName = {};
  agg.forEach(op => {
    byName[op.name] = op;
  });

  function addRequirements(op) {
    if (!op.req) return;
    op.req.forEach(name => {
      if (byName[name]) return;
      const dependency = byName[name] = AggregateOps[name]();
      addRequirements(dependency);
    });
  }

  agg.forEach(addRequirements);
  return Object.values(byName).sort(compareIndex);
}
/**
 * Measure-instance initializer (installed on compiled measure prototypes):
 * resets the valid/missing counters and runs every op's init hook.
 */
function init() {
  this.valid = 0;
  this.missing = 0;
  for (const op of this._ops) op.init(this);
}

/**
 * Fold value `v` from tuple `t` into the running aggregates.
 * null, undefined and '' count as missing; NaN is ignored entirely;
 * anything else is valid and forwarded to every op's add hook.
 */
function add(v, t) {
  if (v == null || v === '') {
    this.missing += 1;
    return;
  }
  if (Number.isNaN(v)) return;
  this.valid += 1;
  for (const op of this._ops) op.add(this, v, t);
}

/**
 * Remove value `v` from tuple `t` from the running aggregates, using the
 * same missing/NaN/valid classification as add().
 */
function rem(v, t) {
  if (v == null || v === '') {
    this.missing -= 1;
    return;
  }
  if (Number.isNaN(v)) return;
  this.valid -= 1;
  for (const op of this._ops) op.rem(this, v, t);
}

/**
 * Write each requested measure's current value onto tuple `t` under the
 * measure's output field name.
 * @return {object} The same tuple `t`.
 */
function set(t) {
  for (const op of this._out) {
    t[op.out] = op.value(this);
  }
  return t;
}
/**
 * Compile the measures requested for one input field into a constructor.
 * Each constructed instance holds the running aggregate state of one cell.
 * @param {Array<object>} agg - Measures requested for the field.
 * @param {function(object): *} [field] - Field accessor; identity if omitted.
 * @return {function} Constructor taking a cell; its static `fields` lists
 *   the output field names of the requested measures.
 */
function compileMeasures(agg, field) {
  const accessorFn = field || identity;
  // All ops needed (including requirements), in processing order.
  const ops = resolve(agg);
  // Only the requested ops produce output, also in processing order.
  const out = agg.slice().sort(compareIndex);

  function ctr(cell) {
    this._ops = ops;
    this._out = out;
    this.cell = cell;
    this.init();
  }

  Object.assign(ctr.prototype, {
    init,
    add,
    rem,
    set,
    get: accessorFn
  });
  ctr.fields = agg.map(op => op.out);
  return ctr;
}
255
/**
 * Store of the raw tuples backing one aggregation cell. It is used by
 * aggregate ops that need every value (distinct, quartiles, ci, min/max
 * fallback, ...). Additions and removals are buffered and consolidated
 * lazily by values().
 * @constructor
 * @param {string} [key] - Name of a unique key field used to match removed
 *   tuples; defaults to the dataflow tuple id.
 */
function TupleStore(key) {
  this._key = key ? field(key) : tupleid;
  this.reset();
}
const prototype$1 = TupleStore.prototype;

/**
 * Return the memoized result of `compute(values)` for statistic `name`.
 * Each statistic has its own cache entry, tagged with the accessor it was
 * computed for and the store version at that time. A result is reused only
 * when both are unchanged. (A single shared accessor tag previously let one
 * statistic's recomputation mark the others as fresh, returning stale data
 * or data computed for another field.)
 */
function tupleStoreMemo(store, name, get, compute) {
  const entry = store._memo[name];
  if (entry && entry.get === get && entry.version === store._version) {
    return entry.value;
  }
  const value = compute(store.values());
  store._memo[name] = {get, version: store._version, value};
  return value;
}

// Clear all stored tuples and memoized statistics.
prototype$1.reset = function () {
  this._add = [];
  this._rem = [];
  this._version = 0; // bumped on every add/rem to invalidate memoized stats
  this._memo = {};   // statistic name -> {get, version, value}
};

// Buffer an added tuple.
prototype$1.add = function (v) {
  this._add.push(v);
  ++this._version;
};

// Buffer a removed tuple; it is dropped at the next values() call.
prototype$1.rem = function (v) {
  this._rem.push(v);
  ++this._version;
};

/**
 * Return the current tuples, first applying buffered removals. Each
 * buffered removal cancels at most one stored tuple with the same key.
 * @return {Array<object>}
 */
prototype$1.values = function () {
  if (this._rem.length === 0) return this._add;
  const k = this._key,
    removed = Object.create(null), // no inherited keys to collide with
    kept = [];

  // use unique key field to clear removed values
  for (const t of this._rem) {
    removed[k(t)] = 1;
  }
  for (const t of this._add) {
    const id = k(t);
    if (removed[id]) {
      removed[id] = 0;
    } else {
      kept.push(t);
    }
  }
  this._rem = [];
  return this._add = kept;
};

// memoizing statistics methods

// Number of distinct values of get() (compared as strings).
prototype$1.distinct = function (get) {
  const seen = new Set();
  for (const t of this.values()) {
    seen.add(get(t) + '');
  }
  return seen.size;
};

// [min tuple, max tuple] according to get().
prototype$1.extent = function (get) {
  return tupleStoreMemo(this, 'extent', get, v => {
    const i = extentIndex(v, get);
    return [v[i[0]], v[i[1]]];
  });
};
prototype$1.argmin = function (get) {
  return this.extent(get)[0] || {};
};
prototype$1.argmax = function (get) {
  return this.extent(get)[1] || {};
};
prototype$1.min = function (get) {
  const m = this.extent(get)[0];
  return m != null ? get(m) : undefined;
};
prototype$1.max = function (get) {
  const m = this.extent(get)[1];
  return m != null ? get(m) : undefined;
};

// [q1, median, q3] of get() values.
prototype$1.quartile = function (get) {
  return tupleStoreMemo(this, 'quartile', get, v => quartiles(v, get));
};
prototype$1.q1 = function (get) {
  return this.quartile(get)[0];
};
prototype$1.q2 = function (get) {
  return this.quartile(get)[1];
};
prototype$1.q3 = function (get) {
  return this.quartile(get)[2];
};

// Bootstrapped confidence interval (1000 samples, alpha 0.05).
prototype$1.ci = function (get) {
  return tupleStoreMemo(this, 'ci', get, v => bootstrapCI(v, 1000, 0.05, get));
};
prototype$1.ci0 = function (get) {
  return this.ci(get)[0];
};
prototype$1.ci1 = function (get) {
  return this.ci(get)[1];
};
370
/**
 * Group-by aggregation operator.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - Accessors for the
 *   group-by dimensions.
 * @param {Array<function(object): *>} [params.fields] - Accessors for the
 *   fields to aggregate.
 * @param {Array<string>} [params.ops] - Aggregation op names, one per field.
 * @param {Array<string>} [params.as] - Output field names for the
 *   aggregated values.
 * @param {boolean} [params.cross=false] - If true, generate the full
 *   cross-product of group-by values, including empty cells. The drop
 *   parameter is then ignored and empty cells are retained.
 * @param {boolean} [params.drop=true] - If true, remove empty cells.
 * @param {function(object): *} [params.key] - Optional accessor used in place
 *   of the group-by key function to identify cells.
 */
function Aggregate(params) {
  Transform.call(this, null, params);
  Object.assign(this, {
    // per-pulse output bookkeeping
    _adds: [],         // cells whose tuples are newly added
    _mods: [],         // cells whose tuples are modified
    _alen: 0,          // number of active entries in _adds
    _mlen: 0,          // number of active entries in _mods
    _drop: true,       // remove cells that become empty
    _cross: false,     // emit the full cross-product of group-by values

    // group-by configuration
    _dims: [],         // group-by accessors
    _dnames: [],       // group-by output field names

    // measure configuration
    _measures: [],     // compiled measure constructors
    _countOnly: false, // true when only 'count' ops are requested
    _counts: null,     // output names of 'count' ops
    _prev: null,       // cells from before a full re-aggregation

    // field dependencies
    _inputs: null,     // input field names the aggregation reads
    _outputs: null     // output field names written to aggregate tuples
  });
}
404
// Parameter schema for the Aggregate transform.
Aggregate.Definition = {
  type: 'Aggregate',
  metadata: {generates: true, changes: true},
  params: [
    {name: 'groupby', type: 'field', array: true},
    {name: 'ops', type: 'enum', array: true, values: ValidAggregateOps},
    {name: 'fields', type: 'field', null: true, array: true},
    {name: 'as', type: 'string', null: true, array: true},
    {name: 'drop', type: 'boolean', default: true},
    {name: 'cross', type: 'boolean', default: false},
    {name: 'key', type: 'field'}
  ]
};
inherits(Aggregate, Transform, {
  /**
   * Process a pulse and emit added/removed/modified aggregate tuples.
   * If cells already exist and either the parameters or any input field
   * changed, the previous cells are moved to _prev and every source tuple
   * is re-aggregated from scratch (re-running init only if the parameters
   * changed). Otherwise removed tuples are subtracted and added tuples
   * folded in.
   */
  transform(_, pulse) {
    const aggr = this,
      out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
      mod = _.modified();
    aggr.stamp = out.stamp;
    if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
      aggr._prev = aggr.value;
      aggr.value = mod ? aggr.init(_) : {};
      pulse.visit(pulse.SOURCE, t => aggr.add(t));
    } else {
      aggr.value = aggr.value || aggr.init(_);
      pulse.visit(pulse.REM, t => aggr.rem(t));
      pulse.visit(pulse.ADD, t => aggr.add(t));
    }

    // Indicate output fields and return aggregate tuples.
    out.modifies(aggr._outputs);

    // Should empty cells be dropped?
    // NOTE(review): this is assigned after the add/rem visits above, so
    // cell() sees the previous pulse's _drop value during this pulse.
    aggr._drop = _.drop !== false;

    // If domain cross-product requested, generate empty cells as needed
    // and ensure that empty cells are not dropped
    if (_.cross && aggr._dims.length > 1) {
      aggr._drop = false;
      aggr.cross();
    }
    // Once downstream has run, delete cells whose count reached zero.
    if (pulse.clean() && aggr._drop) {
      out.clean(true).runAfter(() => this.clean());
    }
    return aggr.changes(out);
  },
  /**
   * Ensure a cell exists for every combination of observed group-by values.
   * The values are gathered from both the previous and the current cells.
   */
  cross() {
    const aggr = this,
      curr = aggr.value,
      dims = aggr._dnames,
      vals = dims.map(() => ({})),
      n = dims.length;

    // collect all group-by domain values
    function collect(cells) {
      let key, i, t, v;
      for (key in cells) {
        t = cells[key].tuple;
        for (i = 0; i < n; ++i) {
          vals[i][v = t[dims[i]]] = v;
        }
      }
    }
    collect(aggr._prev);
    collect(curr);

    // iterate over key cross-product, create cells as needed
    // (keys are built by joining values with '|', like multikey)
    function generate(base, tuple, index) {
      const name = dims[index],
        v = vals[index++];
      for (const k in v) {
        const key = base ? base + '|' + k : k;
        tuple[name] = v[k];
        if (index < n) generate(key, tuple, index);
        else if (!curr[key]) aggr.cell(key, tuple);
      }
    }
    generate('', {}, 0);
  },
  /**
   * (Re)configure the operator from its parameters. This records the
   * input/output field names, the group-by dimensions, the cell key function
   * and the compiled measures.
   * @return {object} A fresh, empty map of aggregation cells.
   */
  init(_) {
    // initialize input and output fields
    const inputs = this._inputs = [],
      outputs = this._outputs = [],
      inputMap = {};
    // Record each field an accessor depends on, once.
    function inputVisit(get) {
      const fields = array(accessorFields(get)),
        n = fields.length;
      let i = 0,
        f;
      for (; i < n; ++i) {
        if (!inputMap[f = fields[i]]) {
          inputMap[f] = 1;
          inputs.push(f);
        }
      }
    }

    // initialize group-by dimensions
    this._dims = array(_.groupby);
    this._dnames = this._dims.map(d => {
      const dname = accessorName(d);
      inputVisit(d);
      outputs.push(dname);
      return dname;
    });
    this.cellkey = _.key ? _.key : groupkey(this._dims);

    // initialize aggregate measures
    this._countOnly = true;
    this._counts = [];
    this._measures = [];
    const fields = _.fields || [null],
      ops = _.ops || ['count'],
      as = _.as || [],
      n = fields.length,
      map = {};
    let field, op, m, mname, outname, i;
    if (n !== ops.length) {
      error('Unmatched number of fields and aggregate ops.');
    }
    for (i = 0; i < n; ++i) {
      field = fields[i];
      op = ops[i];
      if (field == null && op !== 'count') {
        error('Null aggregate field specified.');
      }
      mname = accessorName(field);
      outname = measureName(op, mname, as[i]);
      outputs.push(outname);
      // 'count' is read directly from cell.num; it needs no measure.
      if (op === 'count') {
        this._counts.push(outname);
        continue;
      }
      // Group measures by input field name; one compiled ctor per field.
      m = map[mname];
      if (!m) {
        inputVisit(field);
        m = map[mname] = [];
        m.field = field;
        this._measures.push(m);
      }
      // NOTE(review): always true here, since 'count' continued above.
      if (op !== 'count') this._countOnly = false;
      m.push(createMeasure(op, outname));
    }
    this._measures = this._measures.map(m => compileMeasures(m, m.field));
    return {}; // aggregation cells (this.value)
  },

  // -- Cell Management -----

  cellkey: groupkey(),
  /**
   * Look up (or create) the cell for `key` and record it for output once per
   * pulse. The stamp guards against recording a cell twice in one pulse.
   * New cells go to _adds. So do empty cells when dropping is enabled; their
   * tuple was presumably removed from output earlier and must be re-added.
   * Any other cell is recorded in _mods.
   */
  cell(key, t) {
    let cell = this.value[key];
    if (!cell) {
      cell = this.value[key] = this.newcell(key, t);
      this._adds[this._alen++] = cell;
    } else if (cell.num === 0 && this._drop && cell.stamp < this.stamp) {
      cell.stamp = this.stamp;
      this._adds[this._alen++] = cell;
    } else if (cell.stamp < this.stamp) {
      cell.stamp = this.stamp;
      this._mods[this._mlen++] = cell;
    }
    return cell;
  },
  // Create a cell; reuses the previous cell's tuple (via replace) if one
  // existed under the same key before a full re-aggregation.
  newcell(key, t) {
    const cell = {
      key: key,
      num: 0,
      agg: null,
      tuple: this.newtuple(t, this._prev && this._prev[key]),
      stamp: this.stamp,
      store: false
    };
    if (!this._countOnly) {
      const measures = this._measures,
        n = measures.length;
      cell.agg = Array(n);
      for (let i = 0; i < n; ++i) {
        // measure inits may set cell.store (e.g. the 'values' op)
        cell.agg[i] = new measures[i](cell);
      }
    }
    if (cell.store) {
      cell.data = new TupleStore();
    }
    return cell;
  },
  // Build the output tuple holding the group-by values of input tuple t.
  newtuple(t, p) {
    const names = this._dnames,
      dims = this._dims,
      n = dims.length,
      x = {};
    for (let i = 0; i < n; ++i) {
      x[names[i]] = dims[i](t);
    }
    return p ? replace(p.tuple, x) : ingest(x);
  },
  // Delete cells that no longer contain any tuples.
  clean() {
    const cells = this.value;
    for (const key in cells) {
      if (cells[key].num === 0) {
        delete cells[key];
      }
    }
  },
  // -- Process Tuples -----

  // Fold an input tuple into its cell.
  add(t) {
    const key = this.cellkey(t),
      cell = this.cell(key, t);
    cell.num += 1;
    if (this._countOnly) return;
    if (cell.store) cell.data.add(t);
    const agg = cell.agg;
    for (let i = 0, n = agg.length; i < n; ++i) {
      agg[i].add(agg[i].get(t), t);
    }
  },
  // Remove an input tuple from its cell.
  rem(t) {
    const key = this.cellkey(t),
      cell = this.cell(key, t);
    cell.num -= 1;
    if (this._countOnly) return;
    if (cell.store) cell.data.rem(t);
    const agg = cell.agg;
    for (let i = 0, n = agg.length; i < n; ++i) {
      agg[i].rem(agg[i].get(t), t);
    }
  },
  // Write the current count and measure values onto the cell's tuple.
  celltuple(cell) {
    const tuple = cell.tuple,
      counts = this._counts;

    // consolidate stored values
    if (cell.store) {
      cell.data.values();
    }

    // update tuple properties
    for (let i = 0, n = counts.length; i < n; ++i) {
      tuple[counts[i]] = cell.num;
    }
    if (!this._countOnly) {
      const agg = cell.agg;
      for (let i = 0, n = agg.length; i < n; ++i) {
        agg[i].set(tuple);
      }
    }
    return tuple;
  },
  /**
   * Populate the output pulse. Tuples of all previous cells are removed
   * (except empty ones when dropping, which were not in the output).
   * Recorded new cells are added. Recorded modified cells are either
   * removed (if empty and dropping) or marked modified.
   */
  changes(out) {
    const adds = this._adds,
      mods = this._mods,
      prev = this._prev,
      drop = this._drop,
      add = out.add,
      rem = out.rem,
      mod = out.mod;
    let cell, key, i, n;
    if (prev) for (key in prev) {
      cell = prev[key];
      if (!drop || cell.num) rem.push(cell.tuple);
    }
    for (i = 0, n = this._alen; i < n; ++i) {
      add.push(this.celltuple(adds[i]));
      adds[i] = null; // for garbage collection
    }

    for (i = 0, n = this._mlen; i < n; ++i) {
      cell = mods[i];
      (cell.num === 0 && drop ? rem : mod).push(this.celltuple(cell));
      mods[i] = null; // for garbage collection
    }

    this._alen = this._mlen = 0; // reset list of active cells
    this._prev = null;
    return out;
  }
});
707
// Epsilon bias to offset floating point error (#1737). It is added to the
// bin-index quotient before Math.floor in the Bin accessor. A quotient that
// should be a whole number but falls just short because of rounding then
// still floors to the intended bin.
const EPSILON$1 = 1e-14;
710
/**
 * Transform that discretizes a numeric field into bins. It writes the bin
 * start (and, when `interval` is true, the bin end) onto each tuple.
 * @constructor
 * @param {object} params - The parameters for this operator. The
 *   provided values should be valid options for the {@link bin} function.
 * @param {function(object): *} params.field - The data field to bin.
 */
function Bin(params) {
  const initialValue = null;
  Transform.call(this, initialValue, params);
}
// Parameter schema for the Bin transform.
Bin.Definition = {
  type: 'Bin',
  metadata: {modifies: true},
  params: [
    {name: 'field', type: 'field', required: true},
    {name: 'interval', type: 'boolean', default: true},
    {name: 'anchor', type: 'number'},
    {name: 'maxbins', type: 'number', default: 20},
    {name: 'base', type: 'number', default: 10},
    {name: 'divide', type: 'number', array: true, default: [5, 2]},
    {name: 'extent', type: 'number', array: true, length: 2, required: true},
    {name: 'span', type: 'number'},
    {name: 'step', type: 'number'},
    {name: 'steps', type: 'number', array: true},
    {name: 'minstep', type: 'number', default: 0},
    {name: 'nice', type: 'boolean', default: true},
    {name: 'name', type: 'string'},
    {name: 'as', type: 'string', array: true, length: 2, default: ['bin0', 'bin1']}
  ]
};
inherits(Bin, Transform, {
  /**
   * Write bin boundaries onto tuples. If the parameters changed, the pulse
   * is reflowed and every source tuple is rebinned. Otherwise added tuples
   * are binned, plus modified tuples if the binned field was modified.
   */
  transform(_, pulse) {
    const band = _.interval !== false,
      bins = this._bins(_),
      start = bins.start,
      step = bins.step,
      as = _.as || ['bin0', 'bin1'],
      b0 = as[0],
      b1 = as[1];
    let flag;
    if (_.modified()) {
      pulse = pulse.reflow(true);
      flag = pulse.SOURCE;
    } else {
      flag = pulse.modified(accessorFields(_.field)) ? pulse.ADD_MOD : pulse.ADD;
    }
    pulse.visit(flag, band ? t => {
      const v = bins(t);
      // minimum bin value (inclusive)
      t[b0] = v;
      // maximum bin value (exclusive)
      // use convoluted math for better floating point agreement
      // see https://github.com/vega/vega/issues/830
      // infinite values propagate through this formula! #2227
      t[b1] = v == null ? null : start + step * (1 + (v - start) / step);
    } : t => t[b0] = bins(t));
    return pulse.modifies(band ? as : b0);
  },
  /**
   * Build (or reuse, if parameters are unchanged) the binning accessor.
   * The accessor maps a tuple to its bin start. It returns null for
   * null/undefined values, -Infinity below `start` and +Infinity above
   * `stop`. Values from stop - step up to stop fall into the last bin.
   * The stop is rounded up to a whole number of steps. Both start and stop
   * are shifted so that a bin boundary lands on `anchor` when one is given.
   */
  _bins(_) {
    if (this.value && !_.modified()) {
      return this.value;
    }
    const field = _.field,
      bins = bin(_),
      step = bins.step;
    let start = bins.start,
      stop = start + Math.ceil((bins.stop - start) / step) * step,
      a,
      d;
    if ((a = _.anchor) != null) {
      d = a - (start + step * Math.floor((a - start) / step));
      start += d;
      stop += d;
    }
    const f = function (t) {
      let v = toNumber(field(t));
      return v == null ? null
        : v < start ? -Infinity
        : v > stop ? +Infinity
        : (v = Math.max(start, Math.min(v, stop - step)), start + step * Math.floor(EPSILON$1 + (v - start) / step));
    };
    f.start = start;
    // NOTE(review): f.stop reports the unadjusted bins.stop, not the rounded
    // and anchored `stop` used above for clamping; confirm this is intended.
    f.stop = bins.stop;
    f.step = step;
    return this.value = accessor(f, accessorFields(field), _.name || 'bin_' + accessorName(field));
  }
});
839
/**
 * Maintain a list of tuples with buffered additions and removals.
 * @param {function(object): *} idFunc - Returns a unique id per tuple.
 * @param {Array<object>} [source] - Initial tuples.
 * @param {Array<object>} [input] - Initial pending additions.
 * @return {object} API with add, remove, size and data(compare, resort).
 *   data() applies pending removals, optionally re-sorts, then merges in
 *   pending additions (sorted when a comparator is given).
 */
function SortedList (idFunc, source, input) {
  let data = source || [];
  let pending = input || [];
  let removed = {};
  let removedCount = 0;

  function applyRemovals() {
    data = data.filter(t => !removed[idFunc(t)]);
    removed = {};
    removedCount = 0;
  }

  function applyAdditions(compare) {
    data = compare
      ? merge(compare, data, pending.sort(compare))
      : data.concat(pending);
    pending = [];
  }

  return {
    add: t => pending.push(t),
    remove: t => removed[idFunc(t)] = ++removedCount,
    size: () => data.length,
    data: (compare, resort) => {
      if (removedCount) applyRemovals();
      if (resort && compare) data.sort(compare);
      if (pending.length) applyAdditions(compare);
      return data;
    }
  };
}
867
/**
 * Transform that gathers every tuple flowing through it into an array (its
 * value), optionally kept in sorted order.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(*,*): number} [params.sort] - Optional comparator used
 *   to order the collected tuples.
 */
function Collect(params) {
  const initialValue = [];
  Transform.call(this, initialValue, params);
}
// Parameter schema for the Collect transform.
Collect.Definition = {
  type: 'Collect',
  metadata: {source: true},
  params: [
    {name: 'sort', type: 'compare'}
  ]
};
inherits(Collect, Transform, {
  /**
   * Forward the pulse while updating this.value, the array of all currently
   * collected tuples (kept sorted when a comparator is given). A re-sort is
   * requested when the pulse reports changes, or when the comparator or a
   * field it reads was modified.
   */
  transform(_, pulse) {
    const out = pulse.fork(pulse.ALL);
    const list = SortedList(tupleid, this.value, out.materialize(out.ADD).add);
    const sort = _.sort;
    const mod = pulse.changed() || sort && (_.modified('sort') || pulse.modified(sort.fields));

    out.visit(out.REM, list.remove);
    this.modified(mod);
    const collected = list.data(stableCompare(sort), mod);
    out.source = collected;
    this.value = collected;

    // propagate tree root if defined
    const root = pulse.source && pulse.source.root;
    if (root) {
      this.value.root = root;
    }
    return out;
  }
});
905
/**
 * Operator whose value is a comparator function built from the `fields` and
 * `orders` parameters.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<string|function>} params.fields - The fields to compare.
 * @param {Array<string>} [params.orders] - The sort orders, each one of
 *   "ascending" (default) or "descending".
 */
function Compare(params) {
  Operator.call(this, null, update$5, params);
}
inherits(Compare, Operator);

// Update function: rebuild the comparator only when none exists yet or the
// parameters changed; otherwise keep the current one.
function update$5(_) {
  if (!this.value || _.modified()) {
    return compare(_.fields, _.orders);
  }
  return this.value;
}
921
/**
 * Transform that counts occurrences of regexp-defined patterns in a text
 * field, emitting one tuple per distinct match.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - Accessor for the text field.
 * @param {string} [params.pattern] - RegExp source defining a token.
 * @param {string} [params.case] - 'lower', 'upper', or mixed case otherwise.
 * @param {string} [params.stopwords] - RegExp source of tokens to ignore.
 */
function CountPattern(params) {
  const initialValue = null;
  Transform.call(this, initialValue, params);
}
// Parameter schema for the CountPattern transform. The 'pattern' default
// matches the runtime fallback used when no pattern is supplied
// (word characters and apostrophes); it previously advertised a
// double-quote character that the transform never used.
CountPattern.Definition = {
  'type': 'CountPattern',
  'metadata': {
    'generates': true,
    'changes': true
  },
  'params': [{
    'name': 'field',
    'type': 'field',
    'required': true
  }, {
    'name': 'case',
    'type': 'enum',
    'values': ['upper', 'lower', 'mixed'],
    'default': 'mixed'
  }, {
    'name': 'pattern',
    'type': 'string',
    'default': '[\\w\']+'
  }, {
    'name': 'stopwords',
    'type': 'string',
    'default': ''
  }, {
    'name': 'as',
    'type': 'string',
    'array': true,
    'length': 2,
    'default': ['text', 'count']
  }]
};
/**
 * Split a text value into pattern matches, optionally case-normalized.
 * Previously a null or non-string field value threw a TypeError, which
 * aborted the whole transform.
 * @param {*} text - The value to tokenize. null/undefined yield no tokens;
 *   other non-string values are converted with String().
 * @param {string} tcase - 'upper' or 'lower' to normalize case; any other
 *   value leaves the case unchanged.
 * @param {RegExp} match - Global regexp defining a token.
 * @return {Array<string>|null} The matched tokens, or null when none match.
 */
function tokenize(text, tcase, match) {
  if (text == null) return null;
  let str = String(text);
  switch (tcase) {
    case 'upper':
      str = str.toUpperCase();
      break;
    case 'lower':
      str = str.toLowerCase();
      break;
  }
  return str.match(match);
}
inherits(CountPattern, Transform, {
  /**
   * Count pattern matches in the text field. Emit one output tuple per
   * distinct, non-stopword match, holding the match and its count.
   * When the parameters or the text field changed, counts are rebuilt from
   * all source tuples; otherwise added/removed tuples update them.
   */
  transform(_, pulse) {
    const init = this._parameterCheck(_, pulse),
      counts = this._counts,
      match = this._match,
      stop = this._stop,
      get = _.field,
      as = _.as || ['text', 'count'];
    const process = update => tuple => {
      const tokens = tokenize(get(tuple), _.case, match) || [];
      for (let i = 0, n = tokens.length; i < n; ++i) {
        const t = tokens[i];
        if (!stop.test(t)) update(t);
      }
    };
    const add = process(t => counts[t] = 1 + (counts[t] || 0)),
      rem = process(t => counts[t] -= 1);
    if (init) {
      pulse.visit(pulse.SOURCE, add);
    } else {
      pulse.visit(pulse.ADD, add);
      pulse.visit(pulse.REM, rem);
    }
    return this._finish(pulse, as); // generate output tuples
  },

  /**
   * Rebuild the stopword/pattern regexps when needed. Report whether the
   * counts must be recomputed from scratch; if so, reset them.
   */
  _parameterCheck(_, pulse) {
    let init = false;
    if (_.modified('stopwords') || !this._stop) {
      this._stop = new RegExp('^' + (_.stopwords || '') + '$', 'i');
      init = true;
    }
    if (_.modified('pattern') || !this._match) {
      this._match = new RegExp(_.pattern || '[\\w\']+', 'g');
      init = true;
    }
    if (_.modified('field') || pulse.modified(_.field.fields)) {
      init = true;
    }
    // Prototype-free map so words such as "constructor", "toString" or
    // "__proto__" are counted like any other word.
    if (init) this._counts = Object.create(null);
    return init;
  },

  /**
   * Synchronize output tuples with the current counts. Add tuples for new
   * words, update changed counts, and remove tuples for words whose count
   * dropped to zero or that are no longer counted after a rebuild.
   */
  _finish(pulse, as) {
    const counts = this._counts,
      tuples = this._tuples || (this._tuples = Object.create(null)),
      text = as[0],
      count = as[1],
      out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
    let w, t, c;
    for (w in counts) {
      t = tuples[w];
      c = counts[w] || 0;
      if (!t && c) {
        tuples[w] = t = ingest({});
        t[text] = w;
        t[count] = c;
        out.add.push(t);
      } else if (c === 0) {
        if (t) out.rem.push(t);
        // drop dead entries instead of keeping them forever
        delete counts[w];
        delete tuples[w];
      } else if (t[count] !== c) {
        t[count] = c;
        out.mod.push(t);
      }
    }
    // After a rebuild the counts start from scratch; retire tuples for words
    // that are no longer counted (e.g. newly added stopwords).
    for (w in tuples) {
      if (!(w in counts)) {
        out.rem.push(tuples[w]);
        delete tuples[w];
      }
    }
    return out.modifies(as);
  }
});
1045
/**
 * Transform that pairs every tuple of its input with every tuple of the same
 * input (a self cross-product).
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object):boolean} [params.filter] - Optional predicate that
 *   selects which pairs are kept.
 * @param {Array<string>} [params.as] - Names of the two output fields.
 */
function Cross(params) {
  const initialValue = null;
  Transform.call(this, initialValue, params);
}
// Parameter schema for the Cross transform.
Cross.Definition = {
  type: 'Cross',
  metadata: {generates: true},
  params: [
    {name: 'filter', type: 'expr'},
    {name: 'as', type: 'string', array: true, length: 2, default: ['a', 'b']}
  ]
};
inherits(Cross, Transform, {
  /**
   * Emit the filtered self cross-product of the source tuples. The product
   * is rebuilt on the first run, when tuples were added or removed, or when
   * the `as` or `filter` parameters changed. Otherwise the existing pairs
   * are re-emitted as modified.
   */
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE);
    const as = _.as || ['a', 'b'];
    const [a, b] = as;
    const previous = this.value;
    const rebuild = !previous || pulse.changed(pulse.ADD_REM) || _.modified('as') || _.modified('filter');

    if (!rebuild) {
      out.mod = previous;
    } else {
      if (previous) out.rem = previous;
      const input = pulse.materialize(pulse.SOURCE).source;
      out.add = this.value = cross(input, a, b, _.filter || truthy);
    }
    out.source = this.value;
    return out.modifies(as);
  }
});
/**
 * Build the filtered cross-product of `input` with itself.
 * A candidate pair object is reused until the filter accepts it. The
 * accepted object is ingested and a fresh candidate is started.
 * @param {Array<object>} input - The tuples to pair.
 * @param {string} a - Output field for the left tuple.
 * @param {string} b - Output field for the right tuple.
 * @param {function(object): boolean} filter - Pair predicate.
 * @return {Array<object>} The accepted, ingested pair tuples.
 */
function cross(input, a, b, filter) {
  const result = [];
  let pair = {};
  for (const left of input) {
    pair[a] = left;
    for (const right of input) {
      pair[b] = right;
      if (!filter(pair)) continue;
      result.push(ingest(pair));
      pair = {};
      pair[a] = left;
    }
  }
  return result;
}
1112
// Factory functions for the supported probability distributions, by name.
const Distributions = {
  kde: randomKDE,
  mixture: randomMixture,
  normal: randomNormal,
  lognormal: randomLogNormal,
  uniform: randomUniform
};

// Property names with special meaning in a distribution definition object.
const DISTRIBUTIONS = 'distributions';
const FUNCTION = 'function';
const FIELD = 'field';
1123
/**
 * Parse a parameter object for a probability distribution.
 * The `function` property selects the distribution. A `field` property
 * loads sample values, taken from `from` or else from `data()`. A
 * `distributions` property is parsed recursively (for mixtures). Every
 * other property that names a method of the distribution is passed to
 * that method.
 * @param {object} def - The distribution parameter object.
 * @param {function():Array<object>} data - A method for requesting source
 *   data. It is used by distributions (such as KDE) that need sample data
 *   points, and is only invoked if the 'from' parameter is not provided.
 *   Typically it returns the backing source data of a Pulse.
 * @return {object} The configured distribution object.
 */
function parse(def, data) {
  const func = def[FUNCTION];
  if (!hasOwnProperty(Distributions, func)) {
    error('Unknown distribution function: ' + func);
  }
  const dist = Distributions[func]();
  for (const name in def) {
    const value = def[name];
    if (name === FIELD) {
      // data field: extract sample values
      const samples = def.from || data();
      dist.data(samples.map(value));
    } else if (name === DISTRIBUTIONS) {
      // mixture: parse each component definition
      dist[name](value.map(component => parse(component, data)));
    } else if (typeof dist[name] === FUNCTION) {
      // plain parameter setter
      dist[name](value);
    }
  }
  return dist;
}
1159
/**
 * Grid sample points for a probability density. Given a distribution and a
 * sampling extent, generate points suitable for plotting either the PDF
 * (probability density function) or the CDF (cumulative distribution
 * function).
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {object} params.distribution - The probability distribution; an
 *   object parameter whose shape depends on the distribution type.
 * @param {string} [params.method='pdf'] - Distribution method to sample,
 *   either 'pdf' or 'cdf'.
 * @param {Array<number>} [params.extent] - The [min, max] extent over which
 *   to sample. Required in most cases, but it may be omitted when the
 *   distribution (e.g., 'kde') has a 'data' method whose sample points
 *   determine the extent.
 * @param {number} [params.minsteps=25] - Minimum number of curve samples.
 * @param {number} [params.maxsteps=200] - Maximum number of curve samples.
 * @param {number} [params.steps] - Exact number of curve samples. It
 *   overrides minsteps and maxsteps to produce a fixed number of uniform
 *   samples. Combined with a fixed extent, this gives consistent sample
 *   points for stacked densities.
 */
function Density(params) {
  const initialValue = null;
  Transform.call(this, initialValue, params);
}
1187const distributions = [{
1188 'key': {
1189 'function': 'normal'
1190 },
1191 'params': [{
1192 'name': 'mean',
1193 'type': 'number',
1194 'default': 0
1195 }, {
1196 'name': 'stdev',
1197 'type': 'number',
1198 'default': 1
1199 }]
1200}, {
1201 'key': {
1202 'function': 'lognormal'
1203 },
1204 'params': [{
1205 'name': 'mean',
1206 'type': 'number',
1207 'default': 0
1208 }, {
1209 'name': 'stdev',
1210 'type': 'number',
1211 'default': 1
1212 }]
1213}, {
1214 'key': {
1215 'function': 'uniform'
1216 },
1217 'params': [{
1218 'name': 'min',
1219 'type': 'number',
1220 'default': 0
1221 }, {
1222 'name': 'max',
1223 'type': 'number',
1224 'default': 1
1225 }]
1226}, {
1227 'key': {
1228 'function': 'kde'
1229 },
1230 'params': [{
1231 'name': 'field',
1232 'type': 'field',
1233 'required': true
1234 }, {
1235 'name': 'from',
1236 'type': 'data'
1237 }, {
1238 'name': 'bandwidth',
1239 'type': 'number',
1240 'default': 0
1241 }]
1242}];
1243const mixture = {
1244 'key': {
1245 'function': 'mixture'
1246 },
1247 'params': [{
1248 'name': 'distributions',
1249 'type': 'param',
1250 'array': true,
1251 'params': distributions
1252 }, {
1253 'name': 'weights',
1254 'type': 'number',
1255 'array': true
1256 }]
1257};
1258Density.Definition = {
1259 'type': 'Density',
1260 'metadata': {
1261 'generates': true
1262 },
1263 'params': [{
1264 'name': 'extent',
1265 'type': 'number',
1266 'array': true,
1267 'length': 2
1268 }, {
1269 'name': 'steps',
1270 'type': 'number'
1271 }, {
1272 'name': 'minsteps',
1273 'type': 'number',
1274 'default': 25
1275 }, {
1276 'name': 'maxsteps',
1277 'type': 'number',
1278 'default': 200
1279 }, {
1280 'name': 'method',
1281 'type': 'string',
1282 'default': 'pdf',
1283 'values': ['pdf', 'cdf']
1284 }, {
1285 'name': 'distribution',
1286 'type': 'param',
1287 'params': distributions.concat(mixture)
1288 }, {
1289 'name': 'as',
1290 'type': 'string',
1291 'array': true,
1292 'default': ['value', 'density']
1293 }]
1294};
1295inherits(Density, Transform, {
1296 transform(_, pulse) {
1297 const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
1298 if (!this.value || pulse.changed() || _.modified()) {
1299 const dist = parse(_.distribution, source(pulse)),
1300 minsteps = _.steps || _.minsteps || 25,
1301 maxsteps = _.steps || _.maxsteps || 200;
1302 let method = _.method || 'pdf';
1303 if (method !== 'pdf' && method !== 'cdf') {
1304 error('Invalid density method: ' + method);
1305 }
1306 if (!_.extent && !dist.data) {
1307 error('Missing density extent parameter.');
1308 }
1309 method = dist[method];
1310 const as = _.as || ['value', 'density'],
1311 domain = _.extent || extent(dist.data()),
1312 values = sampleCurve(method, domain, minsteps, maxsteps).map(v => {
1313 const tuple = {};
1314 tuple[as[0]] = v[0];
1315 tuple[as[1]] = v[1];
1316 return ingest(tuple);
1317 });
1318 if (this.value) out.rem = this.value;
1319 this.value = out.add = out.source = values;
1320 }
1321 return out;
1322 }
1323});
1324function source(pulse) {
1325 return () => pulse.materialize(pulse.SOURCE).source;
1326}
1327
/**
 * Resolve output field names: use the provided alias at each position,
 * falling back to the accessor's own field name.
 * @param {Array<function(object): *>} [fields] - Field accessors.
 * @param {Array<string>} as - Optional aliases, aligned by index with fields.
 * @return {Array<string>|null} The output names, or null if no fields.
 */
function fieldNames(fields, as) {
  return fields
    ? fields.map((f, i) => as[i] || accessorName(f))
    : null;
}
/**
 * Partition the values of a field into groups.
 * If `groupby` is null/undefined, a single group holding every value is
 * returned. Otherwise tuples are grouped by the array of groupby values.
 * Each group array gets a `dims` property holding that array.
 * @param {Array<object>} data - The input tuples.
 * @param {Array<function(object): *>} [groupby] - Grouping accessors.
 * @param {function(object): *} field - Accessor for the values to collect.
 * @return {Array<Array<*>>} The groups, in order of first appearance.
 */
function partition$1(data, groupby, field) {
  if (groupby == null) {
    return [data.map(field)];
  }

  const groups = [],
        // Null-prototype lookup: group keys such as "constructor",
        // "toString" or "__proto__" must not resolve to inherited
        // Object.prototype members, which would break `g.push` below.
        map = Object.create(null);

  for (let i = 0, n = data.length; i < n; ++i) {
    const t = data[i],
          // array key; coerced to a comma-joined string when used as a property
          k = groupby.map(f => f(t));
    let g = map[k];
    if (!g) {
      map[k] = g = [];
      g.dims = k;
      groups.push(g);
    }
    g.push(field(t));
  }
  return groups;
}
1356
// Default output field name for the computed dot bin values.
const Output = 'bin';

/**
 * Dot density binning for dot plot construction.
 * Based on Leland Wilkinson, Dot Plots, The American Statistician, 1999.
 * https://www.cs.uic.edu/~wilkinson/Publications/dotplots.pdf
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The value field to bin.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors to groupby.
 * @param {number} [params.step] - The step size (bin width) within which dots should be
 *   stacked. Defaults to 1/30 of the extent of the data *field*.
 * @param {boolean} [params.smooth=false] - A boolean flag indicating if dot density
 *   stacks should be smoothed to reduce variance.
 * @param {string} [params.as='bin'] - The output field for the bin values.
 */
function DotBin(params) {
  Transform.call(this, null, params);
}

DotBin.Definition = {
  type: 'DotBin',
  metadata: { modifies: true },
  params: [
    { name: 'field', type: 'field', required: true },
    { name: 'groupby', type: 'field', array: true },
    { name: 'step', type: 'number' },
    { name: 'smooth', type: 'boolean', default: false },
    { name: 'as', type: 'string', default: Output }
  ]
};

// Default bin width: 1/30 of the span of the field's data extent.
const autostep = (data, field) => span(extent(data, field)) / 30;

inherits(DotBin, Transform, {
  transform(_, pulse) {
    // nothing to recompute if neither parameters nor input data changed
    if (this.value && !_.modified() && !pulse.changed()) {
      return pulse;
    }

    const data = pulse.materialize(pulse.SOURCE).source,
          groups = partition$1(data, _.groupby, identity),
          field = _.field,
          smooth = _.smooth || false,
          step = _.step || autostep(data, field),
          byValue = stableCompare((a, b) => field(a) - field(b)),
          as = _.as || Output;

    // compute dotplot bins per group, tracking the overall bin range
    let lo = Infinity,
        hi = -Infinity;
    for (const group of groups) {
      const sorted = group.sort(byValue);
      let index = 0;
      for (const v of dotbin(sorted, step, smooth, field)) {
        if (v < lo) lo = v;
        if (v > hi) hi = v;
        sorted[index++][as] = v;
      }
    }

    this.value = { start: lo, stop: hi, step };
    return pulse.reflow(true).modifies(as);
  }
});
1439
/**
 * Wraps an expression function with access to external parameters.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function} params.expr - The expression function. The
 *   function should accept both a datum and a parameter object.
 *   This operator's value will be a new function that wraps the
 *   expression function with access to this operator's parameters.
 */
function Expression(params) {
  Operator.call(this, null, update$4, params);
  this.modified(true);
}
inherits(Expression, Operator);

/**
 * Operator update function: (re)build the wrapped accessor.
 * The existing wrapper is reused unless the `expr` parameter changed.
 * The wrapper carries the expression's accessor fields and name.
 * @param {Parameters} _ - The operator parameters.
 * @return {function(object): *} The wrapped accessor.
 */
function update$4(_) {
  if (this.value && !_.modified('expr')) {
    return this.value;
  }
  const expr = _.expr;
  return accessor(
    datum => expr(datum, _),
    accessorFields(expr),
    accessorName(expr)
  );
}
1458
/**
 * Computes extents (min/max) for a data field.
 * Non-numeric values and NaN are ignored. If no finite extent can be found,
 * a warning is issued and the value is set to [undefined, undefined].
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The field over which to compute extents.
 */
function Extent(params) {
  Transform.call(this, [undefined, undefined], params);
}
Extent.Definition = {
  'type': 'Extent',
  'metadata': {},
  'params': [{
    'name': 'field',
    'type': 'field',
    'required': true
  }]
};
inherits(Extent, Transform, {
  transform(_, pulse) {
    const extent = this.value,
          field = _.field,
          // any removal/modification of relevant data forces a full rescan;
          // otherwise only newly added tuples need to be folded in
          mod = pulse.changed() || pulse.modified(field.fields) || _.modified('field');
    let min = extent[0],
        max = extent[1];
    if (mod || min == null) {
      min = +Infinity;
      max = -Infinity;
    }
    pulse.visit(mod ? pulse.SOURCE : pulse.ADD, t => {
      const v = toNumber(field(t));
      if (v != null) {
        // NaNs will fail all comparisons!
        if (v < min) min = v;
        if (v > max) max = v;
      }
    });
    if (!Number.isFinite(min) || !Number.isFinite(max)) {
      // only mention the field when the accessor actually has a name;
      // otherwise the message would read "Infinite extentundefined"
      const name = accessorName(field),
            suffix = name ? ` for field "${name}"` : '';
      pulse.dataflow.warn(`Infinite extent${suffix}: [${min}, ${max}]`);
      min = max = undefined;
    }
    this.value = [min, max];
  }
});
1505
/**
 * Provides a bridge between a parent transform and a target subflow that
 * consumes only a subset of the tuples that pass through the parent.
 * @constructor
 * @param {Pulse} pulse - A pulse to use as the value of this operator.
 * @param {Transform} parent - The parent transform (typically a Facet instance).
 */
function Subflow(pulse, parent) {
  Operator.call(this, pulse);
  this.parent = parent;
  // net tuple count: incremented by add(), decremented by rem()
  this.count = 0;
}

inherits(Subflow, Operator, {
  /**
   * Routes pulses from this subflow to a target transform.
   * Also adopts the target's `detachSubflow` hook, if any.
   * @param {Transform} target - A transform that receives the subflow of tuples.
   * @return {Subflow} This subflow (also assigned as the target's source).
   */
  connect(target) {
    this.detachSubflow = target.detachSubflow;
    this.targets().add(target);
    target.source = this;
    return this;
  },

  /**
   * Add an 'add' tuple to the subflow pulse.
   * @param {Tuple} t - The tuple being added.
   */
  add(t) {
    ++this.count;
    this.value.add.push(t);
  },

  /**
   * Add a 'rem' tuple to the subflow pulse.
   * @param {Tuple} t - The tuple being removed.
   */
  rem(t) {
    --this.count;
    this.value.rem.push(t);
  },

  /**
   * Add a 'mod' tuple to the subflow pulse.
   * @param {Tuple} t - The tuple being modified.
   */
  mod(t) {
    this.value.mod.push(t);
  },

  /**
   * Re-initialize this operator's pulse value.
   * @param {Pulse} pulse - The pulse to copy from.
   * @see Pulse.init
   */
  init(pulse) {
    this.value.init(pulse, pulse.NO_SOURCE);
  },

  /**
   * Evaluate this operator. Overrides the default behavior to simply
   * return the contained pulse value.
   * @return {Pulse}
   */
  evaluate() {
    // assert: this.value.stamp === pulse.stamp
    return this.value;
  }
});
1569
/**
 * Facets a dataflow into a set of subflows based on a key.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Dataflow, string): Operator} params.subflow - A function
 *   that generates a subflow of operators and returns its root operator.
 * @param {function(object): *} params.key - The key field to facet by.
 */
function Facet(params) {
  Transform.call(this, {}, params);

  // cache of previously computed key values, indexed by tuple id
  this._keys = fastmap();

  // Active subflows double as the targets array used by listeners. Only the
  // first `active` entries are visited, so propagation is limited to the
  // subflows updated in the current pulse.
  const targets = [];
  targets.active = 0;
  targets.forEach = visit => {
    const count = targets.active;
    for (let idx = 0; idx < count; ++idx) {
      visit(targets[idx], idx, targets);
    }
  };
  this._targets = targets;
}

inherits(Facet, Transform, {
  /**
   * Append a subflow to the list of active targets.
   * @param {Subflow} flow - The subflow to activate.
   */
  activate(flow) {
    const targets = this._targets;
    targets[targets.active++] = flow;
  },

  /**
   * Look up (or lazily create) the subflow for a facet key, activating it
   * for the current pulse. The parent argument is provided by the PreFacet
   * subclass; otherwise the `tuple` of the matching group entry is used.
   */
  subflow(key, flow, pulse, parent) {
    const flows = this.value;
    let sf = hasOwnProperty(flows, key) && flows[key];

    if (sf) {
      // existing subflow: re-initialize at most once per pulse stamp
      if (sf.value.stamp < pulse.stamp) {
        sf.init(pulse);
        this.activate(sf);
      }
      return sf;
    }

    // no subflow yet for this key: build and connect a new one
    let context = parent;
    if (!context) {
      const group = this._group[key];
      context = group && group.tuple;
    }
    const df = pulse.dataflow;
    sf = new Subflow(pulse.fork(pulse.NO_SOURCE), this);
    df.add(sf).connect(flow(df, key, context));
    flows[key] = sf;
    this.activate(sf);
    return sf;
  },

  /**
   * Detach and drop subflows that no longer contain any tuples, then
   * rebuild the active targets list if anything was removed.
   */
  clean() {
    const flows = this.value;
    let removed = 0;
    for (const key in flows) {
      const sf = flows[key];
      if (sf.count !== 0) continue;
      const detach = sf.detachSubflow;
      if (detach) detach();
      delete flows[key];
      removed += 1;
    }

    // remove inactive targets from the active targets array
    if (removed > 0) {
      this.initTargets(this._targets.filter(sf => sf && sf.count > 0));
    }
  },

  /**
   * Reset the active targets array to the given subflows (none if omitted).
   * @param {Array<Subflow>} [act] - The subflows to mark active.
   */
  initTargets(act) {
    const targets = this._targets,
          size = targets.length,
          count = act ? act.length : 0;
    let idx = 0;

    while (idx < count) {
      targets[idx] = act[idx];
      idx += 1;
    }
    // null out stale entries so old flows can be garbage collected
    while (idx < size && targets[idx] != null) {
      targets[idx] = null;
      idx += 1;
    }
    targets.active = count;
  },

  transform(_, pulse) {
    const df = pulse.dataflow,
          key = _.key,
          flow = _.subflow,
          cache = this._keys,
          rekey = _.modified('key'),
          getFlow = k => this.subflow(k, flow, pulse),
          // route a tuple from its old key's subflow to its new one
          move = (t, id, from, to) => {
            cache.set(id, to);
            getFlow(from).rem(t);
            getFlow(to).add(t);
          };

    this._group = _.group || {};
    this.initTargets(); // reset list of active subflows

    pulse.visit(pulse.REM, t => {
      const id = tupleid(t),
            k = cache.get(id);
      if (k === undefined) return;
      cache.delete(id);
      getFlow(k).rem(t);
    });

    pulse.visit(pulse.ADD, t => {
      const k = key(t);
      cache.set(tupleid(t), k);
      getFlow(k).add(t);
    });

    if (rekey || pulse.modified(key.fields)) {
      // keys may have changed: re-evaluate them for modified tuples
      pulse.visit(pulse.MOD, t => {
        const id = tupleid(t),
              k0 = cache.get(id),
              k1 = key(t);
        if (k0 === k1) {
          getFlow(k1).mod(t);
        } else {
          move(t, id, k0, k1);
        }
      });
    } else if (pulse.changed(pulse.MOD)) {
      pulse.visit(pulse.MOD, t => {
        getFlow(cache.get(tupleid(t))).mod(t);
      });
    }

    if (rekey) {
      // key function changed: re-key all unchanged tuples as well
      pulse.visit(pulse.REFLOW, t => {
        const id = tupleid(t),
              k0 = cache.get(id),
              k1 = key(t);
        if (k0 !== k1) move(t, id, k0, k1);
      });
    }

    if (pulse.clean()) {
      df.runAfter(() => {
        this.clean();
        cache.clean();
      });
    } else if (cache.empty > df.cleanThreshold) {
      df.runAfter(cache.clean);
    }
    return pulse;
  }
});
1711
/**
 * Generates one or more field accessor functions.
 * If the 'name' parameter is an array, an array of field accessors
 * will be created and the 'as' parameter will be ignored.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {string|Array<string>} params.name - The field name(s) to access.
 * @param {string} params.as - The accessor function name.
 */
function Field(params) {
  Operator.call(this, null, update$3, params);
}
inherits(Field, Operator);

/**
 * Operator update function: rebuild the accessor(s) only when a parameter
 * changed, otherwise return the cached value.
 * @param {Parameters} _ - The operator parameters.
 * @return {function|Array<function>} A field accessor, or an array of them.
 */
function update$3(_) {
  if (this.value && !_.modified()) {
    return this.value;
  }
  const name = _.name;
  return isArray(name)
    ? name.map(f => field(f))
    : field(name, _.as);
}
1728
/**
 * Filters data tuples according to a predicate function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - The predicate expression function
 *   that determines a tuple's filter status. Truthy values pass the filter.
 */
function Filter(params) {
  Transform.call(this, fastmap(), params);
}

Filter.Definition = {
  type: 'Filter',
  metadata: { changes: true },
  params: [
    { name: 'expr', type: 'expr', required: true }
  ]
};

inherits(Filter, Transform, {
  transform(_, pulse) {
    const df = pulse.dataflow,
          cache = this.value, // ids of tuples currently filtered out
          output = pulse.fork(),
          { add, rem, mod } = output,
          test = _.expr;
    let isMod = true;

    // removed tuples only propagate if they previously passed the filter
    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      if (cache.has(id)) {
        cache.delete(id);
      } else {
        rem.push(t);
      }
    });

    pulse.visit(pulse.ADD, t => {
      if (test(t, _)) {
        add.push(t);
      } else {
        cache.set(tupleid(t), 1);
      }
    });

    // re-test a tuple, moving it in or out of the output as needed
    const revisit = t => {
      const id = tupleid(t),
            passes = test(t, _),
            filtered = cache.get(id);
      if (passes && filtered) {
        cache.delete(id);
        add.push(t);
      } else if (!passes && !filtered) {
        cache.set(id, 1);
        rem.push(t);
      } else if (isMod && passes && !filtered) {
        mod.push(t);
      }
    };

    pulse.visit(pulse.MOD, revisit);

    // predicate changed: re-test every unmodified tuple too
    if (_.modified()) {
      isMod = false;
      pulse.visit(pulse.REFLOW, revisit);
    }

    if (cache.empty > df.cleanThreshold) df.runAfter(cache.clean);
    return output;
  }
});
1791
/**
 * Flattens array-typed field values into new data objects.
 * If multiple fields are specified, they are treated as parallel arrays,
 * with output values included for each matching index (or null if missing).
 * A null or undefined field value is treated as an empty array.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - An array of field
 *   accessors for the tuple fields that should be flattened.
 * @param {string} [params.index] - Optional output field name for index
 *   value. If unspecified, no index field is included in the output.
 * @param {Array<string>} [params.as] - Output field names for flattened
 *   array fields. Any unspecified fields will use the field name provided
 *   by the fields accessors.
 */
function Flatten(params) {
  Transform.call(this, [], params);
}
Flatten.Definition = {
  'type': 'Flatten',
  'metadata': {
    'generates': true
  },
  'params': [{
    'name': 'fields',
    'type': 'field',
    'array': true,
    'required': true
  }, {
    'name': 'index',
    'type': 'string'
  }, {
    'name': 'as',
    'type': 'string',
    'array': true
  }]
};
inherits(Flatten, Transform, {
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE),
          fields = _.fields,
          as = fieldNames(fields, _.as || []),
          index = _.index || null,
          m = as.length;

    // remove any previous results
    out.rem = this.value;

    // generate flattened tuples
    pulse.visit(pulse.SOURCE, t => {
      // Missing (null/undefined) field values count as empty arrays;
      // previously reading `.length` on them threw a TypeError.
      const arrays = fields.map(f => {
              const a = f(t);
              return a == null ? [] : a;
            }),
            maxlen = arrays.reduce((l, a) => Math.max(l, a.length), 0);

      for (let i = 0; i < maxlen; ++i) {
        const d = derive(t);
        for (let j = 0; j < m; ++j) {
          const v = arrays[j][i];
          d[as[j]] = v == null ? null : v;
        }
        if (index) {
          d[index] = i;
        }
        out.add.push(d);
      }
    });

    this.value = out.source = out.add;
    if (index) out.modifies(index);
    return out.modifies(as);
  }
});
1863
/**
 * Folds one or more tuple fields into multiple tuples in which the field
 * name and values are available under new 'key' and 'value' fields.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - An array of field accessors
 *   for the tuple fields that should be folded.
 * @param {Array<string>} [params.as] - Output field names for folded key
 *   and value fields, defaults to ['key', 'value'].
 */
function Fold(params) {
  Transform.call(this, [], params);
}

Fold.Definition = {
  type: 'Fold',
  metadata: { generates: true },
  params: [
    { name: 'fields', type: 'field', array: true, required: true },
    { name: 'as', type: 'string', array: true, length: 2, default: ['key', 'value'] }
  ]
};

inherits(Fold, Transform, {
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE),
          fields = _.fields,
          names = fields.map(accessorName),
          as = _.as || ['key', 'value'],
          keyField = as[0],
          valueField = as[1];

    // retract all previously folded tuples; they are regenerated below
    out.rem = this.value;

    pulse.visit(pulse.SOURCE, t => {
      fields.forEach((f, i) => {
        const d = derive(t);
        d[keyField] = names[i];
        d[valueField] = f(t);
        out.add.push(d);
      });
    });

    this.value = out.source = out.add;
    return out.modifies(as);
  }
});
1917
/**
 * Invokes a function for each data tuple and saves the results as a new field.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - The formula function to invoke for each tuple.
 * @param {string} params.as - The field name under which to save the result.
 * @param {boolean} [params.initonly=false] - If true, the formula is applied to
 *   added tuples only, and does not update in response to modifications.
 */
function Formula(params) {
  Transform.call(this, null, params);
}

Formula.Definition = {
  type: 'Formula',
  metadata: { modifies: true },
  params: [
    { name: 'expr', type: 'expr', required: true },
    { name: 'as', type: 'string', required: true },
    { name: 'initonly', type: 'boolean' }
  ]
};

inherits(Formula, Transform, {
  transform(_, pulse) {
    const func = _.expr,
          as = _.as,
          mod = _.modified();

    // choose which tuples must be (re)evaluated
    let flag;
    if (_.initonly) {
      flag = pulse.ADD;
    } else if (mod) {
      flag = pulse.SOURCE;
    } else if (pulse.modified(func.fields) || pulse.modified(as)) {
      flag = pulse.ADD_MOD;
    } else {
      flag = pulse.ADD;
    }

    if (mod) {
      // parameters updated, need to reflow
      pulse = pulse.materialize().reflow(true);
    }
    if (!_.initonly) {
      pulse.modifies(as);
    }
    return pulse.visit(flag, t => (t[as] = func(t, _)));
  }
});
1964
/**
 * Generates data tuples using a provided generator function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Parameters): object} params.generator - A tuple generator
 *   function. This function is given the operator parameters as input.
 *   Changes to any additional parameters will not trigger re-calculation
 *   of previously generated tuples. Only future tuples are affected.
 * @param {number} params.size - The number of tuples to produce.
 */
function Generate(params) {
  Transform.call(this, [], params);
}

inherits(Generate, Transform, {
  transform(_, pulse) {
    const out = pulse.fork(pulse.ALL),
          gen = _.generator;
    let data = this.value,
        num = _.size - data.length;

    if (num > 0) {
      // need more tuples: generate, ingest and append them
      const created = [];
      while (--num >= 0) {
        const t = ingest(gen(_));
        created.push(t);
        data.push(t);
      }
      out.add = out.add.length
        ? out.materialize(out.ADD).add.concat(created)
        : created;
    } else {
      // need fewer (or equal) tuples: drop the oldest ones
      const dropped = data.slice(0, -num);
      out.rem = out.rem.length
        ? out.materialize(out.REM).rem.concat(dropped)
        : dropped;
      data = data.slice(-num);
    }

    out.source = this.value = data;
    return out;
  }
});
2004
// Imputation methods. 'value' is a marker for constant-value imputation;
// the others are reducer functions imported from d3-array.
const Methods = {
  value: 'value',
  median,
  mean,
  min,
  max
};

// Shared empty group-values array used when no groupby fields are given.
const Empty = [];
2013
/**
 * Impute missing values.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The value field to impute.
 * @param {Array<function(object): *>} [params.groupby] - An array of
 *   accessors to determine series within which to perform imputation.
 * @param {function(object): *} params.key - An accessor for a key value.
 *   Each key value should be unique within a group. New tuples will be
 *   imputed for any key values that are not found within a group.
 * @param {Array<*>} [params.keyvals] - Optional array of required key
 *   values. New tuples will be imputed for any key values that are not
 *   found within a group. In addition, these values will be automatically
 *   augmented with the key values observed in the input data.
 * @param {string} [params.method='value'] - The imputation method to use. One of
 *   'value', 'mean', 'median', 'max', 'min'.
 * @param {*} [params.value=0] - The constant value to use for imputation
 *   when using method 'value'.
 */
function Impute(params) {
  Transform.call(this, [], params);
}

Impute.Definition = {
  type: 'Impute',
  metadata: { changes: true },
  params: [
    { name: 'field', type: 'field', required: true },
    { name: 'key', type: 'field', required: true },
    { name: 'keyvals', array: true },
    { name: 'groupby', type: 'field', array: true },
    {
      name: 'method',
      type: 'enum',
      default: 'value',
      values: ['value', 'mean', 'median', 'max', 'min']
    },
    { name: 'value', default: 0 }
  ]
};
/**
 * Resolve the imputation function for the configured method.
 * @param {object} _ - Operator parameters; reads `method` (default 'value')
 *   and `value` (default 0).
 * @return {function} For method 'value', a function returning the constant
 *   `_.value`; otherwise the matching reducer from Methods, which is called
 *   as `(group, field)`.
 * @throws {Error} If the method name is not a known imputation method.
 */
function getValue(_) {
  const m = _.method || Methods.value;
  // Own-property check: names like "toString" or "constructor" must be
  // rejected rather than resolving to inherited Object.prototype functions.
  if (!hasOwnProperty(Methods, m) || Methods[m] == null) {
    error('Unrecognized imputation method: ' + m);
  } else if (m === Methods.value) {
    const v = _.value !== undefined ? _.value : 0;
    return () => v;
  } else {
    return Methods[m];
  }
}
/**
 * Wrap the imputation field accessor so that undefined entries (holes in a
 * sparse group array) read as NaN instead of being passed to the accessor.
 * @param {object} _ - Operator parameters; reads the `field` accessor.
 * @return {function(object): *} The guarded accessor.
 */
function getField(_) {
  const get = _.field;
  return function (t) {
    return t ? get(t) : NaN;
  };
}
inherits(Impute, Transform, {
  transform(_, pulse) {
    const out = pulse.fork(pulse.ALL),
          impute = getValue(_),
          field = getField(_),
          fName = accessorName(_.field),
          kName = accessorName(_.key),
          gNames = (_.groupby || []).map(accessorName),
          groups = partition(pulse.source, _.groupby, _.key, _.keyvals),
          domain = groups.domain,
          prev = this.value,
          curr = [];

    for (let g = 0; g < groups.length; ++g) {
      const group = groups[g],
            gVals = group.values;
      // imputed value is computed lazily and reused within this group
      // (it is recomputed only while the result is NaN)
      let value = NaN;

      // add tuples for key values missing from this group
      for (let j = 0; j < domain.length; ++j) {
        if (group[j] != null) continue;

        const t = { _impute: true };
        for (let i = 0; i < gVals.length; ++i) {
          t[gNames[i]] = gVals[i];
        }
        t[kName] = domain[j];
        if (Number.isNaN(value)) {
          value = impute(group, field);
        }
        t[fName] = value;
        curr.push(ingest(t));
      }
    }

    // emit this pulse's imputed tuples and retract the previous ones
    if (curr.length) out.add = out.materialize(out.ADD).add.concat(curr);
    if (prev.length) out.rem = out.materialize(out.REM).rem.concat(prev);
    this.value = curr;
    return out;
  }
});
/**
 * Partition tuples into groups for imputation.
 * Each group is a sparse array indexed by the position of a tuple's key in
 * the shared key domain, and carries a `values` property holding its groupby
 * values (or the shared `Empty` array when ungrouped). The domain starts as
 * a copy of `keyvals` and is extended with newly observed keys.
 * @param {Array<object>} data - The input tuples.
 * @param {Array<function(object): *>} [groupby] - Grouping accessors.
 * @param {function(object): *} key - Key accessor.
 * @param {Array<*>} [keyvals] - Required key values.
 * @return {Array<Array<object>>} The groups, with a `domain` property.
 */
function partition(data, groupby, key, keyvals) {
  const groups = [],
        domain = keyvals ? keyvals.slice() : [],
        // Null-prototype lookup tables: keys such as "constructor",
        // "toString" or "__proto__" must not hit Object.prototype members,
        // which previously produced NaN indices and dropped those keys.
        kMap = Object.create(null),
        gMap = Object.create(null);

  // kMap stores 1-based domain positions so that 0/undefined mean "unseen"
  domain.forEach((k, i) => kMap[k] = i + 1);

  for (let i = 0, n = data.length; i < n; ++i) {
    const t = data[i],
          k = key(t),
          j = kMap[k] || (kMap[k] = domain.push(k)),
          gVals = groupby ? groupby.map(f => f(t)) : Empty,
          gKey = gVals + '';
    let group = gMap[gKey];
    if (!group) {
      group = gMap[gKey] = [];
      groups.push(group);
      group.values = gVals;
    }
    group[j - 1] = t;
  }

  groups.domain = domain;
  return groups;
}
2160
/**
 * Extend input tuples with aggregate values.
 * Calculates aggregate values and joins them with the input stream.
 * @constructor
 */
function JoinAggregate(params) {
  Aggregate.call(this, params);
}

JoinAggregate.Definition = {
  type: 'JoinAggregate',
  metadata: { modifies: true },
  params: [
    { name: 'groupby', type: 'field', array: true },
    { name: 'fields', type: 'field', null: true, array: true },
    { name: 'ops', type: 'enum', array: true, values: ValidAggregateOps },
    { name: 'as', type: 'string', null: true, array: true },
    { name: 'key', type: 'field' }
  ]
};

inherits(JoinAggregate, Aggregate, {
  transform(_, pulse) {
    const mod = _.modified(),
          rebuild = this.value && (mod || pulse.modified(this._inputs, true));
    let cells;

    if (rebuild) {
      // parameters or aggregated inputs changed: recompute from all tuples
      cells = this.value = mod ? this.init(_) : {};
      pulse.visit(pulse.SOURCE, t => this.add(t));
    } else {
      // incremental update: retract removals, then accumulate additions
      cells = this.value = this.value || this.init(_);
      pulse.visit(pulse.REM, t => this.rem(t));
      pulse.visit(pulse.ADD, t => this.add(t));
    }

    // update aggregation cells
    this.changes();

    // write aggregate values back onto every input tuple
    pulse.visit(pulse.SOURCE, t => {
      extend(t, cells[this.cellkey(t)].tuple);
    });
    return pulse.reflow(mod).modifies(this._outputs);
  },

  changes() {
    const adds = this._adds,
          mods = this._mods;

    // refresh the output tuple of each touched cell, then null the slot
    // so released cells can be garbage collected
    const flush = (cells, count) => {
      for (let i = 0; i < count; ++i) {
        this.celltuple(cells[i]);
        cells[i] = null;
      }
    };
    flush(adds, this._alen);
    flush(mods, this._mlen);

    this._alen = this._mlen = 0; // reset list of active cells
  }
});
2240
2241/**
2242 * Compute kernel density estimates (KDE) for one or more data groups.
2243 * @constructor
2244 * @param {object} params - The parameters for this operator.
2245 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
2246 * to groupby.
2247 * @param {function(object): *} params.field - An accessor for the data field
2248 * to estimate.
2249 * @param {number} [params.bandwidth=0] - The KDE kernel bandwidth.
2250 * If zero or unspecified, the bandwidth is automatically determined.
2251 * @param {boolean} [params.counts=false] - A boolean flag indicating if the
2252 * output values should be probability estimates (false, default) or
2253 * smoothed counts (true).
2254 * @param {string} [params.cumulative=false] - A boolean flag indicating if a
2255 * density (false) or cumulative distribution (true) should be generated.
2256 * @param {Array<number>} [params.extent] - The domain extent over which to
2257 * plot the density. If unspecified, the [min, max] data extent is used.
2258 * @param {string} [params.resolve='independent'] - Indicates how parameters for
2259 * multiple densities should be resolved. If "independent" (the default), each
2260 * density may have its own domain extent and dynamic number of curve sample
2261 * steps. If "shared", the KDE transform will ensure that all densities are
2262 * defined over a shared domain and curve steps, enabling stacking.
2263 * @param {number} [params.minsteps=25] - The minimum number of curve samples
2264 * for plotting the density.
2265 * @param {number} [params.maxsteps=200] - The maximum number of curve samples
2266 * for plotting the density.
2267 * @param {number} [params.steps] - The exact number of curve samples for
2268 * plotting the density. If specified, overrides both minsteps and maxsteps
2269 * to set an exact number of uniform samples. Useful in conjunction with
2270 * a fixed extent to ensure consistent sample points for stacked densities.
2271 */
// KDE transform constructor. The operator value starts as null; the
// transform method stores the array of generated density tuples there.
function KDE(params) {
  Transform.call(this, null, params);
}
// Parameter schema and metadata for the KDE transform. Keys are quoted like
// every other Definition in this file; keep that form (presumably it protects
// the names from property renaming during minification — verify before changing).
KDE.Definition = {
  'type': 'KDE',
  'metadata': {
    // the transform creates new tuples rather than modifying its input
    'generates': true
  },
  'params': [{
    'name': 'groupby',
    'type': 'field',
    'array': true
  }, {
    'name': 'field',
    'type': 'field',
    'required': true
  }, {
    // false -> probability density (pdf), true -> cumulative distribution (cdf)
    'name': 'cumulative',
    'type': 'boolean',
    'default': false
  }, {
    // when true, density values are scaled by the group size
    'name': 'counts',
    'type': 'boolean',
    'default': false
  }, {
    'name': 'bandwidth',
    'type': 'number',
    'default': 0
  }, {
    // [min, max] domain; when omitted the data extent is used
    'name': 'extent',
    'type': 'number',
    'array': true,
    'length': 2
  }, {
    'name': 'resolve',
    'type': 'enum',
    'values': ['shared', 'independent'],
    'default': 'independent'
  }, {
    // no default: when set, it overrides both minsteps and maxsteps
    'name': 'steps',
    'type': 'number'
  }, {
    'name': 'minsteps',
    'type': 'number',
    'default': 25
  }, {
    'name': 'maxsteps',
    'type': 'number',
    'default': 200
  }, {
    // output field names for the sample point and its density value
    'name': 'as',
    'type': 'string',
    'array': true,
    'default': ['value', 'density']
  }]
};
inherits(KDE, Transform, {
  // Regenerates density curve samples for each group whenever the input
  // data or the parameters change; otherwise emits an empty fork.
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
    const upToDate = this.value && !pulse.changed() && !_.modified();
    if (upToDate) return out;

    const source = pulse.materialize(pulse.SOURCE).source;
    const groups = partition$1(source, _.groupby, _.field);
    const dimNames = (_.groupby || []).map(accessorName);
    const bandwidth = _.bandwidth;
    const method = _.cumulative ? 'cdf' : 'pdf';
    const outNames = _.as || ['value', 'density'];
    const generated = [];

    let domain = _.extent;
    let minsteps = _.steps || _.minsteps || 25;
    let maxsteps = _.steps || _.maxsteps || 200;

    if (method !== 'pdf' && method !== 'cdf') {
      error('Invalid density method: ' + method);
    }

    // Shared resolution: one common domain and a fixed number of steps so
    // that all group curves line up (e.g. for stacking).
    if (_.resolve === 'shared') {
      domain = domain || extent(source, _.field);
      maxsteps = _.steps || maxsteps;
      minsteps = maxsteps;
    }

    groups.forEach(group => {
      const density = randomKDE(group, bandwidth)[method];
      const scale = _.counts ? group.length : 1;
      const curveDomain = domain || extent(group);
      const curve = sampleCurve(density, curveDomain, minsteps, maxsteps);

      curve.forEach(point => {
        const tuple = {};
        dimNames.forEach((name, k) => {
          tuple[name] = group.dims[k];
        });
        tuple[outNames[0]] = point[0];
        tuple[outNames[1]] = point[1] * scale;
        generated.push(ingest(tuple));
      });
    });

    // Retract the previous curves and publish the new ones.
    if (this.value) out.rem = this.value;
    this.value = out.add = out.source = generated;
    return out;
  }
});
2369
2370/**
2371 * Generates a key function.
2372 * @constructor
2373 * @param {object} params - The parameters for this operator.
2374 * @param {Array<string>} params.fields - The field name(s) for the key function.
2375 * @param {boolean} params.flat - A boolean flag indicating if the field names
2376 * should be treated as flat property names, side-stepping nested field
2377 * lookups normally indicated by dot or bracket notation.
2378 */
function Key(params) {
  // The operator's value (a key function) is produced by update$2.
  const valueFn = update$2;
  Operator.call(this, null, valueFn, params);
}
inherits(Key, Operator);
function update$2(_) {
  // Reuse the cached key function while the parameters are unchanged.
  if (this.value && !_.modified()) {
    return this.value;
  }
  return key(_.fields, _.flat);
}
2386
2387/**
 * Load and parse data from an external source. Marshals parameter
2389 * values and then invokes the Dataflow request method.
2390 * @constructor
2391 * @param {object} params - The parameters for this operator.
2392 * @param {string} params.url - The URL to load from.
2393 * @param {object} params.format - The data format options.
2394 */
function Load(params) {
  Transform.call(this, [], params);
  // Data delivered by a completed non-blocking request, emitted on the next run.
  this._pending = null;
}
inherits(Load, Transform, {
  transform(_, pulse) {
    const df = pulse.dataflow;

    // An async request finished earlier: publish its data now.
    if (this._pending) return output(this, pulse, this._pending);

    // Only the async flag changed, so there is nothing new to load.
    if (stop(_)) return pulse.StopPropagation;

    // Inline values: parse and publish immediately.
    if (_.values) return output(this, pulse, df.parse(_.values, _.format));

    const request = df.request(_.url, _.format);
    if (!_.async) {
      // Blocking load: the returned promise resolves to the output pulse.
      return request.then(res => output(this, pulse, array(res.data)));
    }

    // Non-blocking load: stash the data, then re-touch this operator so the
    // next run emits it through the _pending branch above.
    return {
      async: request.then(res => {
        this._pending = array(res.data);
        return flow => flow.touch(this);
      })
    };
  }
});
function stop(_) {
  // True only when the async flag changed and no data-source parameter did.
  if (!_.modified('async')) return false;
  const sourceParams = ['values', 'url', 'format'];
  return !sourceParams.some(name => _.modified(name));
}
/**
 * Ingest loaded data and publish it as the operator's new value, replacing
 * (and retracting) any previously loaded tuples.
 * @param {object} op - The Load operator; its value and _pending are updated.
 * @param {object} pulse - The incoming pulse to fork.
 * @param {Array<object>} data - The loaded data objects (ingested in place).
 * @return {object} The output pulse.
 */
function output(op, pulse, data) {
  data.forEach(ingest);
  // Pulse flags are bit masks and must be combined with `|`. The former
  // `NO_FIELDS & NO_SOURCE` evaluated to 0, which copied the upstream
  // fields and source instead of suppressing them.
  const out = pulse.fork(pulse.NO_FIELDS | pulse.NO_SOURCE);
  out.rem = op.value;
  op.value = out.source = out.add = data;
  op._pending = null;
  // Request cleanup when previously loaded tuples are being retracted.
  if (out.rem.length) out.clean(true);
  return out;
}
2437
2438/**
2439 * Extend tuples by joining them with values from a lookup table.
2440 * @constructor
2441 * @param {object} params - The parameters for this operator.
2442 * @param {Map} params.index - The lookup table map.
 * @param {Array<function(object): *>} params.fields - The fields to look up.
2444 * @param {Array<string>} params.as - Output field names for each lookup value.
2445 * @param {*} [params.default] - A default value to use if lookup fails.
2446 */
// Lookup transform constructor; the operator value starts as an empty object.
function Lookup(params) {
  Transform.call(this, {}, params);
}
// Parameter schema and metadata for the Lookup transform (quoted keys kept,
// matching the other Definitions in this file).
Lookup.Definition = {
  'type': 'Lookup',
  'metadata': {
    // the transform writes lookup results onto the incoming tuples
    'modifies': true
  },
  'params': [{
    // index built from a secondary data set ('from'), keyed by 'key'
    'name': 'index',
    'type': 'index',
    'params': [{
      'name': 'from',
      'type': 'data',
      'required': true
    }, {
      'name': 'key',
      'type': 'field',
      'required': true
    }]
  }, {
    // optional: fields to copy from the matched object instead of the object itself
    'name': 'values',
    'type': 'field',
    'array': true
  }, {
    'name': 'fields',
    'type': 'field',
    'array': true,
    'required': true
  }, {
    'name': 'as',
    'type': 'string',
    'array': true
  }, {
    // value written when a lookup fails
    'name': 'default',
    'default': null
  }]
};
inherits(Lookup, Transform, {
  /**
   * Join each tuple with values looked up in the index.
   * For every key accessor in _.fields, the index is queried with the tuple's
   * key. Either the matched object itself (no _.values) or each of the
   * _.values accessors applied to it is written to the corresponding _.as
   * output field. Missing matches receive _.default (or null).
   */
  transform(_, pulse) {
    const keys = _.fields,
      index = _.index,
      values = _.values,
      defaultValue = _.default == null ? null : _.default,
      reset = _.modified(),
      n = keys.length;
    let flag = reset ? pulse.SOURCE : pulse.ADD,
      out = pulse,
      as = _.as,
      set;

    if (values) {
      const m = values.length;
      if (n > 1 && !as) {
        error('Multi-field lookup requires explicit "as" parameter.');
      }
      if (as && as.length !== n * m) {
        // Report the actual mismatch; previously "too few" was reported
        // even when too many output names were supplied.
        error('The "as" parameter has too ' + (as.length < n * m ? 'few' : 'many') + ' output field names.');
      }
      as = as || values.map(accessorName);
      // Output names are laid out key-major: n keys x m value fields.
      set = function (t) {
        for (let i = 0, k = 0; i < n; ++i) {
          const v = index.get(keys[i](t));
          for (let j = 0; j < m; ++j, ++k) {
            t[as[k]] = v == null ? defaultValue : values[j](v);
          }
        }
      };
    } else {
      if (!as) {
        error('Missing output field names.');
      }
      set = function (t) {
        for (let i = 0; i < n; ++i) {
          const v = index.get(keys[i](t));
          t[as[i]] = v == null ? defaultValue : v;
        }
      };
    }

    if (reset) {
      // Parameters changed: recompute lookups for every source tuple.
      out = pulse.reflow(true);
    } else if (keys.some(k => pulse.modified(k.fields))) {
      // A key field changed on existing tuples: recompute those too.
      flag |= pulse.MOD;
    }
    pulse.visit(flag, set);
    return out.modifies(as);
  }
});
2535
2536/**
2537 * Computes global min/max extents over a collection of extents.
2538 * @constructor
2539 * @param {object} params - The parameters for this operator.
2540 * @param {Array<Array<number>>} params.extents - The input extents.
2541 */
function MultiExtent(params) {
  // The operator's value (a combined [min, max] extent) comes from update$1.
  const valueFn = update$1;
  Operator.call(this, null, valueFn, params);
}
inherits(MultiExtent, Operator);
function update$1(_) {
  if (this.value && !_.modified()) return this.value;

  // Fold each [lo, hi] extent into one covering extent. Plain comparisons
  // mean NaN bounds are skipped; with no extents the result is
  // [Infinity, -Infinity].
  let lo = +Infinity;
  let hi = -Infinity;
  for (const e of _.extents) {
    const first = e[0];
    const last = e[1];
    if (first < lo) lo = first;
    if (last > hi) hi = last;
  }
  return [lo, hi];
}
2563
2564/**
2565 * Merge a collection of value arrays.
2566 * @constructor
2567 * @param {object} params - The parameters for this operator.
 * @param {Array<Array<*>>} params.values - The input value arrays.
2569 */
function MultiValues(params) {
  // The operator's value (the merged array) is produced by update.
  const valueFn = update;
  Operator.call(this, null, valueFn, params);
}
inherits(MultiValues, Operator);
function update(_) {
  if (this.value && !_.modified()) return this.value;
  // Concatenate the input arrays, in order, into a fresh array.
  let merged = [];
  _.values.forEach(chunk => {
    merged = merged.concat(chunk);
  });
  return merged;
}
2577
2578/**
2579 * Operator whose value is simply its parameter hash. This operator is
2580 * useful for enabling reactive updates to values of nested objects.
2581 * @constructor
2582 * @param {object} params - The parameters for this operator.
2583 */
function Params(params) {
  Transform.call(this, null, params);
}
inherits(Params, Transform, {
  transform(_, pulse) {
    // Expose the parameter hash as this operator's value and mirror its
    // modification state.
    this.modified(_.modified());
    this.value = _;
    // Signal-only output: do not pass tuples, source, or fields.
    const flags = pulse.NO_SOURCE | pulse.NO_FIELDS;
    return pulse.fork(flags);
  }
});
2594
2595/**
2596 * Aggregate and pivot selected field values to become new fields.
 * This operator is useful for constructing cross-tabulations.
2598 * @constructor
2599 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
2600 * to groupby. These fields act just like groupby fields of an Aggregate transform.
2601 * @param {function(object): *} params.field - The field to pivot on. The unique
2602 * values of this field become new field names in the output stream.
2603 * @param {function(object): *} params.value - The field to populate pivoted fields.
2604 * The aggregate values of this field become the values of the new pivoted fields.
2605 * @param {string} [params.op] - The aggregation operation for the value field,
2606 * applied per cell in the output stream. The default is "sum".
2607 * @param {number} [params.limit] - An optional parameter indicating the maximum
2608 * number of pivoted fields to generate. The pivoted field names are sorted in
2609 * ascending order prior to enforcing the limit.
2610 */
// Pivot transform constructor: reuses the Aggregate operator's state setup.
function Pivot(params) {
  Aggregate.call(this, params);
}
// Parameter schema and metadata for the Pivot transform (quoted keys kept,
// matching the other Definitions in this file).
Pivot.Definition = {
  'type': 'Pivot',
  'metadata': {
    'generates': true,
    'changes': true
  },
  'params': [{
    'name': 'groupby',
    'type': 'field',
    'array': true
  }, {
    // field whose distinct values become the output column names
    'name': 'field',
    'type': 'field',
    'required': true
  }, {
    // field whose values are aggregated into each pivoted column
    'name': 'value',
    'type': 'field',
    'required': true
  }, {
    // allowed ops come from ValidAggregateOps (declared elsewhere in this file)
    'name': 'op',
    'type': 'enum',
    'values': ValidAggregateOps,
    'default': 'sum'
  }, {
    // 0 means no limit on the number of pivoted columns
    'name': 'limit',
    'type': 'number',
    'default': 0
  }, {
    'name': 'key',
    'type': 'field'
  }]
};
inherits(Pivot, Aggregate, {
  // Handle on Aggregate's transform so it can be run with rewritten params.
  _transform: Aggregate.prototype.transform,
  transform(_, pulse) {
    // Translate pivot parameters into an equivalent aggregate specification.
    const aggregateSpec = aggregateParams(_, pulse);
    return this._transform(aggregateSpec, pulse);
  }
});
2652
2653// Shoehorn a pivot transform into an aggregate transform!
2654// First collect all unique pivot field values.
2655// Then generate aggregate fields for each output pivot field.
function aggregateParams(_, pulse) {
  const pivotField = _.field;
  const valueField = _.value;
  // Pivot "count" maps to the internal "__count__" op; default op is "sum".
  const op = (_.op === 'count' ? '__count__' : _.op) || 'sum';
  // Every generated accessor reads both the pivot field and the value field.
  const inputFields = accessorFields(pivotField).concat(accessorFields(valueField));
  const keys = pivotKeys(pivotField, _.limit || 0, pulse);

  // The set of pivot columns may change with the data, so flag a parameter
  // modification to make the aggregate re-initialize.
  if (pulse.changed()) {
    _.set('__pivot__', null, null, true);
  }

  // One aggregate measure per distinct pivot key.
  const ops = keys.map(() => op);
  const fields = keys.map(k => get(k, pivotField, valueField, inputFields));
  const as = keys.map(k => k + '');
  return {
    key: _.key,
    groupby: _.groupby,
    ops,
    fields,
    as,
    modified: _.modified.bind(_)
  };
}
2675
2676// Generate aggregate field accessor.
2677// Output NaN for non-existent values; aggregator will ignore!
function get(k, key, value, fields) {
  // Yield the value only on rows whose pivot key is k; NaN elsewhere.
  const pick = d => (key(d) === k ? value(d) : NaN);
  const name = k + '';
  return accessor(pick, fields, name);
}
2681
2682// Collect (and optionally limit) all unique pivot values.
/**
 * Collect the distinct pivot key values over all source tuples, sorted
 * ascending and optionally truncated.
 * @param {function(object): *} key - Accessor for the pivot field.
 * @param {number} limit - Maximum number of keys to keep (0 = no limit).
 * @param {object} pulse - Pulse whose SOURCE tuples are scanned.
 * @return {Array<*>} The sorted (and possibly truncated) key values.
 */
function pivotKeys(key, limit, pulse) {
  // Deduplicate on the string form of each key, matching the former
  // object-property coercion. A Set is used instead of a plain `{}` so that
  // keys such as "constructor", "toString" or "__proto__" are not mistaken
  // for already-seen values via Object.prototype members.
  const seen = new Set(),
    list = [];
  pulse.visit(pulse.SOURCE, t => {
    const k = key(t),
      id = String(k);
    if (!seen.has(id)) {
      seen.add(id);
      list.push(k);
    }
  });
  list.sort(ascending);
  return limit ? list.slice(0, limit) : list;
}
2696
2697/**
2698 * Partitions pre-faceted data into tuple subflows.
2699 * @constructor
2700 * @param {object} params - The parameters for this operator.
2701 * @param {function(Dataflow, string): Operator} params.subflow - A function
2702 * that generates a subflow of operators and returns its root operator.
2703 * @param {function(object): Array<object>} params.field - The field
2704 * accessor for an array of subflow tuple objects.
2705 */
function PreFacet(params) {
  Facet.call(this, params);
}
inherits(PreFacet, Facet, {
  transform(_, pulse) {
    const flow = _.subflow;
    const field = _.field;
    const subflowFor = t => this.subflow(tupleid(t), flow, pulse, t);

    if (_.modified('field') || (field && pulse.modified(accessorFields(field)))) {
      error('PreFacet does not support field modification.');
    }
    this.initTargets(); // reset list of active subflows

    // Route each visited tuple to its subflow using the given subflow method.
    // With a field accessor, the tuple carries an array of nested tuples that
    // are routed instead; nested tuples being added are ingested first.
    const route = (flag, method) => pulse.visit(flag, t => {
      const sf = subflowFor(t);
      if (field) {
        field(t).forEach(d => sf[method](method === 'add' ? ingest(d) : d));
      } else {
        sf[method](t);
      }
    });
    route(pulse.MOD, 'mod');
    route(pulse.ADD, 'add');
    route(pulse.REM, 'rem');

    if (pulse.clean()) {
      pulse.runAfter(() => this.clean());
    }
    return pulse;
  }
});
2743
2744/**
2745 * Performs a relational projection, copying selected fields from source
2746 * tuples to a new set of derived tuples.
2747 * @constructor
2748 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - The fields to project,
2750 * as an array of field accessors. If unspecified, all fields will be
2751 * copied with names unchanged.
2752 * @param {Array<string>} [params.as] - Output field names for each projected
2753 * field. Any unspecified fields will use the field name provided by
2754 * the field accessor.
2755 */
// Project transform constructor. The value starts null and is later set to a
// lookup table from source tuple id to derived tuple.
function Project(params) {
  Transform.call(this, null, params);
}
// Parameter schema and metadata for the Project transform (quoted keys kept,
// matching the other Definitions in this file).
Project.Definition = {
  'type': 'Project',
  'metadata': {
    'generates': true,
    'changes': true
  },
  'params': [{
    // fields to copy; when omitted, whole tuples are re-derived
    'name': 'fields',
    'type': 'field',
    'array': true
  }, {
    'name': 'as',
    'type': 'string',
    'null': true,
    'array': true
  }]
};
inherits(Project, Transform, {
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE);
    const fields = _.fields;
    const as = fieldNames(_.fields, _.as || []);
    // Copy only the selected fields, or re-derive the full tuple if none given.
    const copy = fields ? (src, dst) => project(src, dst, fields, as) : rederive;

    // First run: treat every source tuple as newly added.
    if (!this.value) {
      pulse = pulse.addAll();
      this.value = {};
    }
    const lut = this.value; // source tuple id -> derived tuple

    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      out.rem.push(lut[id]);
      lut[id] = null;
    });
    pulse.visit(pulse.ADD, t => {
      const derived = copy(t, ingest({}));
      lut[tupleid(t)] = derived;
      out.add.push(derived);
    });
    pulse.visit(pulse.MOD, t => {
      out.mod.push(copy(t, lut[tupleid(t)]));
    });
    return out;
  }
});
function project(s, t, fields, as) {
  // Write each accessor's value from source s onto target t under its
  // corresponding output name; returns the mutated target.
  for (const [i, accessorFn] of fields.entries()) {
    t[as[i]] = accessorFn(s);
  }
  return t;
}
2811
2812/**
2813 * Proxy the value of another operator as a pure signal value.
2814 * Ensures no tuples are propagated.
2815 * @constructor
2816 * @param {object} params - The parameters for this operator.
2817 * @param {*} params.value - The value to proxy, becomes the value of this operator.
2818 */
function Proxy(params) {
  Transform.call(this, null, params);
}
inherits(Proxy, Transform, {
  transform(_, pulse) {
    this.value = _.value;
    // Only propagate (an empty, tuple-free pulse) when the proxied value changed.
    if (!_.modified('value')) {
      return pulse.StopPropagation;
    }
    return pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
  }
});
2828
2829/**
2830 * Generates sample quantile values from an input data stream.
2831 * @constructor
2832 * @param {object} params - The parameters for this operator.
2833 * @param {function(object): *} params.field - An accessor for the data field
2834 * over which to calculate quantile values.
2835 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
2836 * to groupby.
2837 * @param {Array<number>} [params.probs] - An array of probabilities in
2838 * the range (0, 1) for which to compute quantile values. If not specified,
2839 * the *step* parameter will be used.
 * @param {number} [params.step=0.01] - A probability step size for
 * sampling quantile values. All values from one-half the step size up to
 * 1 (exclusive) will be sampled. This parameter is only used if the
 * *probs* parameter is not provided.
2844 */
// Quantile transform constructor; the value (generated tuples) starts null.
function Quantile(params) {
  Transform.call(this, null, params);
}
// Parameter schema and metadata for the Quantile transform (quoted keys kept,
// matching the other Definitions in this file).
Quantile.Definition = {
  'type': 'Quantile',
  'metadata': {
    'generates': true,
    'changes': true
  },
  'params': [{
    'name': 'groupby',
    'type': 'field',
    'array': true
  }, {
    'name': 'field',
    'type': 'field',
    'required': true
  }, {
    // explicit probabilities; when absent, 'step' determines the sample points
    'name': 'probs',
    'type': 'number',
    'array': true
  }, {
    'name': 'step',
    'type': 'number',
    'default': 0.01
  }, {
    // output field names for the probability and its quantile value
    'name': 'as',
    'type': 'string',
    'array': true,
    'default': ['prob', 'value']
  }]
};
// Subtracted from the exclusive upper bound of the default probability range;
// presumably guards against floating-point error adding a sample at p ~= 1.
const EPSILON = 1e-14;
inherits(Quantile, Transform, {
  /**
   * Emit one tuple per (group, probability) holding the probability and
   * the corresponding sample quantile of _.field within that group.
   * Cached output is reused when neither parameters nor data changed.
   */
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
      as = _.as || ['prob', 'value'];
    if (this.value && !_.modified() && !pulse.changed()) {
      out.source = this.value;
      return out;
    }
    const source = pulse.materialize(pulse.SOURCE).source,
      groups = partition$1(source, _.groupby, _.field),
      names = (_.groupby || []).map(accessorName),
      values = [],
      step = _.step || 0.01,
      p = _.probs || range(step / 2, 1 - EPSILON, step),
      n = p.length;
    groups.forEach(g => {
      const q = quantiles(g, p);
      for (let i = 0; i < n; ++i) {
        const t = {};
        // Use a distinct index for the groupby dimensions; this loop
        // previously redeclared `i`, shadowing the probability index.
        for (let j = 0; j < names.length; ++j) {
          t[names[j]] = g.dims[j];
        }
        t[as[0]] = p[i];
        t[as[1]] = q[i];
        values.push(ingest(t));
      }
    });
    if (this.value) out.rem = this.value;
    this.value = out.add = out.source = values;
    return out;
  }
});
2910
2911/**
2912 * Relays a data stream between data processing pipelines.
2913 * If the derive parameter is set, this transform will create derived
2914 * copies of observed tuples. This provides derived data streams in which
2915 * modifications to the tuples do not pollute an upstream data source.
2916 * @param {object} params - The parameters for this operator.
 * @param {boolean} [params.derive=false] - Boolean flag indicating if
2918 * the transform should make derived copies of incoming tuples.
2919 * @constructor
2920 */
function Relay(params) {
  Transform.call(this, null, params);
}
inherits(Relay, Transform, {
  transform(_, pulse) {
    // On the first run, relay every source tuple as an addition. On later
    // runs without `derive`, `out` stays undefined and is returned as such.
    let out;
    if (!this.value) {
      out = pulse = pulse.addAll();
      this.value = {};
    }
    if (!_.derive) return out;

    const lut = this.value; // source tuple id -> derived copy
    out = pulse.fork(pulse.NO_SOURCE);

    pulse.visit(pulse.REM, t => {
      const id = tupleid(t);
      out.rem.push(lut[id]);
      lut[id] = null;
    });
    pulse.visit(pulse.ADD, t => {
      const copy = derive(t);
      lut[tupleid(t)] = copy;
      out.add.push(copy);
    });
    pulse.visit(pulse.MOD, t => {
      const copy = lut[tupleid(t)];
      for (const k in t) {
        copy[k] = t[k];
        // down stream writes may overwrite re-derived tuples
        // conservatively mark all source fields as modified
        out.modifies(k);
      }
      out.mod.push(copy);
    });
    return out;
  }
});
2959
2960/**
2961 * Samples tuples passing through this operator.
2962 * Uses reservoir sampling to maintain a representative sample.
2963 * @constructor
2964 * @param {object} params - The parameters for this operator.
2965 * @param {number} [params.size=1000] - The maximum number of samples.
2966 */
// Sample transform constructor. The value is the reservoir array (initially
// empty); `count` tracks how many tuples the reservoir has seen.
function Sample(params) {
  Transform.call(this, [], params);
  this.count = 0;
}
// Parameter schema for the Sample transform (quoted keys kept, matching the
// other Definitions in this file).
Sample.Definition = {
  'type': 'Sample',
  'metadata': {},
  'params': [{
    // maximum reservoir size
    'name': 'size',
    'type': 'number',
    'default': 1000
  }]
};
inherits(Sample, Transform, {
  // Maintain a reservoir sample of at most _.size tuples, emitting
  // evictions/removals in out.rem, newly sampled tuples in out.add, and
  // modified sampled tuples in out.mod.
  transform(_, pulse) {
    const out = pulse.fork(pulse.NO_SOURCE),
      mod = _.modified('size'),
      num = _.size,
      // ids of tuples in the reservoir at the start of this run (value 1);
      // set to -1 below when such a tuple is removed from the reservoir
      map = this.value.reduce((m, t) => (m[tupleid(t)] = 1, m), {});
    // res aliases this.value until reassigned by filter/slice below, so
    // update()'s push/replace writes through to the previous value array
    let res = this.value,
      // running count of tuples seen, used for the replacement probability
      cnt = this.count,
      // update() never replaces reservoir slots with index < cap; set to the
      // retained size during replenishment, then -1 to record that it ran
      cap = 0;

    // sample reservoir update function
    function update(t) {
      let p, idx;
      if (res.length < num) {
        res.push(t);
      } else {
        idx = ~~((cnt + 1) * random());
        if (idx < res.length && idx >= cap) {
          p = res[idx];
          // only tuples emitted on an earlier run need a removal entry
          if (map[tupleid(p)]) out.rem.push(p); // eviction
          res[idx] = t;
        }
      }
      ++cnt;
    }
    if (pulse.rem.length) {
      // find all tuples that should be removed, add to output
      pulse.visit(pulse.REM, t => {
        const id = tupleid(t);
        if (map[id]) {
          map[id] = -1;
          out.rem.push(t);
        }
        --cnt;
      });

      // filter removed tuples out of the sample reservoir
      res = res.filter(t => map[tupleid(t)] !== -1);
    }
    if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
      // replenish sample if backing data source is available
      cap = cnt = res.length;
      pulse.visit(pulse.SOURCE, t => {
        // update, but skip previously sampled tuples
        if (!map[tupleid(t)]) update(t);
      });
      cap = -1;
    }
    if (mod && res.length > num) {
      // size decreased: drop the oldest entries down to the new size
      const n = res.length - num;
      for (let i = 0; i < n; ++i) {
        map[tupleid(res[i])] = -1;
        out.rem.push(res[i]);
      }
      res = res.slice(n);
    }
    if (pulse.mod.length) {
      // propagate modified tuples in the sample reservoir
      // NOTE(review): -1 entries are truthy too, so a tuple trimmed above
      // that also appears in pulse.mod would be emitted in both out.rem
      // and out.mod — confirm whether that can occur.
      pulse.visit(pulse.MOD, t => {
        if (map[tupleid(t)]) out.mod.push(t);
      });
    }
    if (pulse.add.length) {
      // update sample reservoir
      pulse.visit(pulse.ADD, update);
    }
    if (pulse.add.length || cap < 0) {
      // output newly added tuples
      out.add = res.filter(t => !map[tupleid(t)]);
    }
    this.count = cnt;
    this.value = out.source = res;
    return out;
  }
});
3055
3056/**
3057 * Generates data tuples for a specified sequence range of numbers.
3058 * @constructor
3059 * @param {object} params - The parameters for this operator.
3060 * @param {number} params.start - The first number in the sequence.
3061 * @param {number} params.stop - The last number (exclusive) in the sequence.
3062 * @param {number} [params.step=1] - The step size between numbers in the sequence.
3063 */
// Sequence transform constructor; the value (generated tuples) starts null.
function Sequence(params) {
  Transform.call(this, null, params);
}
// Parameter schema and metadata for the Sequence transform (quoted keys kept,
// matching the other Definitions in this file).
Sequence.Definition = {
  'type': 'Sequence',
  'metadata': {
    'generates': true,
    'changes': true
  },
  'params': [{
    'name': 'start',
    'type': 'number',
    'required': true
  }, {
    // exclusive end of the sequence
    'name': 'stop',
    'type': 'number',
    'required': true
  }, {
    'name': 'step',
    'type': 'number',
    'default': 1
  }, {
    // output field name holding each generated number
    'name': 'as',
    'type': 'string',
    'default': 'data'
  }]
};
inherits(Sequence, Transform, {
  transform(_, pulse) {
    // Nothing to regenerate while the parameters are unchanged.
    if (this.value && !_.modified()) return;

    const out = pulse.materialize().fork(pulse.MOD);
    const as = _.as || 'data';
    const previous = this.value;

    // Retract earlier generated tuples along with any upstream removals.
    out.rem = previous ? pulse.rem.concat(previous) : pulse.rem;

    const generated = range(_.start, _.stop, _.step || 1).map(v => {
      const t = {};
      t[as] = v;
      return ingest(t);
    });
    this.value = generated;
    out.add = pulse.add.concat(generated);
    return out;
  }
});
3106
3107/**
3108 * Propagates a new pulse without any tuples so long as the input
3109 * pulse contains some added, removed or modified tuples.
3110 * @param {object} params - The parameters for this operator.
3111 * @constructor
3112 */
function Sieve(params) {
  Transform.call(this, null, params);
  this.modified(true); // always treat as modified
}

inherits(Sieve, Transform, {
  transform(_, pulse) {
    this.value = pulse.source;
    // Stop unless tuples were added, removed, or modified.
    if (!pulse.changed()) {
      return pulse.StopPropagation;
    }
    // Forward a pulse carrying no tuples, source, or field information.
    return pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);
  }
});
3124
3125/**
3126 * Discretize dates to specific time units.
3127 * @constructor
3128 * @param {object} params - The parameters for this operator.
3129 * @param {function(object): *} params.field - The data field containing date/time values.
3130 */
// TimeUnit transform constructor. The value is later set by _floor to the
// current floor function, which also carries the running extent.
function TimeUnit(params) {
  Transform.call(this, null, params);
}
// Default output field names: floored start and (for intervals) end.
const OUTPUT = ['unit0', 'unit1'];
// Parameter schema and metadata for the TimeUnit transform (quoted keys kept,
// matching the other Definitions in this file).
TimeUnit.Definition = {
  'type': 'TimeUnit',
  'metadata': {
    'modifies': true
  },
  'params': [{
    'name': 'field',
    'type': 'field',
    'required': true
  }, {
    // when true, also write the interval end (second output field)
    'name': 'interval',
    'type': 'boolean',
    'default': true
  }, {
    // explicit units; when omitted, units and step are chosen via timeBin
    'name': 'units',
    'type': 'enum',
    'values': TIME_UNITS,
    'array': true
  }, {
    'name': 'step',
    'type': 'number',
    'default': 1
  }, {
    // bin-count hint for timeBin when units are not given
    'name': 'maxbins',
    'type': 'number',
    'default': 40
  }, {
    'name': 'extent',
    'type': 'date',
    'array': true
  }, {
    'name': 'timezone',
    'type': 'enum',
    'default': 'local',
    'values': ['local', 'utc']
  }, {
    'name': 'as',
    'type': 'string',
    'array': true,
    'length': 2,
    'default': OUTPUT
  }]
};
inherits(TimeUnit, Transform, {
  // Write the floored time unit (and, for intervals, the next unit boundary)
  // onto each tuple, tracking the overall [start, stop) extent on the floor
  // function stored as this operator's value.
  transform(_, pulse) {
    const field = _.field,
      band = _.interval !== false,
      utc = _.timezone === 'utc',
      floor = this._floor(_, pulse),
      offset = (utc ? utcInterval : timeInterval)(floor.unit).offset,
      as = _.as || OUTPUT,
      u0 = as[0],
      u1 = as[1],
      step = floor.step;
    // Running extent, continued from the previous run (_floor copies the
    // prior start/stop onto the new floor function).
    // NOTE(review): `||` would discard a start/stop of 0 if floor ever
    // produced numbers; presumably it produces Date objects — confirm.
    let min = floor.start || Infinity,
      max = floor.stop || -Infinity,
      flag = pulse.ADD;
    if (_.modified() || pulse.changed(pulse.REM) || pulse.modified(accessorFields(field))) {
      // Full recompute over every source tuple; restart the extent.
      pulse = pulse.reflow(true);
      flag = pulse.SOURCE;
      min = Infinity;
      max = -Infinity;
    }
    pulse.visit(flag, t => {
      const v = field(t);
      let a, b;
      if (v == null) {
        t[u0] = null;
        if (band) t[u1] = null;
      } else {
        t[u0] = a = b = floor(v);
        if (band) t[u1] = b = offset(a, step);
        if (a < min) min = a;
        if (b > max) max = b;
      }
    });
    floor.start = min;
    floor.stop = max;
    return pulse.modifies(band ? as : u0);
  },
  // Build the floor function for the configured (or timeBin-derived) units
  // and step, carry over the previous extent, and store it as this.value.
  _floor(_, pulse) {
    const utc = _.timezone === 'utc';

    // get parameters
    const {
      units,
      step
    } = _.units ? {
      units: _.units,
      step: _.step || 1
    } : timeBin({
      extent: _.extent || extent(pulse.materialize(pulse.SOURCE).source, _.field),
      maxbins: _.maxbins
    });

    // check / standardize time units
    const tunits = timeUnits(units),
      // previous floor function (read before this.value is replaced below)
      prev = this.value || {},
      floor = (utc ? utcFloor : timeFloor)(tunits, step);
    floor.unit = peek(tunits);
    floor.units = tunits;
    floor.step = step;
    floor.start = prev.start;
    floor.stop = prev.stop;
    return this.value = floor;
  }
});
3242
3243/**
3244 * An index that maps from unique, string-coerced, field values to tuples.
3245 * Assumes that the field serves as a unique key with no duplicate values.
3246 * @constructor
3247 * @param {object} params - The parameters for this operator.
3248 * @param {function(object): *} params.field - The field accessor to index.
3249 */
function TupleIndex(params) {
  Transform.call(this, fastmap(), params);
}
inherits(TupleIndex, Transform, {
  transform(_, pulse) {
    const df = pulse.dataflow;
    const field = _.field;
    const index = this.value;
    const put = t => index.set(field(t), t);
    let touched = false;

    if (_.modified('field') || pulse.modified(field.fields)) {
      // The key field changed: rebuild the index from all source tuples.
      index.clear();
      pulse.visit(pulse.SOURCE, put);
      touched = true;
    } else if (pulse.changed()) {
      // Incremental update: drop removed keys, then index added tuples.
      pulse.visit(pulse.REM, t => index.delete(field(t)));
      pulse.visit(pulse.ADD, put);
      touched = true;
    }
    this.modified(touched);

    // Schedule index.clean once index.empty exceeds the dataflow threshold.
    if (index.empty > df.cleanThreshold) df.runAfter(index.clean);
    return pulse.fork();
  }
});
3274
3275/**
3276 * Extracts an array of values. Assumes the source data has already been
3277 * reduced as needed (e.g., by an upstream Aggregate transform).
3278 * @constructor
3279 * @param {object} params - The parameters for this operator.
3280 * @param {function(object): *} params.field - The domain field to extract.
3281 * @param {function(*,*): number} [params.sort] - An optional
3282 * comparator function for sorting the values. The comparator will be
3283 * applied to backing tuples prior to value extraction.
3284 */
function Values(params) {
  Transform.call(this, null, params);
}
inherits(Values, Transform, {
  transform(_, pulse) {
    const sort = _.sort;
    const stale = !this.value ||
      _.modified('field') ||
      _.modified('sort') ||
      pulse.changed() ||
      (sort && pulse.modified(sort.fields));
    if (!stale) return;

    // Sort a copy of the backing tuples (stable), then extract the field.
    const tuples = sort ? pulse.source.slice().sort(stableCompare(sort)) : pulse.source;
    this.value = tuples.map(_.field);
  }
});
3296
function WindowOp(op, field, param, as) {
  const impl = WindowOps[op](field, param);
  const init = impl.init || zero;
  // Write the op's result for the current window position onto tuple t.
  const update = (w, t) => {
    t[as] = impl.next(w);
  };
  return { init, update };
}
// Window operation factories. Each returns { init?, next(w) }, where `w` is
// the window state: w.data (partition rows), w.index (current row),
// w.compare (sort comparator; truthy result = rows differ, i.e. not peers),
// and w.i0 / w.i1, which appear to bound the frame as [i0, i1).
const WindowOps = {
  row_number: function () {
    return {
      next: w => w.index + 1
    };
  },
  rank: function () {
    let rank;
    return {
      init: () => rank = 1,
      next: w => {
        const i = w.index,
          data = w.data;
        // a new rank starts whenever the row differs from its predecessor
        return i && w.compare(data[i - 1], data[i]) ? rank = i + 1 : rank;
      }
    };
  },
  dense_rank: function () {
    let drank;
    return {
      init: () => drank = 1,
      next: w => {
        const i = w.index,
          d = w.data;
        return i && w.compare(d[i - 1], d[i]) ? ++drank : drank;
      }
    };
  },
  percent_rank: function () {
    const rank = WindowOps.rank(),
      next = rank.next;
    return {
      init: rank.init,
      next: w => {
        // always advance the rank state, even for single-row partitions
        const r = next(w),
          n = w.data.length;
        // (rank - 1) / (n - 1); a single-row partition yields 0 (as SQL
        // PERCENT_RANK does) rather than 0 / 0 = NaN
        return n > 1 ? (r - 1) / (n - 1) : 0;
      }
    };
  },
  cume_dist: function () {
    // index of the last row of the current peer group
    let cume;
    return {
      // Start at -1 so the first row also scans ahead for peers; starting at
      // 0 gave row 0 a value of 1/n even when it tied with row 1.
      init: () => cume = -1,
      next: w => {
        const d = w.data,
          c = w.compare;
        let i = w.index;
        if (cume < i) {
          // entering a new peer group: find its last row
          while (i + 1 < d.length && !c(d[i], d[i + 1])) ++i;
          cume = i;
        }
        return (1 + cume) / d.length;
      }
    };
  },
  ntile: function (field, num) {
    num = +num;
    if (!(num > 0)) error('ntile num must be greater than zero.');
    const cume = WindowOps.cume_dist(),
      next = cume.next;
    return {
      init: cume.init,
      next: w => Math.ceil(num * next(w))
    };
  },
  lag: function (field, offset) {
    offset = +offset || 1;
    return {
      next: w => {
        const i = w.index - offset;
        return i >= 0 ? field(w.data[i]) : null;
      }
    };
  },
  lead: function (field, offset) {
    offset = +offset || 1;
    return {
      next: w => {
        const i = w.index + offset,
          d = w.data;
        return i < d.length ? field(d[i]) : null;
      }
    };
  },
  first_value: function (field) {
    return {
      next: w => field(w.data[w.i0])
    };
  },
  last_value: function (field) {
    return {
      next: w => field(w.data[w.i1 - 1])
    };
  },
  nth_value: function (field, nth) {
    nth = +nth;
    if (!(nth > 0)) error('nth_value nth must be greater than zero.');
    return {
      next: w => {
        const i = w.i0 + (nth - 1);
        return i < w.i1 ? field(w.data[i]) : null;
      }
    };
  },
  prev_value: function (field) {
    let prev;
    return {
      init: () => prev = null,
      // most recent non-null value up to and including the current row
      next: w => {
        const v = field(w.data[w.index]);
        return v != null ? prev = v : prev;
      }
    };
  },
  next_value: function (field) {
    let v, i;
    return {
      init: () => (v = null, i = -1),
      // next non-null value at or after the current row; caches the found
      // position in i so later rows up to i reuse v
      next: w => {
        const d = w.data;
        return w.index <= i ? v : (i = find(field, d, w.index)) < 0 ? (i = d.length, v = null) : v = field(d[i]);
      }
    };
  }
};
/**
 * Scan `data` forward from position `index` for the first tuple whose
 * `field` value is neither null nor undefined.
 *
 * @param {function(object): *} field - Accessor applied to each tuple.
 * @param {Array<object>} data - Tuples to scan.
 * @param {number} index - Starting position (inclusive).
 * @returns {number} Position of the first match, or -1 when none remains.
 */
function find(field, data, index) {
  const end = data.length;
  let pos = index;
  while (pos < end) {
    if (field(data[pos]) != null) return pos;
    pos += 1;
  }
  return -1;
}
// Names of the window-specific operations (the keys of WindowOps). Combined
// with ValidAggregateOps, they form the allowed values of the `ops`
// parameter in Window.Definition.
const ValidWindowOps = Object.keys(WindowOps);
3436
/**
 * Parsed configuration for a Window transform. It splits the requested ops
 * into window operations (WindowOp instances) and aggregate operations
 * (evaluated through a shared aggregation `cell`).
 *
 * Sets on the instance:
 *  - outputs: output field names, one per op, in op order
 *  - windows: WindowOp instances for the window-specific ops
 *  - cell:    aggregation cell, created only when aggregate/count ops exist
 *  - inputs:  names of data fields read by the sort comparator and op fields
 *
 * @param {object} _ - Window parameters (ops, fields, params, as, sort).
 * @throws Error (via `error`) when a non-count aggregate op has no field.
 */
function WindowState(_) {
  const ops = array(_.ops),
    fields = array(_.fields),
    params = array(_.params),
    as = array(_.as),
    outputs = this.outputs = [],
    windows = this.windows = [],
    // Null-prototype dictionaries: data field names such as "constructor" or
    // "__proto__" must not collide with inherited Object.prototype members.
    inputs = Object.create(null),
    map = Object.create(null),
    counts = [],
    measures = [];
  let countOnly = true;

  // Record every data field read by accessor `f` as an input dependency.
  function visitInputs(f) {
    array(accessorFields(f)).forEach(name => inputs[name] = 1);
  }
  visitInputs(_.sort);

  ops.forEach((op, i) => {
    const field = fields[i],
      mname = accessorName(field),
      name = measureName(op, mname, as[i]);
    visitInputs(field);
    outputs.push(name);

    // Window operation
    if (hasOwnProperty(WindowOps, op)) {
      windows.push(WindowOp(op, fields[i], params[i], name));
    }

    // Aggregate operation
    else {
      if (field == null && op !== 'count') {
        error('Null aggregate field specified.');
      }
      // Counts need no per-field measure; they are written from the cell's
      // running tuple count.
      if (op === 'count') {
        counts.push(name);
        return;
      }
      countOnly = false;
      // Group measures by input field name so that each field is compiled
      // once, however many aggregate ops read it.
      let m = map[mname];
      if (!m) {
        m = map[mname] = [];
        m.field = field;
        measures.push(m);
      }
      m.push(createMeasure(op, name));
    }
  });

  // Only build an aggregation cell when aggregate (including count) ops exist.
  if (counts.length || measures.length) {
    this.cell = cell(measures, counts, countOnly);
  }
  this.inputs = Object.keys(inputs);
}
const prototype = WindowState.prototype;

/**
 * Reset every window operation and, when present, the aggregation cell.
 * processPartition calls this before walking a partition.
 */
prototype.init = function () {
  for (const op of this.windows) op.init();
  if (this.cell) this.cell.init();
};

/**
 * Advance the state to the current row of window `w` and write outputs to
 * tuple `t`.
 *
 * Aggregates are maintained incrementally. Tuples in [w.p0, w.i0) are
 * removed from the cell and tuples in [w.p1, w.i1) are added, where p0/p1
 * hold the previous frame bounds (see setWindow). Each window op is then
 * updated in order.
 *
 * @param {object} w - Window descriptor (data, frame bounds, current index).
 * @param {object} t - Tuple receiving the computed output values.
 */
prototype.update = function (w, t) {
  const agg = this.cell;
  const ops = this.windows;
  const tuples = w.data;
  const opCount = ops && ops.length;

  if (agg) {
    for (let k = w.p0; k < w.i0; ++k) agg.rem(tuples[k]);
    for (let k = w.p1; k < w.i1; ++k) agg.add(tuples[k]);
    agg.set(t);
  }
  for (let k = 0; k < opCount; ++k) ops[k].update(w, t);
};
/**
 * Build the incremental aggregation cell shared by all aggregate operations
 * of a Window transform.
 *
 * @param {Array<Array>} measures - Groups of measure specs, one group per
 *   input field; each group carries its field accessor as `group.field`.
 * @param {Array<string>} counts - Output field names for 'count' operations.
 * @param {boolean} countOnly - True when only 'count' operations were
 *   requested; then no measure aggregators are instantiated.
 * @returns {object} The cell: `num` (current tuple count), `agg` (aggregator
 *   instances or null), and init/add/rem/set methods.
 */
function cell(measures, counts, countOnly) {
  const compiled = measures.map(m => compileMeasures(m, m.field));
  const cell = {
    num: 0,
    agg: null,
    store: false,
    count: counts
  };

  // Aggregators exist only for non-count ops. Constructing them may set
  // `cell.store` when an op needs the raw tuples, so the store check below
  // must come after this loop.
  let aggs = null;
  if (!countOnly) {
    aggs = cell.agg = new Array(compiled.length);
    for (let i = 0; i < compiled.length; ++i) aggs[i] = new compiled[i](cell);
  }
  const numAggs = aggs ? aggs.length : 0;
  const store = cell.store ? (cell.data = new TupleStore()) : null;

  cell.add = function (t) {
    cell.num += 1;
    if (countOnly) return;
    if (store) store.add(t);
    for (let i = 0; i < numAggs; ++i) {
      aggs[i].add(aggs[i].get(t), t);
    }
  };

  cell.rem = function (t) {
    cell.num -= 1;
    if (countOnly) return;
    if (store) store.rem(t);
    for (let i = 0; i < numAggs; ++i) {
      aggs[i].rem(aggs[i].get(t), t);
    }
  };

  cell.set = function (t) {
    // consolidate stored values
    if (store) store.values();

    // update tuple properties
    for (let i = 0; i < counts.length; ++i) t[counts[i]] = cell.num;
    for (let i = 0; i < numAggs; ++i) aggs[i].set(t);
  };

  cell.init = function () {
    cell.num = 0;
    if (store) store.reset();
    for (let i = 0; i < numAggs; ++i) aggs[i].init();
  };

  return cell;
}
3557
3558/**
3559 * Perform window calculations and write results to the input stream.
3560 * @constructor
3561 * @param {object} params - The parameters for this operator.
3562 * @param {function(*,*): number} [params.sort] - A comparator function for sorting tuples within a window.
3563 * @param {Array<function(object): *>} [params.groupby] - An array of accessors by which to partition tuples into separate windows.
3564 * @param {Array<string>} params.ops - An array of strings indicating window operations to perform.
3565 * @param {Array<function(object): *>} [params.fields] - An array of accessors
3566 * for data fields to use as inputs to window operations.
3567 * @param {Array<*>} [params.params] - An array of parameter values for window operations.
3568 * @param {Array<string>} [params.as] - An array of output field names for window operations.
3569 * @param {Array<number>} [params.frame] - Window frame definition as two-element array.
3570 * @param {boolean} [params.ignorePeers=false] - If true, base window frame boundaries on row
3571 * number alone, ignoring peers with identical sort values. If false (default),
3572 * the window boundaries will be adjusted to include peer values.
3573 */
3574function Window(params) {
3575 Transform.call(this, {}, params);
3576 this._mlen = 0;
3577 this._mods = [];
3578}
// Declarative parameter schema for the Window transform.
Window.Definition = {
  type: 'Window',
  metadata: { modifies: true },
  params: [
    { name: 'sort', type: 'compare' },
    { name: 'groupby', type: 'field', array: true },
    { name: 'ops', type: 'enum', array: true, values: ValidWindowOps.concat(ValidAggregateOps) },
    { name: 'params', type: 'number', null: true, array: true },
    { name: 'fields', type: 'field', null: true, array: true },
    { name: 'as', type: 'string', null: true, array: true },
    { name: 'frame', type: 'number', null: true, array: true, length: 2, default: [null, 0] },
    { name: 'ignorePeers', type: 'boolean', default: false }
  ]
};
inherits(Window, Transform, {
  /**
   * Process a pulse. Partition tuples by the groupby key, recompute window
   * and aggregate outputs for each partition touched by this pulse, and
   * reflow the pulse with the output fields marked as modified.
   *
   * @param {object} _ - Operator parameters (sort, groupby, ops, fields, ...).
   * @param {object} pulse - The incoming dataflow pulse.
   * @returns {object} The reflowed pulse.
   */
  transform(_, pulse) {
    this.stamp = pulse.stamp;
    const mod = _.modified(),
      cmp = stableCompare(_.sort),
      key = groupkey(_.groupby),
      group = t => this.group(key(t));

    // initialize window state; rebuild it whenever parameters change
    let state = this.state;
    if (!state || mod) {
      state = this.state = new WindowState(_);
    }

    // partition input tuples
    if (mod || pulse.modified(state.inputs)) {
      // Full rebuild. A null-prototype map ensures group keys such as
      // "constructor" or "__proto__" are ordinary entries, not inherited
      // Object.prototype members.
      this.value = Object.create(null);
      pulse.visit(pulse.SOURCE, t => group(t).add(t));
    } else {
      // Incremental update: apply only removed and added tuples.
      pulse.visit(pulse.REM, t => group(t).remove(t));
      pulse.visit(pulse.ADD, t => group(t).add(t));
    }

    // perform window calculations for each modified partition
    for (let i = 0, n = this._mlen; i < n; ++i) {
      processPartition(this._mods[i], state, cmp, _);
    }
    this._mlen = 0;
    this._mods = [];

    // TODO don't reflow everything?
    return pulse.reflow(mod).modifies(state.outputs);
  },

  /**
   * Return the partition list (a SortedList built with `tupleid`) for a
   * group key, creating it on first use. The first lookup during a pulse
   * queues the partition in `_mods`, so transform() recomputes it exactly
   * once.
   *
   * @param {*} key - The group key.
   * @returns {object} The partition list.
   */
  group(key) {
    const groups = this.value;
    // Own-property check: an inherited member (e.g. "toString" on a plain
    // object) must never be mistaken for an existing partition.
    let group = hasOwnProperty(groups, key) ? groups[key] : undefined;
    if (!group) {
      group = groups[key] = SortedList(tupleid);
      group.stamp = -1;
    }
    if (group.stamp < this.stamp) {
      group.stamp = this.stamp;
      this._mods[this._mlen++] = group;
    }
    return group;
  }
});
/**
 * Recompute window and aggregate outputs for every tuple of one partition.
 *
 * @param {object} list - Partition list; `list.data(cmp)` supplies its tuples.
 * @param {object} state - Window state providing init() and update(w, t).
 * @param {function(*,*): number} cmp - Comparator handed to `list.data` so
 *   the sort is stable.
 * @param {object} _ - Operator parameters (sort, frame, ignorePeers).
 */
function processPartition(list, state, cmp, _) {
  const sort = _.sort;
  // Frames are widened to include peers only when sorted and peers are not ignored.
  const peerAware = sort && !_.ignorePeers;
  const frame = _.frame || [null, 0];
  const rows = list.data(cmp);
  const count = rows.length;
  const bisect = peerAware ? bisector(sort) : null;
  // Window descriptor shared across rows and mutated in place per row.
  const win = {
    i0: 0,
    i1: 0,
    p0: 0,
    p1: 0,
    index: 0,
    data: rows,
    // Without a sort, the comparator never reports two rows as equal.
    compare: sort || constant(-1)
  };

  state.init();
  for (let row = 0; row < count; ++row) {
    setWindow(win, frame, row, count);
    if (peerAware) adjustRange(win, bisect);
    state.update(win, rows[row]);
  }
}
/**
 * Position window `w` on row `i` of an `n`-row partition using frame `f`.
 *
 * The previous bounds are saved in w.p0/w.p1 so aggregates can be updated
 * incrementally. The new bounds form the half-open range [w.i0, w.i1).
 * Frame offsets are taken as absolute values. A null entry extends that
 * side to the partition edge.
 *
 * @param {object} w - Window descriptor, mutated in place.
 * @param {Array<?number>} f - Two-element frame [before, after].
 * @param {number} i - Current row index.
 * @param {number} n - Number of rows in the partition.
 */
function setWindow(w, f, i, n) {
  const before = f[0];
  const after = f[1];
  w.p0 = w.i0;
  w.p1 = w.i1;
  if (before == null) {
    w.i0 = 0;
  } else {
    w.i0 = Math.max(0, i - Math.abs(before));
  }
  if (after == null) {
    w.i1 = n;
  } else {
    w.i1 = Math.min(n, i + Math.abs(after) + 1);
  }
  w.index = i;
}
3701
/**
 * Widen window `w` so that its frame includes all peers of its boundary
 * rows. Peers are rows that `w.compare` reports as equal (a falsy result).
 * Used when frames are range-based rather than row-based.
 *
 * @param {object} w - Window descriptor with data, compare, i0 and i1.
 * @param {object} bisect - Bisector offering left(data, x) and right(data, x).
 */
function adjustRange(w, bisect) {
  const { compare, data } = w;
  const first = w.i0;
  const last = w.i1 - 1;
  const lastIndex = data.length - 1;

  // Extend the start backward over peers of the first row in the frame.
  if (first > 0 && !compare(data[first], data[first - 1])) {
    w.i0 = bisect.left(data, data[first]);
  }
  // Extend the end forward over peers of the last row in the frame.
  if (last < lastIndex && !compare(data[last], data[last + 1])) {
    w.i1 = bisect.right(data, data[last]);
  }
}
3712
3713export { Aggregate as aggregate, Bin as bin, Collect as collect, Compare as compare, CountPattern as countpattern, Cross as cross, Density as density, DotBin as dotbin, Expression as expression, Extent as extent, Facet as facet, Field as field, Filter as filter, Flatten as flatten, Fold as fold, Formula as formula, Generate as generate, Impute as impute, JoinAggregate as joinaggregate, KDE as kde, Key as key, Load as load, Lookup as lookup, MultiExtent as multiextent, MultiValues as multivalues, Params as params, Pivot as pivot, PreFacet as prefacet, Project as project, Proxy as proxy, Quantile as quantile, Relay as relay, Sample as sample, Sequence as sequence, Sieve as sieve, Subflow as subflow, TimeUnit as timeunit, TupleIndex as tupleindex, Values as values, Window as window };