UNPKG

10 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, '__esModule', { value: true });
4
5const not = async (x) => {
6 var operand = await x;
7 return !operand
8 };
9
10const identity = x => x;
11
12// predicates
13const isEven = x => x % 2 === 0;
14const isGreaterThan = x => y => y > x ? y : 0;
15const digitize = x => x ? 1 : 0;
16
17const modulus = val => x => x % val;
18
19const select = property => x => x[property];
20
21function apply (x, f) {
22 return f(x)
23}
24
25function compose (...funcs) {
26 return x => funcs.reduceRight(apply, x)
27}
28
29function concat (accumlator, val) {
30 return accumlator.concat(val)
31}
32
33function latest (accumlator, val) {
34 return val
35}
36
37async function transduceAsyncIterator (transform, reducerfunction, init, asynciterator) {
38 var reducer = transform(reducerfunction);
39 var n = null;
40 do {
41 n = await asynciterator.next();
42 if (!n.done) {
43 var v = n.value;
44 init = await reducer(init, v);
45 }
46 } while (!n.done)
47 return init
48}
49
50async function transduceAsyncHasNextIterator (transform, reducerfunction, init, asynchasnextiterator) {
51 var reducer = transform(reducerfunction);
52 var n = null;
53 var r = null;
54 do {
55 n = await asynchasnextiterator.hasNext();
56 if (n) {
57 var v = await asynchasnextiterator.next();
58 r = await reducer(init, v);
59 if (r.hasOwnProperty('reduced')) {
60 if (r.reduced) {
61 init = r.reduced;
62 }
63 } else if (r) {
64 init = r;
65 }
66 }
67 } while (r && n)
68 return init
69}
70
71async function transduceArray (xf, rf, init, xs) {
72 // call reduce on the data structure internally (abstract it away)
73 // pass the rf to the composed transformation
74 // pass in the initial value
75 var xrf = await xf(rf);
76 return xs.reduce(xrf, init)
77}
78
79async function * transduceGenerator (transform, reducerfunction, init, streamgenerator) {
80 var reducer = transform(reducerfunction);
81 for await (const value of streamgenerator) {
82 var newinit = await reducer(init, value);
83 // Here we checked if there is new a 'reduced' value and only generate a new value when this is the case
84 if (!newinit) ; else if (newinit === init) ; else {
85 if (newinit.hasOwnProperty('reduced')) {
86 if (Array.isArray(newinit.reduced)) {
87 for (var i = 0; i < newinit.reduced.length; i++) {
88 init = newinit.reduced[i];
89 yield newinit.reduced[i];
90 }
91 }
92 } else {
93 init = newinit;
94 yield newinit;
95 }
96 }
97 }
98}
99
100/**
101 * Passes all events
102 */
103function passthrough () {
104 return function (rf) {
105 // this takes 2 things and makes them 1
106 return async (acc, val) => {
107 return rf(acc, val) // <-- rf replaces 'concat'
108 }
109 }
110}
111
112/**
113 * Performs a mapping operation on each record in the stream
114 * @param {*} f
115 */function mapping (f) {
116 return function (rf) {
117 // this takes 2 things and makes them 1
118 return async (acc, val) => {
119 var m = await f(val);
120 var r = await rf(acc, m); // <-- rf replaces 'concat'
121 if (r.hasOwnProperty('reduced')) {
122 if (r.reduced) {
123 return r
124 } else {
125 return acc
126 }
127 } else {
128 return r
129 }
130 }
131 }
132 }
133
134 /**
135 * Removes records from the stream if the dont match the predicate
136 * @param {*} p
137 */
138 function filtering (p) {
139 return function (rf) {
140 // this takes 2 things and makes them 1
141 return async (acc, val) => {
142 var pred = (await p(val));
143 return pred ? rf(acc, val) : { reduced: null } // <-- rf replaces 'concat'
144 }
145 }
146 }
147
148 /**
149 * Takes toTake records from the stream
150 * @param {*} toTake
151 */
152function take (toTake) {
153 // assert(toTake && cnt > 0)
154 var count = toTake;
155 return function (rf) {
156 // this takes 2 things and makes them 1
157 return async (acc, val) => {
158 if ((--count) < 0) {
159 return {
160 reduced: null
161 }
162 } else {
163 return rf(acc, val)
164 }
165 }
166 }
167}
168
169/**
170 * Skips forward in a stream by the toSkip records
171 * @param {*} toSkip
172 */
173function skip (toSkip) {
174 // assertMod(toSkip && toSkip >= 0)
175 var count = 0;
176 return function (rf) {
177 // this takes 2 things and makes them 1
178 return async (acc, val) => {
179 if (count++ < toSkip) {
180 return { reduced: null }
181 } else {
182 return rf(acc, val)
183 }
184 }
185 }
186}
187
188/**
189 * Samples from a stream at a particular frequency.
190 * sample(1000) will sample a value once every second
191 * @param {*} period
192 */
193function sampling (period) {
194 // assertMod(cnt && cnt >= 0)
195 var last = 0;
196 return function (rf) {
197 return async (acc, val) => {
198 var nw = Date.now();
199 var diff = nw - last;
200 // console.log(diff)
201 if (diff < period) {
202 // console.log('Skip' + val)
203 return { reduced: null }
204 } else {
205 last = nw;
206 // console.log('Accept' + val)
207 return rf(acc, val)
208 }
209 }
210 }
211}
212
213/**
214 * Transforms a series of measurements into events. The predicate determines when the measurements continue the event or non-event
215 * @param {*} predicate
216 */
217function eventing (predicate) {
218 var latest = null;
219 var sequence = -1;
220 return function (rf) {
221 return async (acc, val) => {
222 sequence++;
223 var pred = (await predicate(val));
224 if (pred && !latest) {
225 latest = {
226 start: val.time ? val.time : sequence,
227 end: val.time ? val.time : sequence
228 };
229 return { reduced: null }
230 } else if (pred && latest) {
231 latest.end = val.time ? val.time : sequence;
232 return { reduced: null }
233 } else if (!pred && latest) {
234 var next = latest;
235 latest = null;
236 return rf(acc, next)
237 } else {
238 return { reduced: null }
239 }
240 }
241 }
242}
243
244/**
245 * Split one event into multiple
246 * splitter is a function which maps a value to an array
247 */
248function split (splitter) {
249 return function (rf) {
250 // this takes 2 things and makes them 1
251 return async (acc, val) => {
252 var rs = await splitter(val);
253 var reduction = { reduced: [] };
254 try {
255 var acc2 = acc;
256 for (var i = 0; i < rs.length; i++) {
257 var r = rs[i];
258 acc2 = await rf(acc2, r);
259 if (acc2.hasOwnProperty('reduced')) {
260 if (acc2.reduced) {
261 reduction.reduced.push(acc2.reduced);
262 }
263 } else {
264 reduction.reduced.push(acc2);
265 }
266 }
267 return reduction
268 } catch (ex) {
269 console.log(ex);
270 }
271 }
272 }
273}
274
275/**
276 * Randomly samples from a stream with a target frequency.
277 * randomFilter(100) will target to sample one value in every 100
278 * @param {*} countFrequency - a number
279 */
280function randomFilter (countFrequency) {
281 // assertMod(cnt && cnt >= 0
282 return function (rf) {
283 return async (acc, val) => {
284 var rand = Math.floor((Math.random() * countFrequency));
285 if (rand === 0) {
286 return rf(acc, val)
287 } else {
288 return { reduced: null }
289 }
290 }
291 }
292}
293
294 /**
295 * Make n neighbors available as each stream record is processed.
296 * @param {*} numberOfNeighbors
297 */
298 function neighbors (numNeighbors = 10) {
299 var neighbors = [];
300
301 return function (rf) {
302 return async (acc, val) => {
303 neighbors.push(val);
304 if ((neighbors.length) < numNeighbors) {
305 return {
306 reduced: null
307 }
308 } else {
309 let first = neighbors[0];
310 let enriched = Object.assign({}, first);
311 enriched.neighbors = neighbors;
312 enriched.data = first;
313 let res = rf(acc, enriched);
314 neighbors.shift();
315 return res
316 }
317 }
318 }
319 }
320
321async function * makeAsyncRangeIterator (start = 0, end = Infinity, step = 1, delay = 10) {
322 let iterationCount = 0;
323 for (let i = start; i < end; i += step) {
324 iterationCount++;
325 yield new Promise((resolve, reject) => {
326 setTimeout(() => {
327 resolve(i);
328 }, delay);
329 });
330 }
331 return iterationCount
332 }
333
334function makeAsyncHasNextRangeIterator (start = 0, end = Infinity, step = 1) {
335 let iterationCount = start;
336 return {
337 hasNext: function () {
338 return iterationCount < end
339 },
340 next: function () {
341 return new Promise((resolve, reject) => {
342 setTimeout(() => {
343 resolve(iterationCount++);
344 }, 10);
345 })
346 }
347 }
348}
349
350function makeArrayIterator (dataArray) {
351 let iterationCount = 0;
352 return {
353 hasNext: function () {
354 return iterationCount < dataArray.length
355 },
356 next: function () {
357 return dataArray[iterationCount++]
358 }
359 }
360 }
361
362exports.not = not;
363exports.identity = identity;
364exports.isEven = isEven;
365exports.isGreaterThan = isGreaterThan;
366exports.digitize = digitize;
367exports.modulus = modulus;
368exports.select = select;
369exports.compose = compose;
370exports.concat = concat;
371exports.latest = latest;
372exports.transduceAsyncIterator = transduceAsyncIterator;
373exports.transduceAsyncHasNextIterator = transduceAsyncHasNextIterator;
374exports.transduceArray = transduceArray;
375exports.transduceGenerator = transduceGenerator;
376exports.mapping = mapping;
377exports.filtering = filtering;
378exports.take = take;
379exports.skip = skip;
380exports.eventing = eventing;
381exports.sampling = sampling;
382exports.passthrough = passthrough;
383exports.split = split;
384exports.randomFilter = randomFilter;
385exports.neighbors = neighbors;
386exports.makeArrayIterator = makeArrayIterator;
387exports.makeAsyncRangeIterator = makeAsyncRangeIterator;
388exports.makeAsyncHasNextRangeIterator = makeAsyncHasNextRangeIterator;
389//# sourceMappingURL=funprog.js.map
390
\No newline at end of file