// Retrieved via the UNPKG file viewer (4.77 kB, JavaScript, raw view).
1import {groupkey} from './util/AggregateKeys';
2import {ValidAggregateOps} from './util/AggregateOps';
3import SortedList from './util/SortedList';
4import {ValidWindowOps} from './util/WindowOps';
5import WindowState from './util/WindowState';
6import {Transform, stableCompare, tupleid} from 'vega-dataflow';
7import {constant, inherits} from 'vega-util';
8import {bisector} from 'd3-array';
9
/**
 * Perform window calculations and write results to the input stream.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(*,*): number} [params.sort] - A comparator function for
 *   sorting tuples within a window.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
 *   by which to partition tuples into separate windows.
 * @param {Array<string>} params.ops - An array of strings indicating window
 *   operations to perform.
 * @param {Array<function(object): *>} [params.fields] - An array of accessors
 *   for data fields to use as inputs to window operations.
 * @param {Array<*>} [params.params] - An array of parameter values for window
 *   operations.
 * @param {Array<string>} [params.as] - An array of output field names for
 *   window operations.
 * @param {Array<number>} [params.frame] - Window frame definition as a
 *   two-element array.
 * @param {boolean} [params.ignorePeers=false] - If true, base window frame
 *   boundaries on row number alone, ignoring peers with identical sort values.
 *   If false (default), the window boundaries will be adjusted to include
 *   peer values.
 */
export default function Window(params) {
  // An empty object is passed to the Transform constructor along with params.
  Transform.call(this, {}, params);

  // Partitions touched while handling the current pulse: `_mods` holds the
  // partition lists and `_mlen` counts how many entries are in use.
  this._mlen = 0;
  this._mods = [];
}
31
// Declarative description of the transform: its type name, metadata flags,
// and the parameters it accepts. The allowed `ops` values are the window
// operations followed by the aggregate operations.
Window.Definition = {
  'type': 'Window',
  'metadata': {'modifies': true},
  'params': [
    { 'name': 'sort', 'type': 'compare' },
    { 'name': 'groupby', 'type': 'field', 'array': true },
    { 'name': 'ops', 'type': 'enum', 'array': true, 'values': ValidWindowOps.concat(ValidAggregateOps) },
    { 'name': 'params', 'type': 'number', 'null': true, 'array': true },
    { 'name': 'fields', 'type': 'field', 'null': true, 'array': true },
    { 'name': 'as', 'type': 'string', 'null': true, 'array': true },
    { 'name': 'frame', 'type': 'number', 'null': true, 'array': true, 'length': 2, 'default': [null, 0] },
    { 'name': 'ignorePeers', 'type': 'boolean', 'default': false }
  ]
};
46
// Make Window inherit from Transform; `prototype` is the object on which the
// transform's methods are defined below.
var prototype = inherits(Window, Transform);
48
/**
 * Process a pulse: assign tuples to partitions, then recompute the window
 * operations for every partition touched by this pulse.
 * @param {object} _ - The operator parameters (sort, groupby, frame, ...),
 *   which also report whether any parameter changed via `modified()`.
 * @param {object} pulse - The input pulse.
 * @return {object} The value of `pulse.reflow(mod).modifies(state.outputs)`.
 */
prototype.transform = function(_, pulse) {
  var self = this,
      state = self.state,
      mod = _.modified(),
      cmp = stableCompare(_.sort),
      i, n;

  // record the current stamp so group() can tell whether a partition has
  // already been queued during this pulse
  this.stamp = pulse.stamp;

  // initialize window state (rebuilt whenever parameters change)
  if (!state || mod) {
    state = self.state = new WindowState(_);
  }

  // retrieve group for a tuple
  var key = groupkey(_.groupby);
  function group(t) { return self.group(key(t)); }

  // partition input tuples
  if (mod || pulse.modified(state.inputs)) {
    // parameters or input fields changed: discard all partitions and
    // re-add every source tuple
    self.value = {};
    pulse.visit(pulse.SOURCE, function(t) { group(t).add(t); });
  } else {
    // otherwise apply removals and additions incrementally
    pulse.visit(pulse.REM, function(t) { group(t).remove(t); });
    pulse.visit(pulse.ADD, function(t) { group(t).add(t); });
  }

  // perform window calculations for each modified partition
  for (i = 0, n = self._mlen; i < n; ++i) {
    processPartition(self._mods[i], state, cmp, _);
  }

  // reset the modified-partition queue for the next pulse
  self._mlen = 0;
  self._mods = [];

  // TODO don't reflow everything?
  return pulse.reflow(mod).modifies(state.outputs);
};
86
/**
 * Look up the partition for a group key, creating it if it does not exist.
 * The first time a partition is requested during a pulse it is also appended
 * to the queue of modified partitions.
 * @param {string} key - The partition key (as produced by the groupkey
 *   function used in transform).
 * @return {object} The partition's tuple list.
 */
