UNPKG

9.38 kBJavaScriptView Raw
/**
 * Asynchronous logical negation.
 * @param {*} x - a value or a promise/thenable for a value
 * @returns {Promise<boolean>} resolves to `true` when the awaited value is falsy, `false` otherwise
 */
const not = async (x) => !(await x);
5
/**
 * Returns its argument unchanged.
 * @param {*} value
 * @returns {*} the same value that was passed in
 */
const identity = (value) => {
  return value;
};
7
8// predicates
/**
 * Tests whether a number leaves no remainder when divided by 2.
 * @param {number} value
 * @returns {boolean}
 */
const isEven = (value) => {
  const remainder = value % 2;
  return remainder === 0;
};
/**
 * Builds a function that compares its argument against a fixed threshold.
 * Note that the result is not a boolean: the candidate itself is returned
 * when it is greater than the threshold, otherwise `0`.
 * @param {*} threshold - value to compare against
 * @returns {function(*): *} function returning the candidate or `0`
 */
const isGreaterThan = (threshold) => (candidate) => {
  if (candidate > threshold) {
    return candidate;
  }
  return 0;
};
/**
 * Converts any value to a binary digit based on its truthiness.
 * @param {*} value
 * @returns {number} `1` for truthy input, `0` for falsy input
 */
const digitize = (value) => Number(Boolean(value));
12
/**
 * Builds a function computing the remainder of its argument divided by `val`.
 * @param {number} val - the divisor
 * @returns {function(number): number} function returning `x % val`
 */
const modulus = (val) => {
  return (dividend) => dividend % val;
};
14
/**
 * Builds a getter that reads one property from whatever record it is given.
 * @param {string|number|symbol} property - key to look up
 * @returns {function(*): *} function returning `record[property]`
 */
const select = (property) => {
  return (record) => record[property];
};
16
/**
 * Calls `f` with `x` and returns the result. Used as the folding step of `compose`.
 * @param {*} x - argument to pass
 * @param {function(*): *} f - function to call
 * @returns {*} `f(x)`
 */
function apply (x, f) {
  const result = f(x);
  return result;
}

/**
 * Right-to-left function composition: `compose(f, g)(x)` evaluates `f(g(x))`.
 * With no functions the returned function gives back its input unchanged.
 * @param {...function(*): *} funcs - functions to chain, the last one is applied first
 * @returns {function(*): *} the composed function
 */
function compose (...funcs) {
  return function composed (x) {
    let value = x;
    for (let i = funcs.length - 1; i >= 0; i--) {
      value = apply(value, funcs[i]);
    }
    return value;
  };
}
24
/**
 * Reducing function that appends `val` to the accumulator by delegating to
 * the accumulator's own `concat` method.
 * @param {*} accumlator - any value exposing a `concat` method
 * @param {*} val - value handed to `concat`
 * @returns {*} whatever `accumlator.concat(val)` returns
 */
function concat (accumlator, val) {
  const combined = accumlator.concat(val);
  return combined;
}
28
/**
 * Reducing function that discards the accumulator and keeps only the most
 * recent value.
 * @param {*} accumlator - ignored
 * @param {*} val - the incoming value
 * @returns {*} `val`
 */
function latest (accumlator, val) {
  const mostRecent = val;
  return mostRecent;
}
32
/**
 * Reduces every value produced by an async iterator through a transducer.
 *
 * The reducer built by `transform(reducerfunction)` may answer with a plain
 * accumulator or with a `{ reduced: value }` wrapper (the convention used by
 * the transducers in this module, where `{ reduced: null }` means "nothing
 * was emitted for this record"). A wrapper with a truthy `reduced` replaces
 * the accumulator with that value; a wrapper with a falsy `reduced` leaves
 * the accumulator untouched. Previously the wrapper itself was stored as the
 * accumulator, which broke the following reducer call.
 *
 * @param {function(function): function} transform - transducer applied to the reducing function
 * @param {function(*, *): *} reducerfunction - reducing function `(acc, val) => acc`
 * @param {*} init - initial accumulator
 * @param {{next: function(): *}} asynciterator - object whose `next()` yields `{ done, value }` (optionally via a promise)
 * @returns {Promise<*>} the final accumulator
 */
