UNPKG

4.7 kB | JavaScript | View Raw
1import {groupkey} from './util/AggregateKeys';
2import {ValidAggregateOps} from './util/AggregateOps';
3import SortedList from './util/SortedList';
4import {ValidWindowOps} from './util/WindowOps';
5import WindowState from './util/WindowState';
6import {Transform, stableCompare, tupleid} from 'vega-dataflow';
7import {constant, inherits} from 'vega-util';
8import {bisector} from 'd3-array';
9
/**
 * Perform window calculations and write results to the input stream.
 * Tuples are partitioned by the groupby accessors; each partition is sorted
 * and a frame is slid across its rows to compute the window operations.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(*,*): number} [params.sort] - A comparator function for
 *   sorting tuples within a window.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
 *   by which to partition tuples into separate windows.
 * @param {Array<string>} params.ops - An array of strings indicating window
 *   operations to perform.
 * @param {Array<function(object): *>} [params.fields] - An array of accessors
 *   for data fields to use as inputs to window operations.
 * @param {Array<*>} [params.params] - An array of parameter values for window
 *   operations.
 * @param {Array<string>} [params.as] - An array of output field names for
 *   window operations.
 * @param {Array<number>} [params.frame] - Window frame definition as a
 *   two-element array [before, after] of row offsets relative to the current
 *   row. A null entry leaves that side of the frame unbounded, and the
 *   absolute value of each offset is used. Defaults to [null, 0].
 * @param {boolean} [params.ignorePeers=false] - If true, base window frame
 *   boundaries on row number alone, ignoring peers with identical sort values.
 *   If false (default), the window boundaries will be adjusted to include
 *   peer values.
 */
export default function Window(params) {
  Transform.call(this, {}, params);

  // Partitions touched during the current pulse (_mods) and how many of the
  // queued slots are in use (_mlen); both are reset after each transform.
  Object.assign(this, {_mlen: 0, _mods: []});
}
/**
 * Operator definition metadata: the transform type name, a flag marking the
 * operator as modifying its input tuples (metadata.modifies), and the schema
 * of accepted parameters.
 */
Window.Definition = {
  'type': 'Window',
  'metadata': {'modifies': true},
  'params': [
    {'name': 'sort', 'type': 'compare'},
    {'name': 'groupby', 'type': 'field', 'array': true},
    {
      'name': 'ops',
      'type': 'enum',
      'array': true,
      // Both window-specific and aggregate operations are accepted.
      'values': ValidWindowOps.concat(ValidAggregateOps)
    },
    {'name': 'params', 'type': 'number', 'null': true, 'array': true},
    {'name': 'fields', 'type': 'field', 'null': true, 'array': true},
    {'name': 'as', 'type': 'string', 'null': true, 'array': true},
    {
      'name': 'frame',
      'type': 'number',
      'null': true,
      'array': true,
      'length': 2,
      'default': [null, 0]
    },
    {'name': 'ignorePeers', 'type': 'boolean', 'default': false}
  ]
};
inherits(Window, Transform, {
  /**
   * Partition the input tuples, recompute window values for every partition
   * touched by this pulse, and write the results back onto the tuples.
   * @param {object} _ - The operator parameters.
   * @param {object} pulse - The input pulse.
   * @return {object} The output pulse, reflowed and marked as modifying the
   *   window state's output fields.
   */
  transform(_, pulse) {
    this.stamp = pulse.stamp;

    const mod = _.modified(),
          cmp = stableCompare(_.sort),
          key = groupkey(_.groupby),
          group = t => this.group(key(t));

    // (Re)initialize window state when absent or when parameters changed.
    let state = this.state;
    if (!state || mod) {
      state = this.state = new WindowState(_);
    }

    // Partition input tuples: rebuild all partitions from the full source if
    // parameters changed or the pulse modified any of state.inputs;
    // otherwise apply removed and added tuples incrementally.
    if (mod || pulse.modified(state.inputs)) {
      // Prototype-less map: partition keys are derived from data values, so
      // keys such as "constructor", "toString" or "__proto__" must not
      // resolve to (or mutate) inherited Object.prototype members.
      this.value = Object.create(null);
      pulse.visit(pulse.SOURCE, t => group(t).add(t));
    } else {
      pulse.visit(pulse.REM, t => group(t).remove(t));
      pulse.visit(pulse.ADD, t => group(t).add(t));
    }

    // Perform window calculations for each partition touched by this pulse.
    for (let i = 0, n = this._mlen; i < n; ++i) {
      processPartition(this._mods[i], state, cmp, _);
    }
    this._mlen = 0;
    this._mods = [];

    // TODO don't reflow everything?
    return pulse.reflow(mod).modifies(state.outputs);
  },

  /**
   * Look up the partition list for a group key, creating it if needed, and
   * queue it for recomputation the first time it is touched in this pulse.
   * @param {*} key - The partition key.
   * @return {object} The sorted list holding the partition's tuples.
   */
  group(key) {
    const partitions = this.value;

    // Own-property check: the constructor seeds this.value with a plain
    // object, where keys like "constructor" would otherwise hit inherited
    // members and be mistaken for an existing partition.
    let group = Object.prototype.hasOwnProperty.call(partitions, key)
      ? partitions[key]
      : undefined;

    if (!group) {
      group = partitions[key] = SortedList(tupleid);
      group.stamp = -1; // older than any pulse, so it is queued below
    }

    if (group.stamp < this.stamp) {
      group.stamp = this.stamp;
      this._mods[this._mlen++] = group;
    }

    return group;
  }
});
/**
 * Compute window values for a single partition: obtain its rows ordered by
 * the stable comparator, then slide the frame across them, passing each row
 * and the current frame to the window state.
 * @param {object} list - Partition list exposing a data(compare) method.
 * @param {object} state - Window state; init() is called once, then
 *   update(window, tuple) once per row.
 * @param {function(*,*): number} cmp - Stable comparator used to order rows.
 * @param {object} _ - Operator parameters (sort, frame, ignorePeers).
 */
