UNPKG

5.34 kBJavaScriptView Raw
1
/**
 * Identity transducer: every record is handed to the downstream reducing
 * function unchanged.
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)` that resolves to whatever `rf(acc, val)` produces
 */
function passthrough () {
  // The step adds nothing of its own; it only delegates to the wrapped reducer.
  return (rf) => async (acc, val) => rf(acc, val)
}
13
/**
 * Performs a mapping operation on each record in the stream.
 *
 * Each record is transformed with `f` and the transformed value is passed to
 * the downstream reducing function. If the downstream result is a marker
 * object (it owns a `reduced` property) with a truthy payload, the marker is
 * returned as is; if the payload is falsy the incoming accumulator is returned
 * instead. Any other result is returned untouched.
 *
 * @param {Function} f - mapping function applied to every record; may be async
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function mapping (f) {
  return function (rf) {
    return async (acc, val) => {
      const mapped = await f(val)
      const result = await rf(acc, mapped)
      // The downstream step may resolve to null/undefined (e.g. a side-effect
      // only reducer) or to an object without Object.prototype, so the
      // own-property test must not be invoked as a method on the result.
      const isMarker = result != null &&
        Object.prototype.hasOwnProperty.call(result, 'reduced')
      if (!isMarker) {
        return result
      }
      return result.reduced ? result : acc
    }
  }
}
35
/**
 * Removes records from the stream if they don't match the predicate.
 * @param {Function} p - predicate called with each record; may be async
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step that forwards matching records to `rf` and answers
 *   `{ reduced: null }` for the rest
 */