async function transduceAsyncIterator (transform, reducerfunction, init, asynciterator) {
  const reducer = transform(reducerfunction);
  let accumulator = init;
  for (;;) {
    const step = await asynciterator.next();
    if (step.done) {
      return accumulator;
    }
    const result = await reducer(accumulator, step.value);
    const isWrapped = result !== null && result !== undefined &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapped) {
      // Plain result (including falsy ones such as 0): it is the new accumulator.
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
    // Wrapped but empty ({ reduced: null }): record was skipped, keep accumulator.
  }
}
45
/**
 * Reduces every value of a `hasNext()`/`next()` style iterator through a
 * transducer. Both iterator methods may return plain values or promises.
 *
 * Result handling per record:
 *  - `{ reduced: value }` with a truthy `value` replaces the accumulator with `value`;
 *  - `{ reduced: <falsy> }` (e.g. `{ reduced: null }`, a skipped record) keeps the accumulator;
 *  - any other result becomes the new accumulator.
 *
 * Earlier revisions stopped the whole reduction (and dropped the result) as
 * soon as the reducer returned a falsy value such as `0`, and threw when it
 * returned `null`/`undefined`; the loop now only ends when `hasNext()` is falsy.
 *
 * @param {function(function): function} transform - transducer applied to the reducing function
 * @param {function(*, *): *} reducerfunction - reducing function `(acc, val) => acc`
 * @param {*} init - initial accumulator
 * @param {{hasNext: function(): *, next: function(): *}} asynchasnextiterator - source of values
 * @returns {Promise<*>} the final accumulator
 */
async function transduceAsyncHasNextIterator (transform, reducerfunction, init, asynchasnextiterator) {
  const reducer = transform(reducerfunction);
  let accumulator = init;
  while (await asynchasnextiterator.hasNext()) {
    const value = await asynchasnextiterator.next();
    const result = await reducer(accumulator, value);
    const isWrapped = result !== null && result !== undefined &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapped) {
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
  }
  return accumulator;
}
66
/**
 * Reduces an array through a transducer, awaiting each step.
 *
 * The transducers in this module produce async reducers. Feeding such a
 * reducer straight into `Array.prototype.reduce` hands a pending Promise to
 * the next step as its accumulator, so the elements are instead processed
 * one at a time and every result is awaited before the next call.
 *
 * Results of the form `{ reduced: value }` follow the module convention:
 * a truthy `value` replaces the accumulator, a falsy one (a skipped record)
 * leaves it unchanged. Any other result becomes the new accumulator.
 *
 * @param {function(function): function} xf - transducer (may return the reducer via a promise)
 * @param {function(*, *): *} rf - reducing function `(acc, val) => acc`
 * @param {*} init - initial accumulator
 * @param {Array} xs - values to reduce
 * @returns {Promise<*>} the final accumulator
 */
async function transduceArray (xf, rf, init, xs) {
  const xrf = await xf(rf);
  let accumulator = init;
  for (const value of xs) {
    const result = await xrf(accumulator, value);
    const isWrapped = result !== null && result !== undefined &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapped) {
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
  }
  return accumulator;
}
74
/**
 * Async generator that pushes every value of an async-iterable source through
 * a transducer and yields the intermediate results.
 *
 * For each source value the reducer result is interpreted as follows:
 *  - falsy result: nothing is yielded;
 *  - result identical (`===`) to the current accumulator: nothing is yielded;
 *  - result owning a `reduced` property: if `reduced` is an array, each element
 *    in turn becomes the accumulator and is yielded; any other `reduced`
 *    value yields nothing;
 *  - any other result becomes the accumulator and is yielded.
 *
 * @param {function(function): function} transform - transducer applied to the reducing function
 * @param {function(*, *): *} reducerfunction - reducing function `(acc, val) => acc`
 * @param {*} init - initial accumulator
 * @param {AsyncIterable|Iterable} streamgenerator - source consumed with `for await`
 * @yields {*} each newly produced accumulator value
 */