function processPartition(list, state, cmp, _) {
  const sort = _.sort;
  // Peer-aware ("range") frame boundaries apply only when a sort comparator
  // exists and peers are not explicitly ignored.
  const range = sort && !_.ignorePeers;
  const frame = _.frame || [null, 0];
  const rows = list.data(cmp); // cmp gives a stable ordering
  const total = rows.length;
  const bisect = range ? bisector(sort) : null;

  // Shared window descriptor, mutated in place for every row. Without a sort,
  // compare is a constant non-zero comparator, so no two rows compare equal.
  const win = {
    i0: 0, i1: 0, p0: 0, p1: 0, index: 0,
    data: rows,
    compare: sort || constant(-1)
  };

  state.init();

  let row = 0;
  while (row < total) {
    setWindow(win, frame, row, total);
    if (range) {
      adjustRange(win, bisect);
    }
    state.update(win, rows[row]);
    row += 1;
  }
}
/**
 * Position the window frame around row i of a partition holding n rows.
 * The previous bounds are saved to w.p0 / w.p1 before the new half-open
 * range [w.i0, w.i1) is computed and clamped to [0, n].
 * @param {object} w - Window descriptor, mutated in place.
 * @param {Array<number|null>} f - Frame [before, after]; a null (or
 *   undefined) entry means unbounded on that side, otherwise the absolute
 *   value is used as a row offset.
 * @param {number} i - Index of the current row.
 * @param {number} n - Number of rows in the partition.
 */
function setWindow(w, f, i, n) {
  const before = f[0];
  const after = f[1];

  // Preserve the bounds of the previous frame.
  w.p0 = w.i0;
  w.p1 = w.i1;

  let start = 0;
  if (before != null) {
    start = Math.max(0, i - Math.abs(before));
  }

  let end = n;
  if (after != null) {
    end = Math.min(n, i + Math.abs(after) + 1);
  }

  w.i0 = start;
  w.i1 = end;
  w.index = i;
}
/**
 * For peer-aware ('range') frames, widen the window so that rows comparing
 * equal (compare result falsy) to a boundary row are included: the start
 * moves left to the first such peer, the end moves right past the last one.
 * @param {object} w - Window descriptor with i0, i1, data and compare;
 *   w.i0 / w.i1 are updated in place.
 * @param {{left: Function, right: Function}} bisect - Bisector built from
 *   the same sort comparator as w.compare.
 */
function adjustRange(w, bisect) {
  const {i0: first, i1: stop, compare: same, data: rows} = w;
  const last = stop - 1;
  const lastIndex = rows.length - 1;

  // A peer sits just before the frame: extend the start back to it.
  if (first > 0 && !same(rows[first], rows[first - 1])) {
    w.i0 = bisect.left(rows, rows[first]);
  }

  // A peer sits just after the frame: extend the end past all such peers.
  if (last < lastIndex && !same(rows[last], rows[last + 1])) {
    w.i1 = bisect.right(rows, rows[last]);
  }
}