UNPKG

11.8 kBJavaScriptView Raw
1(function (global, factory) {
2 typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports) :
3 typeof define === 'function' && define.amd ? define(['exports'], factory) :
4 (global = global || self, factory(global.funprog = {}));
5}(this, function (exports) { 'use strict';
6
/**
 * Asynchronous logical negation.
 * @param {*} x - a value, or a promise/thenable of a value
 * @returns {Promise<boolean>} resolves to `true` when the awaited value is falsy
 */
const not = async (x) => !(await x);
11
/**
 * Identity function: hands back exactly what it was given.
 * @param {*} value
 * @returns {*} the same `value`
 */
const identity = (value) => {
  return value;
};
13
// predicates

/**
 * Tests whether the remainder of `x` divided by 2 is exactly zero.
 * @param {number} x
 * @returns {boolean}
 */
const isEven = (x) => {
  const remainder = x % 2;
  return remainder === 0;
};
/**
 * Builds a threshold function for the limit `x`.
 *
 * The returned function gives back its argument `y` when `y > x`, and `0`
 * otherwise. Note that the result is the value itself (or 0), not a boolean.
 * @param {number} x - the threshold
 * @returns {function(number): number}
 */
const isGreaterThan = (x) => (y) => {
  if (y > x) {
    return y;
  }
  return 0;
};
/**
 * Collapses any value to a binary digit.
 * @param {*} x
 * @returns {number} `1` when `x` is truthy, `0` otherwise
 */
const digitize = (x) => {
  if (x) {
    return 1;
  }
  return 0;
};
18
/**
 * Builds a function computing the remainder of its argument divided by `val`.
 * @param {number} val - the divisor
 * @returns {function(number): number}
 */
const modulus = (val) => {
  return (x) => x % val;
};
20
/**
 * Builds a property accessor.
 * @param {string|number|symbol} property - the key to read
 * @returns {function(*): *} function returning `x[property]` for its argument `x`
 */
const select = (property) => {
  return (x) => x[property];
};
22
/**
 * Feeds a value into a function.
 * @param {*} x - the value
 * @param {function(*): *} f - the function to call with `x`
 * @returns {*} `f(x)`
 */
function apply (x, f) {
  const result = f(x);
  return result;
}

/**
 * Right-to-left function composition: `compose(f, g)(x)` evaluates `f(g(x))`.
 * With no functions the composed function returns its input unchanged.
 * @param {...function(*): *} funcs
 * @returns {function(*): *}
 */
function compose (...funcs) {
  return (x) => {
    let value = x;
    // Walk the list from the last function to the first.
    for (let i = funcs.length - 1; i >= 0; i--) {
      value = apply(value, funcs[i]);
    }
    return value;
  };
}
30
/**
 * Reducer that appends `val` by calling the accumulator's own `concat` method.
 * @param {{concat: function(*): *}} accumulator - e.g. an array or a string
 * @param {*} val - the value to append
 * @returns {*} whatever `accumulator.concat(val)` returns
 */
function concat (accumulator, val) {
  const joined = accumulator.concat(val);
  return joined;
}
34
/**
 * Reducer that discards the accumulator and keeps only the newest value.
 * @param {*} previous - ignored
 * @param {*} val - the newest value
 * @returns {*} `val`
 */
function latest (previous, val) {
  return val;
}
38
/**
 * Reduces every value produced by an async iterator through a transformed
 * reducer and resolves to the final accumulator.
 *
 * Step results of the form `{ reduced: x }` are unwrapped the same way
 * transduceAsyncHasNextIterator unwraps them: a falsy `x` (the
 * `{ reduced: null }` "skip" marker) leaves the accumulator untouched, a truthy
 * `x` becomes the new accumulator. Any other result becomes the accumulator
 * as-is. Previously the wrapper object itself was stored as the accumulator.
 *
 * @param {function(function): function} transform - maps a reducer to a reducer
 * @param {function(*, *): *} reducerfunction - the base reducer
 * @param {*} init - the initial accumulator
 * @param {{next: function(): *}} asynciterator - `next()` yields `{ done, value }`
 * @returns {Promise<*>} the final accumulator
 */
async function transduceAsyncIterator (transform, reducerfunction, init, asynciterator) {
  const reducer = transform(reducerfunction);
  let accumulator = init;
  let step = await asynciterator.next();
  while (!step.done) {
    const result = await reducer(accumulator, step.value);
    // Guard against null/undefined before probing for the wrapper property.
    const isWrapper = result != null &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapper) {
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
    step = await asynciterator.next();
  }
  return accumulator;
}
51
/**
 * Reduces every value of a `hasNext()` / `next()` style iterator through a
 * transformed reducer and resolves to the final accumulator.
 *
 * Step results of the form `{ reduced: x }` are unwrapped: a falsy `x` (the
 * `{ reduced: null }` "skip" marker) leaves the accumulator untouched, a truthy
 * `x` becomes the new accumulator. Any other result becomes the accumulator.
 *
 * Iteration now continues until `hasNext()` reports false. Previously a falsy
 * step result (for example `0` from a sum, or from `latest` over a range that
 * starts at 0) silently ended the loop, and a null/undefined result threw.
 *
 * @param {function(function): function} transform - maps a reducer to a reducer
 * @param {function(*, *): *} reducerfunction - the base reducer
 * @param {*} init - the initial accumulator
 * @param {{hasNext: function(): *, next: function(): *}} asynchasnextiterator -
 *   both methods may return plain values or promises
 * @returns {Promise<*>} the final accumulator
 */
async function transduceAsyncHasNextIterator (transform, reducerfunction, init, asynchasnextiterator) {
  const reducer = transform(reducerfunction);
  let accumulator = init;
  while (await asynchasnextiterator.hasNext()) {
    const value = await asynchasnextiterator.next();
    const result = await reducer(accumulator, value);
    // Guard against null/undefined before probing for the wrapper property.
    const isWrapper = result != null &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapper) {
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
  }
  return accumulator;
}
72
/**
 * Reduces an array through a transformed reducer and resolves to the final
 * accumulator.
 *
 * Each step is awaited before the next one starts, so asynchronous step
 * functions receive the resolved accumulator. (Handing an async step function
 * to `Array.prototype.reduce` passes a pending Promise as the accumulator from
 * the second element on.)
 *
 * Step results of the form `{ reduced: x }` are unwrapped: a falsy `x` (the
 * `{ reduced: null }` "skip" marker) leaves the accumulator untouched, a truthy
 * `x` becomes the new accumulator. Any other result becomes the accumulator.
 *
 * @param {function(function): function} xf - the (composed) transformation;
 *   its result is awaited
 * @param {function(*, *): *} rf - the base reducer handed to `xf`
 * @param {*} init - the initial accumulator
 * @param {Array} xs - the values to reduce
 * @returns {Promise<*>} the final accumulator
 */
async function transduceArray (xf, rf, init, xs) {
  const xrf = await xf(rf);
  let accumulator = init;
  for (let i = 0; i < xs.length; i++) {
    // Index and array are passed along as `reduce` would pass them.
    const result = await xrf(accumulator, xs[i], i, xs);
    // Guard against null/undefined before probing for the wrapper property.
    const isWrapper = result != null &&
      Object.prototype.hasOwnProperty.call(result, 'reduced');
    if (!isWrapper) {
      accumulator = result;
    } else if (result.reduced) {
      accumulator = result.reduced;
    }
  }
  return accumulator;
}
80
/**
 * Async generator that runs every value of an async-iterable source through a
 * transformed reducer and emits the accumulator each time it changes.
 *
 * For each step result:
 * - falsy, or identical to the current accumulator: nothing is emitted;
 * - has an own `reduced` property holding an array: every element is emitted
 *   in order and becomes the accumulator in turn;
 * - has an own `reduced` property holding anything else: nothing is emitted;
 * - otherwise the result becomes the accumulator and is emitted.
 *
 * @param {function(function): function} transform - maps a reducer to a reducer
 * @param {function(*, *): *} reducerfunction - the base reducer
 * @param {*} init - the initial accumulator
 * @param {AsyncIterable|Iterable} streamgenerator - the value source
 * @yields {*} each new accumulator value
 */
async function * transduceGenerator (transform, reducerfunction, init, streamgenerator) {
  const step = transform(reducerfunction);
  let state = init;
  for await (const item of streamgenerator) {
    const outcome = await step(state, item);
    // Nothing new to report for this item.
    if (!outcome || outcome === state) {
      continue;
    }
    if (!outcome.hasOwnProperty('reduced')) {
      state = outcome;
      yield outcome;
      continue;
    }
    if (Array.isArray(outcome.reduced)) {
      for (let k = 0; k < outcome.reduced.length; k++) {
        state = outcome.reduced[k];
        yield outcome.reduced[k];
      }
    }
  }
}
101
/**
 * Transducer that lets every record through untouched: the step function it
 * builds simply forwards `(acc, val)` to the wrapped reducer.
 * @returns {function(function): function} transducer whose step function is async
 */
function passthrough () {
  return (rf) => async (acc, val) => rf(acc, val);
}
113
/**
 * Transducer that applies `f` to each record before handing it to the wrapped
 * reducer.
 *
 * The step function resolves to:
 * - the downstream result when it is not a `{ reduced: … }` wrapper (this now
 *   includes `null`/`undefined`, which previously raised a TypeError);
 * - the wrapper itself when its `reduced` value is truthy;
 * - the unchanged accumulator when the wrapper's `reduced` value is falsy
 *   (the `{ reduced: null }` "skip" marker).
 *
 * @param {function(*): *} f - mapping function; may return a promise
 * @returns {function(function): function} transducer whose step function is async
 */
function mapping (f) {
  return function (rf) {
    return async (acc, val) => {
      const mapped = await f(val);
      const result = await rf(acc, mapped);
      // Guard against null/undefined before probing for the wrapper property.
      const isWrapper = result != null &&
        Object.prototype.hasOwnProperty.call(result, 'reduced');
      if (!isWrapper) {
        return result;
      }
      return result.reduced ? result : acc;
    };
  };
}
135
/**
 * Transducer that drops records failing a predicate.
 *
 * Records for which the (awaited) predicate is truthy are forwarded to the
 * wrapped reducer; for all others the step resolves to `{ reduced: null }`.
 *
 * @param {function(*): *} p - predicate; may return a promise
 * @returns {function(function): function} transducer whose step function is async
 */
function filtering (p) {
  return (rf) => async (acc, val) => {
    if (await p(val)) {
      return rf(acc, val);
    }
    return { reduced: null };
  };
}
149
/**
 * Transducer that forwards only the first `toTake` records.
 *
 * The countdown lives in the closure created by `take(...)`, so it is shared
 * by every reducer built from the same transducer instance. Once it is used
 * up, each further step resolves to `{ reduced: null }`.
 *
 * @param {number} toTake - how many records to let through
 * @returns {function(function): function} transducer whose step function is async
 */
function take (toTake) {
  let remaining = toTake;
  return (rf) => async (acc, val) => {
    // Decrement first, then test: the call that drives it below zero is dropped.
    remaining--;
    const exhausted = remaining < 0;
    return exhausted ? { reduced: null } : rf(acc, val);
  };
}
170
/**
 * Transducer that drops the first `toSkip` records and forwards the rest.
 *
 * The counter lives in the closure created by `skip(...)`, so it is shared by
 * every reducer built from the same transducer instance. Dropped records
 * resolve to `{ reduced: null }`.
 *
 * @param {number} toSkip - how many leading records to drop
 * @returns {function(function): function} transducer whose step function is async
 */
function skip (toSkip) {
  let seen = 0;
  return (rf) => async (acc, val) => {
    const position = seen;
    seen += 1;
    return position < toSkip ? { reduced: null } : rf(acc, val);
  };
}
189
/**
 * Transducer that samples a stream by wall-clock time.
 *
 * A record is forwarded only when at least `period` milliseconds (measured
 * with `Date.now()`) have passed since the previously forwarded record; all
 * other records resolve to `{ reduced: null }`. The reference timestamp starts
 * at 0 and lives in the closure created by `sampling(...)`.
 * For example, `sampling(1000)` forwards at most one record per second.
 *
 * @param {number} period - minimum gap between forwarded records, in ms
 * @returns {function(function): function} transducer whose step function is async
 */
function sampling (period) {
  let lastAccepted = 0;
  return (rf) => async (acc, val) => {
    const now = Date.now();
    const elapsed = now - lastAccepted;
    if (elapsed < period) {
      return { reduced: null };
    }
    lastAccepted = now;
    return rf(acc, val);
  };
}
214
/**
 * Transducer that turns a series of measurements into events.
 *
 * While the (awaited) predicate holds, an event `{ start, end }` is opened or
 * extended and the step resolves to `{ reduced: null }`. On the first
 * measurement for which the predicate no longer holds, the open event is
 * handed to the wrapped reducer. Measurements outside an event resolve to
 * `{ reduced: null }`.
 *
 * `start`/`end` use the measurement's `time` property when it is truthy and
 * otherwise the zero-based position of the measurement in the stream.
 *
 * @param {function(*): *} predicate - decides whether a measurement belongs to
 *   an event; may return a promise
 * @returns {function(function): function} transducer whose step function is async
 */
function eventing (predicate) {
  let openEvent = null;
  let sequence = -1;
  return (rf) => async (acc, val) => {
    sequence++;
    const matches = await predicate(val);

    if (matches) {
      const stamp = val.time ? val.time : sequence;
      if (openEvent) {
        openEvent.end = stamp;
      } else {
        openEvent = { start: stamp, end: stamp };
      }
      return { reduced: null };
    }

    if (!openEvent) {
      return { reduced: null };
    }

    // The predicate stopped holding: close the event and emit it.
    const finished = openEvent;
    openEvent = null;
    return rf(acc, finished);
  };
}
245
/**
 * Transducer that splits one record into several.
 *
 * `splitter` maps a record to an array; every element is fed to the wrapped
 * reducer in order, each call receiving the accumulator produced by the
 * previous one. The step resolves to `{ reduced: [...] }` holding the
 * accumulator after each element that was actually reduced.
 *
 * A downstream `{ reduced: x }` result is unwrapped before being threaded on:
 * a falsy `x` (the `{ reduced: null }` "skip" marker) leaves the running
 * accumulator untouched and contributes nothing, a truthy `x` becomes the
 * running accumulator. Previously the wrapper object itself was passed to the
 * next reducer call as its accumulator.
 *
 * Errors raised while reducing are logged with `console.log` and the step then
 * resolves to `undefined` (existing best-effort behaviour, kept as-is).
 *
 * @param {function(*): (Array|Promise<Array>)} splitter
 * @returns {function(function): function} transducer whose step function is async
 */
function split (splitter) {
  return function (rf) {
    return async (acc, val) => {
      const parts = await splitter(val);
      const reduction = { reduced: [] };
      try {
        let current = acc;
        for (let i = 0; i < parts.length; i++) {
          const result = await rf(current, parts[i]);
          // Guard against null/undefined before probing for the wrapper property.
          const isWrapper = result != null &&
            Object.prototype.hasOwnProperty.call(result, 'reduced');
          if (!isWrapper) {
            current = result;
            reduction.reduced.push(result);
          } else if (result.reduced) {
            current = result.reduced;
            reduction.reduced.push(result.reduced);
          }
        }
        return reduction;
      } catch (ex) {
        console.log(ex);
        return undefined;
      }
    };
  };
}
276
/**
 * Transducer that randomly samples a stream.
 *
 * For every record an integer is drawn as
 * `Math.floor(Math.random() * countFrequency)`; the record is forwarded only
 * when the draw is 0, otherwise the step resolves to `{ reduced: null }`.
 * `randomFilter(100)` therefore targets roughly one record in every 100.
 *
 * @param {number} countFrequency - target number of records per forwarded one
 * @returns {function(function): function} transducer whose step function is async
 */
function randomFilter (countFrequency) {
  return (rf) => async (acc, val) => {
    const draw = Math.floor(Math.random() * countFrequency);
    return draw === 0 ? rf(acc, val) : { reduced: null };
  };
}
295
/**
 * Transducer that makes a sliding window of records available as each record
 * is processed.
 *
 * Records are buffered until `numNeighbors` of them are available; until then
 * the step resolves to `{ reduced: null }`. From then on, for every incoming
 * record the oldest buffered record is forwarded to the wrapped reducer as a
 * shallow copy extended with:
 * - `neighbors`: a snapshot of the current window (oldest first);
 * - `data`: the original oldest record.
 * The oldest record is then removed from the window.
 *
 * The snapshot is a copy: previously the live window array itself was
 * attached, so records that had already been emitted saw their `neighbors`
 * change as the window kept sliding.
 *
 * @param {number} [numNeighbors=10] - window size
 * @returns {function(function): function} transducer whose step function is async
 */
function neighbors (numNeighbors = 10) {
  const buffer = [];

  return function (rf) {
    return async (acc, val) => {
      buffer.push(val);
      if (buffer.length < numNeighbors) {
        return { reduced: null };
      }
      const first = buffer[0];
      const enriched = Object.assign({}, first);
      // Copy the window so later push/shift calls cannot alter emitted records.
      enriched.neighbors = buffer.slice();
      enriched.data = first;
      const res = rf(acc, enriched);
      buffer.shift();
      return res;
    };
  };
}
322
/**
 * Async generator over the arithmetic range `start, start + step, …` while the
 * value stays below `end`. Each value is delivered through a promise that
 * resolves after `delay` milliseconds (via `setTimeout`).
 *
 * @param {number} [start=0] - first value
 * @param {number} [end=Infinity] - exclusive upper bound
 * @param {number} [step=1] - increment between values
 * @param {number} [delay=10] - delay before each value, in ms
 * @yields {number} the next value of the range
 * @returns {number} the number of values produced (the generator's return value)
 */
async function * makeAsyncRangeIterator (start = 0, end = Infinity, step = 1, delay = 10) {
  let produced = 0;
  let current = start;
  while (current < end) {
    produced += 1;
    const value = current;
    yield new Promise((resolve) => {
      setTimeout(() => resolve(value), delay);
    });
    current += step;
  }
  return produced;
}
335
/**
 * Builds a `hasNext()` / `next()` style iterator over the arithmetic range
 * `start, start + step, …` while the value stays below `end`.
 *
 * `hasNext()` answers synchronously. `next()` returns a promise that resolves,
 * after `delay` milliseconds, to the current value and then advances the range
 * by `step`. (Previously `step` was accepted but ignored and the range always
 * advanced by 1; with the default `step` the behaviour is unchanged.)
 *
 * @param {number} [start=0] - first value
 * @param {number} [end=Infinity] - exclusive upper bound
 * @param {number} [step=1] - increment between values
 * @param {number} [delay=10] - delay before each value is delivered, in ms
 * @returns {{hasNext: function(): boolean, next: function(): Promise<number>}}
 */
function makeAsyncHasNextRangeIterator (start = 0, end = Infinity, step = 1, delay = 10) {
  let current = start;
  return {
    hasNext: function () {
      return current < end;
    },
    next: function () {
      return new Promise((resolve) => {
        setTimeout(() => {
          // Read and advance inside the timer, as the value is fixed on delivery.
          const value = current;
          current += step;
          resolve(value);
        }, delay);
      });
    }
  };
}
351
/**
 * Wraps an array in a synchronous `hasNext()` / `next()` style iterator.
 *
 * `hasNext()` reports whether the cursor is still inside the array (the length
 * is read on every call); `next()` returns the element under the cursor and
 * advances it.
 *
 * @param {Array} dataArray - the values to iterate
 * @returns {{hasNext: function(): boolean, next: function(): *}}
 */
function makeArrayIterator (dataArray) {
  let cursor = 0;
  const hasNext = () => cursor < dataArray.length;
  const next = () => {
    const item = dataArray[cursor];
    cursor += 1;
    return item;
  };
  return { hasNext, next };
}
363
// Public API: attach every exported helper, reducer, transducer and iterator
// factory to the module's exports object.
Object.assign(exports, {
  // helpers and predicates
  not,
  identity,
  isEven,
  isGreaterThan,
  digitize,
  modulus,
  select,
  compose,
  // reducers
  concat,
  latest,
  // transduce drivers
  transduceAsyncIterator,
  transduceAsyncHasNextIterator,
  transduceArray,
  transduceGenerator,
  // transducers
  mapping,
  filtering,
  take,
  skip,
  eventing,
  sampling,
  passthrough,
  split,
  randomFilter,
  neighbors,
  // iterator factories
  makeArrayIterator,
  makeAsyncRangeIterator,
  makeAsyncHasNextRangeIterator
});

// Flag the exports object with a (non-enumerable) `__esModule` property.
Object.defineProperty(exports, '__esModule', { value: true });
393
394}));
395//# sourceMappingURL=funprog.umd.js.map
396
\No newline at end of file