1 | /*
|
2 | * Copyright 2018 Adobe. All rights reserved.
|
3 | * This file is licensed to you under the Apache License, Version 2.0 (the "License");
|
4 | * you may not use this file except in compliance with the License. You may obtain a copy
|
5 | * of the License at http://www.apache.org/licenses/LICENSE-2.0
|
6 | *
|
7 | * Unless required by applicable law or agreed to in writing, software distributed under
|
8 | * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
|
9 | * OF ANY KIND, either express or implied. See the License for the specific language
|
10 | * governing permissions and limitations under the License.
|
11 | */
|
12 |
|
13 | /* eslint-disable no-await-in-loop */
|
14 |
|
15 | const _ = require('lodash/fp');
|
16 | const callsites = require('callsites');
|
17 | const { enumerate, iter } = require('ferrum');
|
18 | const coerce = require('./utils/coerce-secrets');
|
19 | const Downloader = require('./utils/Downloader.js');
|
20 |
|
// Shared do-nothing function; every method of the fallback logger points to it.
const noOp = () => {};

// Fallback logger whose methods silently discard all arguments. The Pipeline
// constructor installs it when the supplied action does not carry a logger.
const nopLogger = {};
['debug', 'warn', 'silly', 'log', 'info', 'verbose', 'error'].forEach((method) => {
  nopLogger[method] = noOp;
});
nopLogger.level = 'error';
|
32 |
|
/**
 * Wraps `fn` in a pass-through function that is flagged as an error handler.
 * The returned function forwards all arguments to `fn`, returns its result and
 * carries `errorHandler === true`; `fn` itself is left unmodified.
 * @private
 * @param {Function} fn the function to mark as error handler
 * @returns {Function} the flagged pass-through wrapper
 */
function errorWrapper(fn) {
  const wrapper = (...args) => fn(...args);
  return Object.assign(wrapper, { errorHandler: true });
}
|
42 |
|
43 | /**
|
44 | * @typedef {Object} Context
|
45 | */
|
46 |
|
47 | /**
|
48 | * @typedef {Object} Action
|
49 | */
|
50 |
|
51 | /**
|
52 | * Pipeline function
|
53 | *
|
54 | * @typedef {function(context, _action)} pipelineFunction
|
55 | * @callback pipelineFunction
|
56 | * @param {Context} context Pipeline execution context that is passed along
|
57 | * @param {Action} action Pipeline action define during construction
|
58 | * @return {Promise<Context>} Promise which resolves to a parameters to be added to the context.
|
59 | */
|
60 |
|
61 | /**
|
62 | * Tap function
|
63 | *
|
64 | * @typedef {function(context, _action, index)} tapFunction
|
65 | * @callback tapFunction
|
66 | * @param {Context} context Pipeline execution context that is passed along
|
67 | * @param {Action} action Pipeline action define during construction
|
68 | * @param {number} index index of the function invocation order
|
69 | */
|
70 |
|
/**
 * Pipeline that executes a list of functions in the order they have been added.
 * Using `when` and `unless` allows to conditionally execute the previously defined function.
 * @class
 */
class Pipeline {
  /**
   * Creates a new pipeline.
   *
   * Note that the passed `action` object is used as-is (not copied): a `logger`
   * property is set on it if missing and it is handed to `coerce`.
   * @param {Action} action Action properties that are available to all pipeline functions.
   */
  constructor(action = {}) {
    this._action = action;
    this._action.logger = action.logger || nopLogger;

    this._action.logger.debug('Creating pipeline');

    coerce(this._action);

    // function chain that was defined last. used for `when` and `unless`
    this._last = null;
    // object with the custom functions to attach to the pipeline
    this._attachments = null;
    // step functions to execute
    this._steps = [];
    // functions that are executed before each step
    this._taps = [];

    /**
     * Registers an extension to the pipeline.
     * @param {String} name - name of the extension point (typically the function name).
     * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
     * @param {integer} offset - where to insert the new function (-1: before, 0: replace, 1: after)
     */
    const attachGeneric = (name, f, offset) => {
      // find the index of the step whose exposed extension name matches `name`
      const foundstep = this._steps
        .findIndex((step) => step && step.ext && step.ext === name);

      if (foundstep === -1) {
        this._action.logger.warn(`Unknown extension point ${name}`);
        return;
      }

      if (offset === 0) {
        // replace
        this._steps.splice(foundstep, 1, f);
      } else if (offset > 0) {
        // insert after
        this._steps.splice(foundstep + 1, 0, f);
      } else {
        // insert before (default)
        this._steps.splice(foundstep, 0, f);
      }
    };

    // `attach` is defined on the prototype and hence is ONE function object shared
    // by all pipelines. Hanging `before`/`after`/`replace` directly off it would let
    // the most recently constructed pipeline overwrite the helpers of every other
    // instance. Give each instance its own (non-enumerable) `attach` function that
    // delegates to the prototype implementation and carries its own helpers.
    const protoAttach = this.attach;
    Object.defineProperty(this, 'attach', {
      value: (att) => protoAttach.call(this, att),
      writable: true,
      configurable: true,
      enumerable: false,
    });

    /**
     * Registers an extension to the pipeline. The function `f` will be run in
     * the pipeline before the function called `name` will be executed. If `name`
     * does not exist, `f` will never be executed.
     * @param {String} name - name of the extension point (typically the function name).
     * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
     */
    this.attach.before = (name, f) => attachGeneric(name, f, -1);
    /**
     * Registers an extension to the pipeline. The function `f` will be run in
     * the pipeline after the function called `name` will be executed. If `name`
     * does not exist, `f` will never be executed.
     * @param {String} name - name of the extension point (typically the function name).
     * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
     */
    this.attach.after = (name, f) => attachGeneric(name, f, 1);
    /**
     * Registers an extension to the pipeline. The function `f` will be executed in
     * the pipeline instead of the function called `name`. If `name` does not exist,
     * `f` will never be executed.
     * @param {String} name - name of the extension point (typically the function name).
     * @param {pipelineFunction} f - a new pipeline step that will replace `name`.
     */
    this.attach.replace = (name, f) => attachGeneric(name, f, 0);
  }

