// Retrieved via UNPKG (3.4 kB, JavaScript).
"use strict";
// CommonJS module wiring: mark the module as transpiled ESM, pre-declare the
// exported names, then load the modules the code below relies on.
Object.defineProperty(exports, "__esModule", { value: true });
exports.transformMapSync = exports.TransformMapSync = void 0;
const js_lib_1 = require("@naturalcycles/js-lib");
const colors_1 = require("../../colors");
const pipeline_1 = require("../pipeline/pipeline");
const stream_util_1 = require("../stream.util");
/**
 * Stream class instantiated by `transformMapSync`.
 *
 * An empty subclass of `AbortableTransform`: it declares no members of its own,
 * so all of its behavior comes from the base class and from the options passed
 * to its constructor.
 */
class TransformMapSync extends pipeline_1.AbortableTransform {
}
exports.TransformMapSync = TransformMapSync;
/**
 * Sync (not async) version of transformMap.
 * Supposedly faster, for cases when async is not needed.
 *
 * Returns a `TransformMapSync` stream that calls `mapper(chunk, index)` for every
 * incoming chunk (index starts at 0 and grows by one per processed chunk) and
 * pushes the result downstream.
 *
 * Special mapper results:
 * - `SKIP`: the value is dropped.
 * - `END`: `pipelineClose` is invoked and every later chunk is discarded without
 *   calling the mapper. With `flattenArrayOutput`, values that follow `END` in
 *   the same returned array are still pushed.
 *
 * @param mapper Synchronous function `(chunk, index) => result`.
 * @param opt Options. The whole object is also spread into the options passed to
 *   the `TransformMapSync` constructor.
 * @param opt.predicate Optional `(result, index) => boolean`; results for which it
 *   returns a falsy value are dropped. Defaults to passing everything.
 * @param opt.errorMode `THROW_IMMEDIATELY` (default) reports the first error through
 *   the transform callback and ignores all later chunks; `THROW_AGGREGATED` collects
 *   errors and reports them as one `AggregatedError` from `final`; with any other
 *   value errors are only logged and counted.
 * @param opt.flattenArrayOutput When true and the mapper returns an array, each
 *   element is pushed separately. Default: false.
 * @param opt.onError Optional `(err, chunk)` hook called for every caught error.
 *   An exception thrown by the hook is logged and otherwise ignored.
 * @param opt.metric Label used in the error-stats log line. Default: 'stream'.
 * @param opt.objectMode Default: true.
 * @param opt.logger Object with `log` and `error` methods. Default: console.
 * @returns The configured `TransformMapSync` instance.
 */
function transformMapSync(mapper, opt = {}) {
    const {
        predicate, // defaults to "no predicate" (pass everything)
        errorMode = js_lib_1.ErrorMode.THROW_IMMEDIATELY,
        flattenArrayOutput = false,
        onError,
        metric = 'stream',
        objectMode = true,
        logger = console,
    } = opt;
    let index = -1;
    let isSettled = false;
    let errors = 0;
    const collectedErrors = []; // only used if errorMode == THROW_AGGREGATED

    // Logs the running (or final) error count; silent while no error has occurred.
    function logErrorStats(final = false) {
        if (!errors) {
            return;
        }
        logger.log(`${metric} ${final ? 'final ' : ''}errors: ${(0, colors_1.yellow)(errors)}`);
    }

    return new TransformMapSync({
        ...opt,
        // Applied after the spread so that an explicit `objectMode: undefined`
        // in opt cannot override the default resolved above.
        objectMode,
        transform(chunk, _, cb) {
            // Stop processing if isSettled
            if (isSettled) {
                return cb();
            }
            const currentIndex = ++index;
            try {
                // map and pass through
                const v = mapper(chunk, currentIndex);
                const candidates = flattenArrayOutput && Array.isArray(v) ? v : [v];
                const passedResults = candidates.filter(r => {
                    if (r === js_lib_1.END) {
                        isSettled = true; // acted upon after the results are pushed
                        return false;
                    }
                    return r !== js_lib_1.SKIP && (!predicate || predicate(r, currentIndex));
                });
                for (const r of passedResults) {
                    this.push(r);
                }
                if (isSettled) {
                    logger.log(`transformMapSync END received at index ${currentIndex}`);
                    (0, stream_util_1.pipelineClose)('transformMapSync', this, this.sourceReadable, this.streamDone, logger);
                }
            }
            catch (err) {
                logger.error(err);
                errors++;
                logErrorStats();
                if (onError) {
                    try {
                        onError(err, chunk);
                    }
                    catch (onErrorFailure) {
                        // Best-effort hook: a failing handler must not break the stream,
                        // but the failure should not disappear silently either.
                        logger.error(onErrorFailure);
                    }
                }
                if (errorMode === js_lib_1.ErrorMode.THROW_IMMEDIATELY) {
                    isSettled = true;
                    // Emit error immediately
                    return cb(err);
                }
                if (errorMode === js_lib_1.ErrorMode.THROW_AGGREGATED) {
                    collectedErrors.push(err);
                }
            }
            // Called outside the try block: an exception thrown from the callback
            // itself must not be caught above and answered with a second cb() call.
            cb(); // done processing
        },
        final(cb) {
            logErrorStats(true);
            if (collectedErrors.length) {
                // emit Aggregated error
                cb(new js_lib_1.AggregatedError(collectedErrors));
            }
            else {
                // emit no error
                cb();
            }
        },
    });
}
exports.transformMapSync = transformMapSync;