UNPKG

3.77 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.transformMap = exports.notNullishPredicate = void 0;
4const js_lib_1 = require("@naturalcycles/js-lib");
5const through2Concurrent = require("through2-concurrent");
6const colors_1 = require("../../colors");
/**
 * Default output filter: accepts every value except `null` and `undefined`.
 * Falsy values such as `0`, `''` and `false` are kept.
 *
 * @param {*} item - candidate value
 * @returns {boolean} `false` for `null`/`undefined`, `true` otherwise
 */
function notNullishPredicate(item) {
  const isNullish = item === null || item === undefined;
  return !isNullish;
}
10exports.notNullishPredicate = notNullishPredicate;
/**
 * Like pMap, but for streams.
 * Inspired by `through2`.
 * Main feature is concurrency control (implemented via `through2-concurrent`) and convenient options.
 * Using this allows native stream .pipe() to work and use backpressure.
 *
 * Only works in objectMode (due to through2Concurrent).
 *
 * Options (all optional):
 * - `concurrency` - max number of concurrent `mapper` calls. Defaults to 16.
 * - `predicate` - (optionally async) filter applied to every mapped result; results
 *   that don't pass are not emitted. Defaults to `notNullishPredicate`.
 * - `errorMode` - defaults to `ErrorMode.THROW_IMMEDIATELY`:
 *   the first error is passed to the stream callback and later chunks are skipped.
 *   With `ErrorMode.THROW_AGGREGATED` errors are collected and emitted as one
 *   `AggregatedError` when the stream finalizes.
 *   With any other mode the error is logged, counted, and the item is dropped.
 * - `flattenArrayOutput` - if truthy and `mapper` returns an Array (tested by
 *   Array.isArray()), every element is emitted as a separate result.
 *   Otherwise an Array result is emitted as a single item.
 * - `onError(err, chunk)` - called for every mapper/predicate error (best-effort).
 * - `beforeFinal()` - (optionally async) hook awaited before the stream finalizes.
 * - `metric` - label used when logging error counts. Defaults to 'stream'.
 *
 * @param mapper - `(chunk, index) => result`, may be async. `index` is the 0-based INPUT index.
 * @param opt - see above
 * @returns an objectMode Transform stream created by `through2Concurrent.obj`
 */
function transformMap(mapper, opt = {}) {
  const {
    concurrency = 16,
    predicate = notNullishPredicate,
    errorMode = js_lib_1.ErrorMode.THROW_IMMEDIATELY,
    flattenArrayOutput,
    onError,
    beforeFinal,
    metric = 'stream',
  } = opt;
  let index = -1;
  let isRejected = false;
  let errors = 0;
  const collectedErrors = []; // only used if errorMode == THROW_AGGREGATED

  return through2Concurrent.obj(
    {
      maxConcurrency: concurrency,
      async final(cb) {
        logErrorStats(true);

        // A failing beforeFinal hook must still result in `cb` being called,
        // otherwise the stream would never finish and the rejection would be unhandled.
        let finalError;
        try {
          await beforeFinal?.();
        } catch (err) {
          finalError = err;
        }

        if (collectedErrors.length) {
          if (finalError) collectedErrors.push(finalError);
          // emit Aggregated error
          return cb(new js_lib_1.AggregatedError(collectedErrors));
        }

        // emits the beforeFinal error if there was one, otherwise no error
        cb(finalError);
      },
    },
    async function transformMapFn(chunk, _encoding, cb) {
      // Captured synchronously (before any await), so it stays stable while other
      // chunks are processed concurrently. Refers to INPUT index (one input may produce multiple outputs).
      const currentIndex = ++index;

      // Stop processing if THROW_IMMEDIATELY mode is used
      if (isRejected && errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) return cb();

      try {
        const res = await mapper(chunk, currentIndex);
        const candidates = flattenArrayOutput && Array.isArray(res) ? res : [res];
        const passedResults = await (0, js_lib_1.pFilter)(candidates, async (r) => await predicate(r, currentIndex));

        // 0..N results are pushed downstream; `cb()` only signals "done with this chunk"
        for (const r of passedResults) {
          this.push(r);
        }
        cb();
      } catch (err) {
        console.error(err);
        errors++;
        logErrorStats();

        if (onError) {
          try {
            onError(err, chunk);
          } catch {
            // Deliberately ignored: a failing error handler must not mask the original error
          }
        }

        if (errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
          isRejected = true;
          // Emit error immediately
          return cb(err);
        }

        if (errorMode === js_lib_1.ErrorMode.THROW_AGGREGATED) {
          collectedErrors.push(err);
        }

        // Tell input stream that we're done processing, but emit nothing to output - not error nor result
        cb();
      }
    },
  );

  /**
   * Logs the number of errors seen so far; silent while there are none.
   */
  function logErrorStats(final = false) {
    if (!errors) return;
    console.log(`${metric} ${final ? 'final ' : ''}errors: ${(0, colors_1.yellow)(errors)}`);
  }
}
94exports.transformMap = transformMap;