  /**
   * Adds a processing function to the `step` list of this pipeline.
   * If `f` carries a `before`, `replace` or `after` property it is remembered as
   * the source of the custom attachments that `run()` applies.
   * @param {pipelineFunction} f function to add to the `step` list
   * @returns {Pipeline} this
   * @throws {Error} if another step already registered extensions for this pipeline
   */
  use(f) {
    this.describe(f);
    this._steps.push(f);
    this._last = this._steps;
    // check for extensions
    if (f && (f.before || f.replace || f.after)) {
      if (typeof this._attachments === 'function') {
        throw new Error(`Step '${this._attachments.alias}' already registered extensions for this pipeline, refusing to add more with '${f.alias}'.`);
      }
      this._attachments = f;
    }
    return this;
  }

  /**
   * Adds a tap (observing) function to the pipeline. taps are executed for every
   * single pipeline step and best used for validation, and logging. taps don't have
   * any effect, i.e. the return value of a tap function is ignored.
   * @param {tapFunction} f function to be executed in every step. Effects are ignored.
   * @returns {Pipeline} this
   */
  every(f) {
    this._taps.push(f);
    this._last = this._taps;
    return this;
  }

  /**
   * Declares the last function that has been added to be a named extension point
   * @param {string} name - name of the new extension point
   * @returns {Pipeline} this
   * @throws {Error} if no function has been added yet
   */
  expose(name) {
    if (!this._last || this._last.length === 0) {
      throw new Error('expose() needs function to operate on.');
    }
    this._last[this._last.length - 1].ext = name;
    return this;
  }

  /**
   * Adds a condition to the previously defined function. The previously defined
   * function will only be executed if the predicate evaluates to something truthy or returns a
   * Promise that resolves to something truthy. Otherwise the first argument (the context)
   * is returned unchanged.
   * @param {function(context)} predicate Predicate function. It receives the same arguments
   * as the function it guards.
   * @callback predicate
   * @param {Context} context
   * @returns {Pipeline} this
   * @throws {Error} if there is no previously defined function
   */
  when(predicate) {
    if (!this._last || this._last.length === 0) {
      throw new Error('when() needs function to operate on.');
    }

    const lastfunc = this._last.pop();
    const wrappedfunc = (...args) => {
      const result = predicate(...args);
      // check if predicate returns a promise like result; guard against
      // `null`/`undefined`, which are simply falsy predicate results
      if (result && _.isFunction(result.then)) {
        return result.then((predResult) => (predResult ? lastfunc(...args) : args[0]));
      }
      return result ? lastfunc(...args) : args[0];
    };
    wrappedfunc.alias = lastfunc.alias;
    wrappedfunc.title = lastfunc.title;
    wrappedfunc.ext = lastfunc.ext;
    // keep conditional error handlers recognizable as error handlers
    if (lastfunc.errorHandler) {
      wrappedfunc.errorHandler = true;
    }
    this._last.push(wrappedfunc);
    return this;
  }

  /**
   * Adds a condition to the previously defined function. The previously defined
   * function will only be executed if the predicate evaluates to something not-truthy or returns a
   * Promise that resolves to something not-truthy.
   * @param {function(context)} predicate Predicate function. It receives the same arguments
   * as the function it guards.
   * @callback predicate
   * @returns {Pipeline} this
   * @throws {Error} if there is no previously defined function
   */
  unless(predicate) {
    const inverse = (...args) => {
      const result = predicate(...args);
      // a promise is always truthy, so it must be resolved before negating
      if (result && _.isFunction(result.then)) {
        return result.then((predResult) => !predResult);
      }
      return !result;
    };
    return this.when(inverse);
  }

