UNPKG

3.92 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.transformMap = exports.notNullishPredicate = void 0;
4const js_lib_1 = require("@naturalcycles/js-lib");
5const through2Concurrent = require("through2-concurrent");
6const colors_1 = require("../../colors");
/**
 * Default output filter: accepts every value except `null` and `undefined`.
 *
 * @param {*} item - Candidate value.
 * @returns {boolean} `false` for `null`/`undefined`, `true` for anything else
 *   (including falsy values such as `0`, `''` and `false`).
 */
function notNullishPredicate(item) {
  const isNullish = item === null || item === undefined;
  return !isNullish;
}
10exports.notNullishPredicate = notNullishPredicate;
/**
 * Like pMap, but for streams.
 * Inspired by `through2`.
 * Main feature is concurrency control (implemented via `through2-concurrent`) and convenient options.
 * Using this allows native stream .pipe() to work and use backpressure.
 *
 * Runs in objectMode by default (`through2Concurrent.obj`); pass `objectMode: false`
 * to build the stream with plain `through2Concurrent` instead.
 *
 * Concurrency defaults to 16.
 *
 * If `flattenArrayOutput` is set and `mapper` returns an Array (tested by Array.isArray()),
 * every element is passed through `predicate` and emitted as a separate output.
 * Otherwise the returned value is emitted as a single output.
 *
 * @param {Function} mapper - Called as `mapper(chunk, index)`, where `index` is the
 *   zero-based INPUT index. May return a value or a Promise.
 * @param {object} [opt]
 * @param {number} [opt.concurrency=16] - Passed to through2-concurrent as `maxConcurrency`.
 * @param {Function} [opt.predicate=notNullishPredicate] - Called as `predicate(result, index)`;
 *   only results that pass it are pushed downstream.
 * @param {*} [opt.errorMode=ErrorMode.THROW_IMMEDIATELY] - THROW_IMMEDIATELY: the first error is
 *   passed to the stream callback and later chunks are skipped. THROW_AGGREGATED: errors are
 *   collected and emitted as one `AggregatedError` from `final`. Any other mode: the error is
 *   logged and the failing chunk produces no output.
 * @param {boolean} [opt.flattenArrayOutput] - See above.
 * @param {Function} [opt.onError] - Called as `onError(err, chunk)` for every failed chunk;
 *   exceptions it throws synchronously are deliberately ignored.
 * @param {Function} [opt.beforeFinal] - Awaited before the stream is finalized. If it throws or
 *   rejects, that error is passed to the `final` callback and `afterFinal` is not invoked.
 * @param {Function} [opt.afterFinal] - Invoked (not awaited) right after the `final` callback.
 * @param {string} [opt.metric='stream'] - Label used in the error-count log line.
 * @param {boolean} [opt.objectMode=true]
 * @returns {*} The stream created by through2-concurrent.
 */
function transformMap(mapper, opt = {}) {
  const {
    concurrency = 16,
    predicate = notNullishPredicate,
    errorMode = js_lib_1.ErrorMode.THROW_IMMEDIATELY,
    flattenArrayOutput,
    onError,
    beforeFinal,
    afterFinal,
    metric = 'stream',
    objectMode = true,
  } = opt;
  let index = -1;
  let isRejected = false;
  let errors = 0;
  const collectedErrors = []; // only used if errorMode == THROW_AGGREGATED

  return (objectMode ? through2Concurrent.obj : through2Concurrent)(
    {
      maxConcurrency: concurrency,
      async final(cb) {
        logErrorStats(true);
        try {
          await beforeFinal?.();
        } catch (err) {
          // Without this the callback would never fire: the stream would hang
          // and the rejection would go unhandled.
          return cb(err);
        }
        if (collectedErrors.length) {
          // Emit everything gathered in THROW_AGGREGATED mode as one error
          cb(new js_lib_1.AggregatedError(collectedErrors));
        } else {
          cb();
        }
        afterFinal?.();
      },
    },
    async function transformMapFn(chunk, _encoding, cb) {
      index++;
      // Stop processing if THROW_IMMEDIATELY mode is used and an error was already emitted
      if (isRejected && errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
        return cb();
      }
      try {
        // Captured synchronously (before any await) because concurrent invocations share `index`.
        // Refers to the INPUT index, since one input may produce multiple outputs.
        const currentIndex = index;
        const res = await mapper(chunk, currentIndex);
        const candidates = flattenArrayOutput && Array.isArray(res) ? res : [res];
        const passedResults = await (0, js_lib_1.pFilter)(
          candidates,
          async (r) => await predicate(r, currentIndex),
        );
        for (const r of passedResults) {
          this.push(r);
        }
        cb(); // done processing (possibly with 0 results)
      } catch (err) {
        console.error(err);
        errors++;
        logErrorStats();
        if (onError) {
          try {
            onError(err, chunk);
          } catch {
            // Best-effort hook: a failing onError must not mask the original error
          }
        }
        if (errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
          isRejected = true;
          // Emit error immediately
          return cb(err);
        }
        if (errorMode === js_lib_1.ErrorMode.THROW_AGGREGATED) {
          collectedErrors.push(err);
        }
        // Tell input stream that we're done processing, but emit nothing to output - not error nor result
        cb();
      }
    },
  );

  /** Logs the running (or final) error count; silent while no errors have occurred. */
  function logErrorStats(final = false) {
    if (!errors) return;
    console.log(`${metric} ${final ? 'final ' : ''}errors: ${(0, colors_1.yellow)(errors)}`);
  }
}
95exports.transformMap = transformMap;