UNPKG

2.67 kBJavaScriptView Raw
1const {identity} = require('lodash');
2const pReduce = require('p-reduce');
3const AggregateError = require('aggregate-error');
4const {extractErrors} = require('../utils');
5
6/**
7 * A Function that execute a list of function sequencially. If at least one Function ins the pipeline throws an Error or rejects, the pipeline function rejects as well.
8 *
9 * @typedef {Function} Pipeline
10 * @param {Any} input Argument to pass to the first step in the pipeline.
11 *
12 * @return {Array<*>|*} An Array with the result of each step in the pipeline; if there is only 1 step in the pipeline, the result of this step is returned directly.
13 *
14 * @throws {AggregateError|Error} An AggregateError with the errors of each step in the pipeline that rejected; if there is only 1 step in the pipeline, the error of this step is thrown directly.
15 */
16
17/**
18 * Create a Pipeline with a list of Functions.
19 *
20 * @param {Array<Function>} steps The list of Function to execute.
21 * @param {Object} options Pipeline options.
22 * @param {Boolean} [options.settleAll=false] If `true` all the steps in the pipeline are executed, even if one rejects, if `false` the execution stops after a steps rejects.
23 * @param {Function} [options.getNextInput=identity] Function called after each step is executed, with the last step input and the current current step result; the returned value will be used as the input of the next step.
24 * @param {Function} [options.transform=identity] Function called after each step is executed, with the current step result, the step function and the last step input; the returned value will be saved in the pipeline results.
25 *
26 * @return {Pipeline} A Function that execute the `steps` sequencially
27 */
28module.exports = (steps, {settleAll = false, getNextInput = identity, transform = identity} = {}) => async (input) => {
29 const results = [];
30 const errors = [];
31 await pReduce(
32 steps,
33 async (lastInput, step) => {
34 let result;
35 try {
36 // Call the step with the input computed at the end of the previous iteration and save intermediary result
37 result = await transform(await step(lastInput), step, lastInput);
38 results.push(result);
39 } catch (error) {
40 if (settleAll) {
41 errors.push(...extractErrors(error));
42 result = error;
43 } else {
44 throw error;
45 }
46 }
47
48 // Prepare input for the next step, passing the input of the last iteration (or initial parameter for the first iteration) and the result of the current one
49 return getNextInput(lastInput, result);
50 },
51 input
52 );
53 if (errors.length > 0) {
54 throw new AggregateError(errors);
55 }
56
57 return results;
58};