1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.transformMap = void 0;
|
4 | const js_lib_1 = require("@naturalcycles/js-lib");
|
5 | const through2Concurrent = require("through2-concurrent");
|
6 | const colors_1 = require("../../colors");
|
7 | const stream_util_1 = require("../stream.util");
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
/**
 * Builds an object-mode transform (created with `through2Concurrent.obj`) that
 * runs `mapper(chunk, index)` for every incoming chunk, with `maxConcurrency`
 * set to `opt.concurrency`.
 *
 * Handling of mapper results:
 * - When `flattenArrayOutput` is truthy and the result is an array, each element
 *   is treated as a separate result; otherwise the result is treated as one item.
 * - `SKIP` items are dropped.
 * - An `END` item is dropped and settles the transform: chunks arriving later are
 *   acknowledged without calling `mapper`, and `pipelineClose` is invoked by the
 *   chunk that received `END`.
 * - If `predicate` is given, only items for which `predicate(item, index)`
 *   resolves truthy are pushed downstream.
 *
 * Handling of errors thrown by `mapper` / `predicate`:
 * - The error is always logged with `logger.error`, counted, and passed to
 *   `onError` (if provided) together with the offending chunk.
 * - `ErrorMode.THROW_IMMEDIATELY` (default): the transform is settled and the
 *   error is passed to the chunk callback.
 * - `ErrorMode.THROW_AGGREGATED`: the error is collected and reported at `final`
 *   time as a single `AggregatedError`.
 * - Any other mode: the chunk is acknowledged with no output and no error.
 *
 * @param {Function} mapper - `(chunk, index) => result`; may be async.
 * @param {object} [opt]
 * @param {number} [opt.concurrency=16] - passed to through2-concurrent as `maxConcurrency`.
 * @param {Function} [opt.predicate] - `(item, index) => boolean`; may be async.
 * @param {*} [opt.errorMode=ErrorMode.THROW_IMMEDIATELY]
 * @param {boolean} [opt.flattenArrayOutput]
 * @param {Function} [opt.onError] - `(error, chunk) => void`; exceptions it throws are ignored.
 * @param {string} [opt.metric='stream'] - prefix used in the error-count log line.
 * @param {object} [opt.logger=console] - must provide `log` and `error`.
 * @returns {*} whatever `through2Concurrent.obj` returns (the transform stream).
 */
function transformMap(mapper, opt = {}) {
    const {
        concurrency = 16,
        predicate,
        errorMode = js_lib_1.ErrorMode.THROW_IMMEDIATELY,
        flattenArrayOutput,
        onError,
        metric = 'stream',
        logger = console,
    } = opt;
    let index = -1;
    // Shared across all in-flight chunks: set once END is received or a
    // THROW_IMMEDIATELY error has been emitted.
    let isSettled = false;
    let errors = 0;
    // Only populated when errorMode is THROW_AGGREGATED.
    const collectedErrors = [];
    return through2Concurrent.obj({
        maxConcurrency: concurrency,
        async final(cb) {
            logErrorStats(true);
            if (collectedErrors.length) {
                // Report everything that was collected as one aggregated error.
                cb(new js_lib_1.AggregatedError(collectedErrors));
            }
            else {
                cb();
            }
        },
    }, async function transformMapFn(chunk, _, cb) {
        // Already settled (END received or fatal error emitted): acknowledge and do nothing.
        if (isSettled)
            return cb();
        const currentIndex = ++index;
        try {
            const res = await mapper(chunk, currentIndex);
            // Another concurrently processed chunk may have settled the transform
            // while this mapper call was in flight. In that case emit nothing:
            // the pipeline is already being closed or has already errored.
            if (isSettled)
                return cb();
            // Per-chunk flag. The shared `isSettled` must not be used to decide
            // whether THIS chunk received END, otherwise unrelated in-flight chunks
            // would log a false "END received" and call pipelineClose again.
            let endReceived = false;
            const candidates = flattenArrayOutput && Array.isArray(res) ? res : [res];
            const passedResults = await (0, js_lib_1.pFilter)(candidates, async (r) => {
                if (r === js_lib_1.END) {
                    endReceived = true;
                    isSettled = true; // stops chunks that have not started yet
                    return false;
                }
                return r !== js_lib_1.SKIP && (!predicate || (await predicate(r, currentIndex)));
            });
            // Items that accompanied END in the same (flattened) result are still emitted.
            passedResults.forEach(r => this.push(r));
            if (endReceived) {
                logger.log(`transformMap END received at index ${currentIndex}`);
                (0, stream_util_1.pipelineClose)('transformMap', this, this.sourceReadable, this.streamDone, logger);
            }
            cb();
        }
        catch (err) {
            logger.error(err);
            errors++;
            logErrorStats();
            if (onError) {
                try {
                    onError((0, js_lib_1._anyToError)(err), chunk);
                }
                catch {
                    // Deliberately best-effort: a failing error handler must not
                    // mask the original error or break stream processing.
                }
            }
            if (errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
                isSettled = true;
                return cb(err);
            }
            if (errorMode === js_lib_1.ErrorMode.THROW_AGGREGATED) {
                collectedErrors.push(err);
            }
            // Acknowledge the chunk without emitting either a result or an error.
            cb();
        }
    });
    /** Logs the running (or final) error count; silent while no errors have occurred. */
    function logErrorStats(final = false) {
        if (!errors)
            return;
        logger.log(`${metric} ${final ? 'final ' : ''}errors: ${(0, colors_1.yellow)(errors)}`);
    }
}
|
91 | exports.transformMap = transformMap;
|