1 | const {identity} = require('lodash');
|
2 | const pReduce = require('p-reduce');
|
3 | const AggregateError = require('aggregate-error');
|
4 | const {extractErrors} = require('../utils');
|
5 |
|
6 | /**
|
7 | * A Function that execute a list of function sequencially. If at least one Function ins the pipeline throws an Error or rejects, the pipeline function rejects as well.
|
8 | *
|
9 | * @typedef {Function} Pipeline
|
10 | * @param {Any} input Argument to pass to the first step in the pipeline.
|
11 | *
|
12 | * @return {Array<*>|*} An Array with the result of each step in the pipeline; if there is only 1 step in the pipeline, the result of this step is returned directly.
|
13 | *
|
14 | * @throws {AggregateError|Error} An AggregateError with the errors of each step in the pipeline that rejected; if there is only 1 step in the pipeline, the error of this step is thrown directly.
|
15 | */
|
16 |
|
17 | /**
|
18 | * Create a Pipeline with a list of Functions.
|
19 | *
|
20 | * @param {Array<Function>} steps The list of Function to execute.
|
21 | * @param {Object} options Pipeline options.
|
22 | * @param {Boolean} [options.settleAll=false] If `true` all the steps in the pipeline are executed, even if one rejects, if `false` the execution stops after a steps rejects.
|
23 | * @param {Function} [options.getNextInput=identity] Function called after each step is executed, with the last step input and the current current step result; the returned value will be used as the input of the next step.
|
24 | * @param {Function} [options.transform=identity] Function called after each step is executed, with the current step result, the step function and the last step input; the returned value will be saved in the pipeline results.
|
25 | *
|
26 | * @return {Pipeline} A Function that execute the `steps` sequencially
|
27 | */
|
28 | module.exports = (steps, {settleAll = false, getNextInput = identity, transform = identity} = {}) => async (input) => {
|
29 | const results = [];
|
30 | const errors = [];
|
31 | await pReduce(
|
32 | steps,
|
33 | async (lastInput, step) => {
|
34 | let result;
|
35 | try {
|
36 | // Call the step with the input computed at the end of the previous iteration and save intermediary result
|
37 | result = await transform(await step(lastInput), step, lastInput);
|
38 | results.push(result);
|
39 | } catch (error) {
|
40 | if (settleAll) {
|
41 | errors.push(...extractErrors(error));
|
42 | result = error;
|
43 | } else {
|
44 | throw error;
|
45 | }
|
46 | }
|
47 |
|
48 | // Prepare input for the next step, passing the input of the last iteration (or initial parameter for the first iteration) and the result of the current one
|
49 | return getNextInput(lastInput, result);
|
50 | },
|
51 | input
|
52 | );
|
53 | if (errors.length > 0) {
|
54 | throw new AggregateError(errors);
|
55 | }
|
56 |
|
57 | return results;
|
58 | };
|