async function * transduceGenerator (transform, reducerfunction, init, streamgenerator) {
  const reducer = transform(reducerfunction);
  let accumulator = init;
  for await (const value of streamgenerator) {
    const outcome = await reducer(accumulator, value);
    // Nothing new was produced for this record.
    if (!outcome || outcome === accumulator) {
      continue;
    }
    if (!outcome.hasOwnProperty('reduced')) {
      accumulator = outcome;
      yield outcome;
      continue;
    }
    if (!Array.isArray(outcome.reduced)) {
      continue;
    }
    for (let index = 0; index < outcome.reduced.length; index++) {
      accumulator = outcome.reduced[index];
      yield outcome.reduced[index];
    }
  }
}
95
/**
 * Transducer that forwards every record to the reducing function unchanged.
 * @returns {function(function): function} transducer whose async reducer
 *   resolves to whatever `rf(acc, val)` returns
 */
function passthrough () {
  const transducer = function (rf) {
    // The downstream reducing function decides how acc and val are combined.
    const step = async (acc, val) => rf(acc, val);
    return step;
  };
  return transducer;
}
107
/**
 * Transducer that applies `f` to every record before handing it to the
 * reducing function.
 *
 * If the reducing function answers with a `{ reduced: value }` wrapper, the
 * wrapper is passed on when `value` is truthy; when it is falsy (a skipped
 * record) the incoming accumulator is returned instead. Any other result,
 * including `null`, `undefined` and other falsy values, is returned as is.
 * The ownership check is done through `Object.prototype` so that `null`,
 * `undefined` and prototype-less objects no longer cause a TypeError.
 *
 * @param {function(*): *} f - mapping function, may be async
 * @returns {function(function): function} transducer producing an async reducer
 */
function mapping (f) {
  return function (rf) {
    return async (acc, val) => {
      const mapped = await f(val);
      const result = await rf(acc, mapped);
      const isWrapped = result !== null && result !== undefined &&
        Object.prototype.hasOwnProperty.call(result, 'reduced');
      if (!isWrapped) {
        return result;
      }
      return result.reduced ? result : acc;
    };
  };
}
129
/**
 * Transducer that only lets through records for which the predicate is truthy.
 * Rejected records produce `{ reduced: null }` instead of calling the
 * reducing function.
 * @param {function(*): *} p - predicate, may be async
 * @returns {function(function): function} transducer producing an async reducer
 */
function filtering (p) {
  return function (rf) {
    return async (acc, val) => {
      const accepted = await p(val);
      if (accepted) {
        return rf(acc, val);
      }
      return { reduced: null };
    };
  };
}
143
/**
 * Transducer that forwards the first `toTake` records and answers
 * `{ reduced: null }` for every record after that.
 * The remaining-count is held in the closure created by `take(...)`, so it is
 * shared by every reducer built from the same returned transducer.
 * @param {number} toTake - number of records to let through
 * @returns {function(function): function} transducer producing an async reducer
 */
function take (toTake) {
  let remaining = toTake;
  return function (rf) {
    return async (acc, val) => {
      remaining -= 1;
      const exhausted = remaining < 0;
      if (exhausted) {
        return { reduced: null };
      }
      return rf(acc, val);
    };
  };
}
164
/**
 * Transducer that drops the first `toSkip` records (answering
 * `{ reduced: null }` for each) and forwards every record after them.
 * The seen-count is held in the closure created by `skip(...)`, so it is
 * shared by every reducer built from the same returned transducer.
 * @param {number} toSkip - number of leading records to drop
 * @returns {function(function): function} transducer producing an async reducer
 */
function skip (toSkip) {
  let seen = 0;
  return function (rf) {
    return async (acc, val) => {
      const stillSkipping = seen < toSkip;
      seen += 1;
      return stillSkipping ? { reduced: null } : rf(acc, val);
    };
  };
}
183
/**
 * Transducer that samples a stream by wall-clock time.
 * A record is forwarded only when at least `period` milliseconds (measured
 * with `Date.now()`) have passed since the previously forwarded record;
 * all other records produce `{ reduced: null }`.
 * For example `sampling(1000)` lets through at most one record per second.
 * @param {number} period - minimum number of milliseconds between forwarded records
 * @returns {function(function): function} transducer producing an async reducer
 */