function filtering (p) {
  return (rf) => async (acc, val) => {
    const keep = await p(val)
    if (!keep) {
      // Rejected record: nothing is passed downstream.
      return { reduced: null }
    }
    return rf(acc, val)
  }
}
49
/**
 * Takes toTake records from the stream; every later record is answered with
 * `{ reduced: null }`.
 *
 * The remaining-record counter lives in the closure created by this call, so
 * it is shared by every step built from the returned transducer.
 *
 * @param {*} toTake - how many records to let through
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function take (toTake) {
  let remaining = toTake
  return (rf) => async (acc, val) => {
    remaining--
    // Once the budget has gone negative the record is not forwarded.
    return remaining < 0 ? { reduced: null } : rf(acc, val)
  }
}
70
/**
 * Skips forward in a stream by toSkip records: the first toSkip records are
 * answered with `{ reduced: null }`, later ones go to the reducing function.
 *
 * The counter of records seen lives in the closure created by this call, so
 * it is shared by every step built from the returned transducer.
 *
 * @param {*} toSkip - number of leading records to drop
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function skip (toSkip) {
  let seen = 0
  return (rf) => async (acc, val) => {
    const index = seen
    seen += 1
    return index < toSkip ? { reduced: null } : rf(acc, val)
  }
}
89
/**
 * Samples from a stream at a particular frequency.
 * sampling(1000) lets a value through at most once every second; values that
 * arrive sooner after the last accepted one are answered with
 * `{ reduced: null }`.
 *
 * Time is read from `Date.now()`. The timestamp of the last accepted value
 * starts at 0 and lives in the closure created by this call.
 *
 * @param {*} period - minimum gap, in milliseconds, between accepted values
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function sampling (period) {
  let lastAccepted = 0
  return (rf) => async (acc, val) => {
    const now = Date.now()
    const tooSoon = now - lastAccepted < period
    if (tooSoon) {
      return { reduced: null }
    }
    // Remember when this value was accepted before handing it downstream.
    lastAccepted = now
    return rf(acc, val)
  }
}
114
/**
 * Transforms a series of measurements into events. The predicate determines
 * whether a measurement continues (or starts) an event or belongs to a
 * non-event stretch.
 *
 * While the predicate holds, an event `{ start, end }` is built up and the
 * step answers `{ reduced: null }`. The first measurement for which the
 * predicate fails closes the open event and passes it to the reducing
 * function. `start`/`end` come from the measurement's `time` property when it
 * carries one, otherwise from the measurement's 0-based position in the
 * stream.
 *
 * The open event and the position counter live in the closure created by this
 * call, so they are shared by every step built from the returned transducer.
 *
 * @param {Function} predicate - called with each measurement; may be async
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function eventing (predicate) {
  let latest = null
  let sequence = -1
  return function (rf) {
    return async (acc, val) => {
      // Capture the position before awaiting so that a step started while the
      // predicate is still pending cannot change the position used here.
      const position = ++sequence
      const active = await predicate(val)
      if (active) {
        // A timestamp of 0 is a real time, not a missing one; any other falsy
        // `time` still falls back to the stream position.
        const hasTime = val != null && (val.time === 0 || Boolean(val.time))
        const stamp = hasTime ? val.time : position
        if (latest) {
          latest.end = stamp
        } else {
          latest = { start: stamp, end: stamp }
        }
        return { reduced: null }
      }
      if (latest) {
        // Predicate stopped holding: emit the finished event downstream.
        const finished = latest
        latest = null
        return rf(acc, finished)
      }
      return { reduced: null }
    }
  }
}
145
/**
 * Split one event into multiple.
 *
 * `splitter` maps a value to an array of parts. Each part is fed, in order, to
 * the reducing function, and the step resolves to `{ reduced: [...] }` holding
 * one entry per part that produced something:
 *  - a plain result is collected as is and becomes the accumulator for the
 *    next part;
 *  - a marker object with a truthy `reduced` payload contributes that payload;
 *  - a marker with a falsy payload (e.g. `{ reduced: null }`) contributes
 *    nothing and leaves the accumulator for the next part unchanged.
 *
 * If processing the parts throws, the error is logged with `console.log` and
 * the step resolves to `undefined`.
 *
 * @param {Function} splitter - maps a value to an array of parts; may be async
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function split (splitter) {
  return function (rf) {
    return async (acc, val) => {
      const parts = await splitter(val)
      const reduction = { reduced: [] }
      try {
        let current = acc
        for (let i = 0; i < parts.length; i++) {
          const result = await rf(current, parts[i])
          // Null-safe own-property test: the reducer may resolve to
          // null/undefined or to an object without Object.prototype.
          const isMarker = result != null &&
            Object.prototype.hasOwnProperty.call(result, 'reduced')
          if (!isMarker) {
            reduction.reduced.push(result)
            current = result
          } else if (result.reduced) {
            reduction.reduced.push(result.reduced)
            current = result
          }
          // Marker with a falsy payload: this part produced nothing, so the
          // marker must not replace the accumulator handed to the next part.
        }
        return reduction
      } catch (ex) {
        console.log(ex)
      }
    }
  }
}
176
/**
 * Randomly samples from a stream with a target frequency.
 * randomFilter(100) will target to sample one value in every 100.
 *
 * For each record a random integer is drawn as
 * `Math.floor(Math.random() * countFrequency)`; the record is forwarded only
 * when the draw is 0, otherwise the step answers `{ reduced: null }`.
 *
 * @param {*} countFrequency - a number
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function randomFilter (countFrequency) {
  return (rf) => async (acc, val) => {
    const draw = Math.floor(Math.random() * countFrequency)
    if (draw !== 0) {
      return { reduced: null }
    }
    return rf(acc, val)
  }
}
195
/**
 * Make n neighbors available as each stream record is processed.
 *
 * Records are buffered until `numNeighbors` of them are available; until then
 * the step answers `{ reduced: null }`. From then on, every call emits the
 * oldest buffered record downstream as a shallow copy extended with:
 *  - `neighbors`: a snapshot array of the buffered records at that moment
 *    (the oldest record first), and
 *  - `data`: the oldest record itself.
 * The oldest record is then dropped from the buffer.
 *
 * The buffer lives in the closure created by this call, so it is shared by
 * every step built from the returned transducer.
 *
 * @param {number} [numNeighbors=10] - buffer size required before emitting
 * @returns {Function} transducer; given a reducing function `rf` it yields an
 *   async step `(acc, val)`
 */
function neighbors (numNeighbors = 10) {
  const buffer = []

  return function (rf) {
    return async (acc, val) => {
      buffer.push(val)
      if (buffer.length < numNeighbors) {
        return { reduced: null }
      }
      const first = buffer[0]
      const enriched = Object.assign({}, first)
      // Attach a copy of the window: the buffer itself is mutated right after
      // this call and on every later record, which would otherwise change the
      // `neighbors` of records that were already emitted.
      enriched.neighbors = buffer.slice()
      enriched.data = first
      const res = rf(acc, enriched)
      buffer.shift()
      return res
    }
  }
}
222
223export {
224 passthrough,
225 mapping,
226 filtering,
227 take,
228 skip,
229 eventing,
230 sampling,
231 split,
232 randomFilter,
233 neighbors
234}
235
\No newline at end of file