UNPKG

3.79 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.transformMap = void 0;
4const js_lib_1 = require("@naturalcycles/js-lib");
5const through2Concurrent = require("through2-concurrent");
6const colors_1 = require("../../colors");
7const stream_util_1 = require("../stream.util");
// Subclassing doesn't work here, because we don't construct the Transform instance ourselves
// (it is created by `through2Concurrent.obj()`):
// export class TransformMap extends AbortableTransform {}
/**
 * Like pMap, but for streams.
 * Inspired by `through2`.
 * Main feature is concurrency control (implemented via `through2-concurrent`) and convenient options.
 * Using this allows native stream .pipe() to work and use backpressure.
 *
 * Only works in objectMode (due to through2Concurrent).
 *
 * Concurrency defaults to 16.
 *
 * If `flattenArrayOutput` is set and an Array is returned by `mapper` - it will be flattened and
 * multiple results will be emitted from it. Tested by Array.isArray().
 *
 * Special mapper results:
 * - `SKIP` - the item is not emitted.
 * - `END` - the item is not emitted, no further incoming chunks are processed and the pipeline
 *   is closed via `pipelineClose`.
 *
 * @param {Function} mapper Called as `mapper(chunk, index)`, may be async. `index` is a zero-based
 *   counter of chunks that were passed to the mapper.
 * @param {object} [opt]
 * @param {number} [opt.concurrency=16] Passed to through2-concurrent as `maxConcurrency`.
 * @param {Function} [opt.predicate] Optional (possibly async) filter, called as
 *   `predicate(result, index)`. Results for which it returns falsy are not emitted.
 *   Defaults to "no predicate" (pass everything).
 * @param {*} [opt.errorMode=ErrorMode.THROW_IMMEDIATELY] THROW_IMMEDIATELY emits the first mapper
 *   error right away, THROW_AGGREGATED collects errors and emits one AggregatedError in `final`,
 *   any other value only logs/counts the error.
 * @param {boolean} [opt.flattenArrayOutput] See above.
 * @param {Function} [opt.onError] Called as `onError(error, chunk)` for every mapper/predicate error.
 *   Exceptions thrown by `onError` itself are ignored.
 * @param {string} [opt.metric='stream'] Prefix used in the error-stats log line.
 * @param {object} [opt.logger=console] Needs `log` and `error` methods.
 * @returns The Transform stream created by `through2Concurrent.obj()`.
 */
function transformMap(mapper, opt = {}) {
    const {
        concurrency = 16,
        predicate, // we now default to "no predicate" (meaning pass-everything)
        errorMode = js_lib_1.ErrorMode.THROW_IMMEDIATELY,
        flattenArrayOutput,
        onError,
        metric = 'stream',
        logger = console,
    } = opt;
    let index = -1;
    // Shared across all concurrent invocations: once true, newly arriving chunks are dropped
    let isSettled = false;
    let errors = 0;
    const collectedErrors = []; // only used if errorMode == THROW_AGGREGATED
    return through2Concurrent.obj({
        maxConcurrency: concurrency,
        async final(cb) {
            logErrorStats(true);
            if (collectedErrors.length) {
                // emit Aggregated error
                cb(new js_lib_1.AggregatedError(collectedErrors));
            }
            else {
                // emit no error
                cb();
            }
        },
    }, async function transformMapFn(chunk, _, cb) {
        // Stop processing if isSettled (either THROW_IMMEDIATELY was fired or END received)
        if (isSettled)
            return cb();
        const currentIndex = ++index;
        // Per-invocation flag. The shared `isSettled` must NOT be used to decide whether to close
        // the pipeline: with concurrency > 1 other in-flight invocations finish after the stream
        // was settled (by END or by an error) and would otherwise each log "END received"
        // and call pipelineClose again.
        let endReceived = false;
        try {
            const res = await mapper(chunk, currentIndex);
            const candidates = flattenArrayOutput && Array.isArray(res) ? res : [res];
            const passedResults = await (0, js_lib_1.pFilter)(candidates, async (r) => {
                if (r === js_lib_1.END) {
                    isSettled = true; // stops processing of subsequent chunks
                    endReceived = true; // will be checked below
                    return false;
                }
                return r !== js_lib_1.SKIP && (!predicate || (await predicate(r, currentIndex)));
            });
            for (const r of passedResults) {
                this.push(r);
            }
            if (endReceived) {
                logger.log(`transformMap END received at index ${currentIndex}`);
                // NOTE(review): `sourceReadable` and `streamDone` are not set in this function,
                // presumably they are attached to the stream by the pipeline helper - confirm
                (0, stream_util_1.pipelineClose)('transformMap', this, this.sourceReadable, this.streamDone, logger);
            }
            cb(); // done processing
        }
        catch (err) {
            logger.error(err);
            errors++;
            logErrorStats();
            if (onError) {
                try {
                    onError((0, js_lib_1._anyToError)(err), chunk);
                }
                catch {
                    // Deliberately ignored: a failing error handler must not affect the stream
                }
            }
            if (errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
                isSettled = true;
                return cb(err); // Emit error immediately
            }
            if (errorMode === js_lib_1.ErrorMode.THROW_AGGREGATED) {
                collectedErrors.push(err);
            }
            // Tell input stream that we're done processing, but emit nothing to output - not error nor result
            cb();
        }
    });
    /**
     * Logs the number of errors seen so far (nothing is logged while there are none).
     * `final` only changes the wording of the log line.
     */
    function logErrorStats(final = false) {
        if (!errors)
            return;
        logger.log(`${metric} ${final ? 'final ' : ''}errors: ${(0, colors_1.yellow)(errors)}`);
    }
}
91exports.transformMap = transformMap;