prototype.group = function(key) {
  var self = this,
      group = self.value[key];

  if (!group) {
    group = self.value[key] = SortedList(tupleid);
    // a stamp of -1 is lower than any pulse stamp the check below compares
    // against, so a new partition is always queued
    group.stamp = -1;
  }

  // queue each partition at most once per pulse stamp
  if (group.stamp < self.stamp) {
    group.stamp = self.stamp;
    self._mods[self._mlen++] = group;
  }

  return group;
};
103
/**
 * Run the window operations over every tuple of a single partition.
 * @param {object} list - The partition's tuple list; `list.data(cmp)` supplies
 *   the tuples as an array.
 * @param {object} state - The window state; `init()` is called once, then
 *   `update(window, tuple)` once per tuple.
 * @param {function(*,*): number} cmp - Comparator passed to `list.data`.
 * @param {object} _ - The operator parameters (sort, ignorePeers, frame).
 */
function processPartition(list, state, cmp, _) {
  var sort = _.sort,
      // peer (range) adjustment only applies when a sort order is given
      range = sort && !_.ignorePeers,
      frame = _.frame || [null, 0],
      data = list.data(cmp), // use cmp for stable sort
      n = data.length,
      i = 0,
      b = range ? bisector(sort) : null,
      // window descriptor shared across iterations; setWindow and adjustRange
      // update it in place for each tuple
      w = {
        i0: 0, i1: 0, p0: 0, p1: 0, index: 0,
        data: data, compare: sort || constant(-1)
      };

  for (state.init(); i < n; ++i) {
    setWindow(w, frame, i, n);
    if (range) adjustRange(w, b);
    state.update(w, data[i]);
  }
}
123
/**
 * Position the window descriptor for the tuple at index i.
 * The previous bounds are saved in p0/p1 before the new bounds are written.
 * @param {object} w - The window descriptor, updated in place.
 * @param {Array<?number>} f - Two-element frame [preceding, following]. A null
 *   (or undefined) entry leaves that side unbounded; otherwise the absolute
 *   value of the entry is used as the offset from i.
 * @param {number} i - Index of the current tuple.
 * @param {number} n - Total number of tuples in the partition.
 */
function setWindow(w, f, i, n) {
  // remember the previous window bounds
  w.p0 = w.i0;
  w.p1 = w.i1;

  // i0 is the inclusive start index, clamped to 0;
  // i1 is the exclusive end index, clamped to n
  w.i0 = f[0] == null ? 0 : Math.max(0, i - Math.abs(f[0]));
  w.i1 = f[1] == null ? n : Math.min(n, i + Math.abs(f[1]) + 1);
  w.index = i;
}
131
/**
 * If frame type is 'range', adjust window for peer values: when the tuple at
 * a window edge compares equal to its neighbor just outside the window, the
 * edge is moved to the position returned by the bisector.
 * @param {object} w - The window descriptor (i0, i1, data, compare), updated
 *   in place.
 * @param {object} bisect - An object with `left(data, value)` and
 *   `right(data, value)` methods returning insertion indices.
 */
function adjustRange(w, bisect) {
  var r0 = w.i0,
      r1 = w.i1 - 1, // index of the last tuple inside the window
      c = w.compare,
      d = w.data,
      n = d.length - 1;

  // a falsy (zero) comparison result means the two tuples are peers
  if (r0 > 0 && !c(d[r0], d[r0 - 1])) w.i0 = bisect.left(d, d[r0]);
  if (r1 < n && !c(d[r1], d[r1 + 1])) w.i1 = bisect.right(d, d[r1]);
}