function sampling (period) {
  let lastAccepted = 0;
  return function (rf) {
    return async (acc, val) => {
      const now = Date.now();
      const elapsed = now - lastAccepted;
      const tooSoon = elapsed < period;
      if (tooSoon) {
        return { reduced: null };
      }
      lastAccepted = now;
      return rf(acc, val);
    };
  };
}
208
/**
 * Transducer that turns a series of measurements into events.
 *
 * While the predicate holds for consecutive records an event
 * `{ start, end }` is kept open and its `end` is moved forward. The first
 * record for which the predicate does not hold closes the open event and
 * passes it to the reducing function. Every other record produces
 * `{ reduced: null }`. `start`/`end` use the record's `time` property when it
 * is truthy, otherwise the zero-based position of the record in the stream.
 *
 * @param {function(*): *} predicate - decides whether a record belongs to an event, may be async
 * @returns {function(function): function} transducer producing an async reducer
 */
function eventing (predicate) {
  let openEvent = null;
  let sequence = -1;
  return function (rf) {
    return async (acc, val) => {
      sequence += 1;
      const matches = await predicate(val);

      if (matches) {
        if (openEvent) {
          // Event continues: only its end marker moves.
          openEvent.end = val.time ? val.time : sequence;
        } else {
          openEvent = {
            start: val.time ? val.time : sequence,
            end: val.time ? val.time : sequence
          };
        }
        return { reduced: null };
      }

      if (!openEvent) {
        return { reduced: null };
      }

      // Predicate stopped holding: emit the finished event.
      const finished = openEvent;
      openEvent = null;
      return rf(acc, finished);
    };
  };
}
239
/**
 * Transducer that splits one record into several.
 *
 * `splitter` maps a record to an array of parts. Each part is fed to the
 * reducing function in order, threading the accumulator from one part to the
 * next. The reducer resolves to `{ reduced: [...] }` holding the value
 * produced for each part:
 *  - a plain result is collected and becomes the accumulator for the next part;
 *  - a `{ reduced: value }` result with a truthy `value` collects `value`,
 *    which also becomes the accumulator for the next part;
 *  - a `{ reduced: <falsy> }` result (a skipped part) collects nothing and
 *    leaves the accumulator as it was. Previously the wrapper object itself
 *    was passed on as the accumulator for the next part.
 *
 * If processing the parts throws, the error is logged with `console.log` and
 * the reducer resolves to `undefined`.
 *
 * @param {function(*): *} splitter - maps a record to an array of parts, may be async
 * @returns {function(function): function} transducer producing an async reducer
 */
function split (splitter) {
  return function (rf) {
    return async (acc, val) => {
      const parts = await splitter(val);
      const reduction = { reduced: [] };
      try {
        let accumulator = acc;
        for (let i = 0; i < parts.length; i++) {
          const result = await rf(accumulator, parts[i]);
          const isWrapped = result !== null && result !== undefined &&
            Object.prototype.hasOwnProperty.call(result, 'reduced');
          if (!isWrapped) {
            accumulator = result;
            reduction.reduced.push(result);
          } else if (result.reduced) {
            accumulator = result.reduced;
            reduction.reduced.push(result.reduced);
          }
        }
        return reduction;
      } catch (ex) {
        // Best effort: report the failure and emit nothing for this record.
        console.log(ex);
        return undefined;
      }
    };
  };
}
270
/**
 * Transducer that randomly samples a stream.
 * For every record a random integer is drawn as
 * `Math.floor(Math.random() * countFrequency)`; the record is forwarded when
 * that integer is `0`, otherwise `{ reduced: null }` is produced. So
 * `randomFilter(100)` forwards roughly one record in every 100.
 * @param {number} countFrequency - target number of records per forwarded record
 * @returns {function(function): function} transducer producing an async reducer
 */