  /**
   * Attaches custom extensions to the pipeline. The expected argument is an object
   * with a <code>before</code> object, an <code>after</code> object, and/or
   * a <code>replace</code> object as its properties.
   * Each of these objects can have keys that correspond to the named extension points
   * defined for the pipeline, with the function to execute as values. For example:
   * <pre>
   * {
   *   before: {
   *     fetch: (context, action) => {
   *       // do something before the fetch step
   *       return context;
   *     }
   *   },
   *   replace: {
   *     html: (context, action) => {
   *       // do this instead of the default html step
   *       return context;
   *     }
   *   },
   *   after: {
   *     meta: (context, action) => {
   *       // do something after the meta step
   *       return context;
   *     }
   *   }
   * }
   * </pre>
   * Falsy arguments and properties that are not objects are ignored.
   * @param {Object} att The object containing the attachments
   */
  attach(att) {
    if (!att) {
      return;
    }
    // order matters and matches the helpers: before, then after, then replace
    ['before', 'after', 'replace'].forEach((kind) => {
      const extensions = att[kind];
      if (extensions && typeof extensions === 'object') {
        Object.keys(extensions).forEach((key) => this.attach[kind](key, extensions[key]));
      }
    });
  }

  /**
   * Sets an error function. The error function is executed when an error is encountered.
   * Error functions are always registered in the `step` list, regardless of whether
   * the previous call added a step or a tap.
   * @param {pipelineFunction} f the error function.
   * @return {Pipeline} this;
   */
  error(f) {
    this.describe(f);
    const wrapped = errorWrapper(f);
    // ensure proper alias
    wrapped.alias = f.alias;
    wrapped.title = f.title;
    this._steps.push(wrapped);
    this._last = this._steps;
    return this;
  }

  /**
   * This helper method generates a human readable name for a given function
   * and stores it in the function's `alias` and `title` properties (unless the
   * function already has an `alias`). It will include:
   * - the name of the function or "anonymous"
   * - the name of the function that called describe
   * - the name and code location of the function that called the function before
   * @param {Function} f
   */
  // eslint-disable-next-line class-methods-use-this
  describe(f) {
    if (f.alias) {
      return;
    }

    f.alias = f.name || f.ext || 'anonymous';
    f.title = f.alias;

    const [current, injector, caller] = callsites();
    if (current.getFunctionName() === 'describe' && caller) {
      f.title = `${injector.getFunctionName()}:${f.alias}`;
      f.alias = `${f.title} from ${caller.getFileName()}:${caller.getLineNumber()}`;
    }
  }

  /**
   * Runs the pipeline processor by executing the `step` functions in order.
   * Regular steps are skipped once `context.error` is set; error handlers are only
   * executed while `context.error` is set. Exceptions thrown by steps or taps are
   * logged and stored in `context.error` (the first error wins).
   *
   * NOTE(review): the registered attachments are applied on every call and the
   * downloader is destroyed at the end, so a pipeline instance is presumably meant
   * to be run only once - confirm before reusing instances.
   * @param {Context} context Pipeline context
   * @returns {Promise<Context>} Promise that resolves to the final result of the accumulated
   * context.
   */
  async run(context = {}) {
    const { logger } = this._action;

    // register all custom attachers to the pipeline
    this.attach(this._attachments);

    // setup the download manager
    if (!this._action.downloader) {
      this._action.downloader = new Downloader(context, this._action);
    }

    /**
     * Executes the taps of the current function.
     * @param {Iterable} taps sequence of `[index, tap]` pairs
     * @param {string} fnIdent the name of the function
     * @param {number} fnIdx the current idx of the function
     */
    const execTaps = async (taps, fnIdent, fnIdx) => {
      for (const [idx, t] of iter(taps)) {
        const ident = `#${String(fnIdx).padStart(2, '0')}/${fnIdent}/tap-#${idx}`;
        logger.silly(`exec ${ident}`);
        try {
          await t(context, this._action, fnIdx, fnIdent);
        } catch (e) {
          logger.error(`Exception during ${ident}:\n${e.stack}`);
          throw e;
        }
      }
    };

    /**
     * Executes the pipeline functions
     * @param {Function[]} fns the functions
     */
    const execFns = async (fns) => {
      for (const [idx, f] of enumerate(fns)) {
        const ident = `#${String(idx).padStart(2, '0')}/${f.alias}`;

        // skip if error and no error handler (or no error and error handler)
        if ((!context.error) === (!!f.errorHandler)) {
          logger.silly(`skip ${ident}`, {
            function: f.alias,
          });
          // eslint-disable-next-line no-continue
          continue;
        }

        try {
          await execTaps(enumerate(this._taps), f.title, idx);
        } catch (e) {
          if (!context.error) {
            context.error = e;
          }
        }
        // a failing tap puts the pipeline into error state: don't run the regular step
        if (context.error && !f.errorHandler) {
          // eslint-disable-next-line no-continue
          continue;
        }
        try {
          logger.silly(`exec ${ident}`, {
            function: f.alias,
          });
          await f(context, this._action);
        } catch (e) {
          logger.error(`Exception during ${ident}:\n${e.stack}`);
          if (!context.error) {
            context.error = e;
          }
        }
      }
    };

    try {
      await execFns(this._steps);
    } catch (e) {
      logger.error(`Unexpected error during pipeline execution: \n${e.stack}`);
      if (!context.error) {
        context.error = e;
      }
    } finally {
      this._action.downloader.destroy();
    }
    return context;
  }
}
|
415 |
|
416 | module.exports = Pipeline;
|