function randomFilter (countFrequency) {
  return function (rf) {
    return async (acc, val) => {
      const draw = Math.floor(Math.random() * countFrequency);
      return draw === 0 ? rf(acc, val) : { reduced: null };
    };
  };
}
289
/**
 * Transducer that makes a sliding window of records available with each
 * forwarded record.
 *
 * Incoming records are buffered. Until the buffer holds `numNeighbors`
 * records the reducer answers `{ reduced: null }`. Once it does, the oldest
 * buffered record is forwarded as a shallow copy extended with
 *  - `neighbors`: a snapshot of the current window (oldest record first), and
 *  - `data`: the original oldest record,
 * after which the oldest record is dropped from the buffer.
 *
 * The snapshot is a copy of the buffer. Handing out the buffer itself meant
 * the `neighbors` of an already forwarded record kept changing as later
 * records were pushed and shifted.
 *
 * @param {number} [numNeighbors=10] - window size
 * @returns {function(function): function} transducer producing an async reducer
 */
function neighbors (numNeighbors = 10) {
  const buffer = [];

  return function (rf) {
    return async (acc, val) => {
      buffer.push(val);
      if (buffer.length < numNeighbors) {
        return {
          reduced: null
        };
      }
      const first = buffer[0];
      const enriched = Object.assign({}, first);
      enriched.neighbors = buffer.slice();
      enriched.data = first;
      const res = rf(acc, enriched);
      buffer.shift();
      return res;
    };
  };
}
316
/**
 * Async generator producing the numbers `start`, `start + step`, ... for as
 * long as they are below `end`. Each number is delivered through a promise
 * that resolves after `delay` milliseconds.
 * @param {number} [start=0] - first value
 * @param {number} [end=Infinity] - exclusive upper bound
 * @param {number} [step=1] - increment between values
 * @param {number} [delay=10] - timer delay in milliseconds before each value is produced
 * @yields {number} the next value of the range
 * @returns {number} the number of values that were produced (generator return value)
 */
async function * makeAsyncRangeIterator (start = 0, end = Infinity, step = 1, delay = 10) {
  let iterationCount = 0;
  let current = start;
  while (current < end) {
    iterationCount += 1;
    const value = current;
    yield new Promise((resolve) => {
      setTimeout(resolve, delay, value);
    });
    current += step;
  }
  return iterationCount;
}
329
/**
 * Creates a `hasNext()`/`next()` style iterator over the numbers `start`,
 * `start + step`, ... that are below `end`.
 *
 * `hasNext()` answers synchronously; `next()` returns a promise that resolves
 * with the current value after `delay` milliseconds and then advances the
 * position by `step`. (The `step` argument used to be ignored: the position
 * always advanced by 1.)
 *
 * @param {number} [start=0] - first value
 * @param {number} [end=Infinity] - exclusive upper bound
 * @param {number} [step=1] - increment between values
 * @param {number} [delay=10] - timer delay in milliseconds before `next()` resolves
 * @returns {{hasNext: function(): boolean, next: function(): Promise<number>}}
 */
function makeAsyncHasNextRangeIterator (start = 0, end = Infinity, step = 1, delay = 10) {
  let position = start;
  return {
    hasNext: function () {
      return position < end;
    },
    next: function () {
      return new Promise((resolve) => {
        setTimeout(() => {
          // Read and advance when the timer fires, so hasNext() keeps
          // reporting the pending value until it has been delivered.
          const current = position;
          position += step;
          resolve(current);
        }, delay);
      });
    }
  };
}
345
/**
 * Wraps an array in a synchronous `hasNext()`/`next()` style iterator.
 * `next()` returns the element at the current position and moves one
 * position forward; past the end it returns `undefined`.
 * @param {Array} dataArray - the values to iterate over
 * @returns {{hasNext: function(): boolean, next: function(): *}}
 */
function makeArrayIterator (dataArray) {
  let cursor = 0;
  const hasNext = () => cursor < dataArray.length;
  const next = () => {
    const item = dataArray[cursor];
    cursor += 1;
    return item;
  };
  return { hasNext, next };
}
357
358export { not, identity, isEven, isGreaterThan, digitize, modulus, select, compose, concat, latest, transduceAsyncIterator, transduceAsyncHasNextIterator, transduceArray, transduceGenerator, mapping, filtering, take, skip, eventing, sampling, passthrough, split, randomFilter, neighbors, makeArrayIterator, makeAsyncRangeIterator, makeAsyncHasNextRangeIterator };
359//# sourceMappingURL=funprog.mjs.map
360
\No newline at end of file