UNPKG

13.8 kBJavaScriptView Raw
1/*
2 * Copyright 2018 Adobe. All rights reserved.
3 * This file is licensed to you under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License. You may obtain a copy
5 * of the License at http://www.apache.org/licenses/LICENSE-2.0
6 *
7 * Unless required by applicable law or agreed to in writing, software distributed under
8 * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9 * OF ANY KIND, either express or implied. See the License for the specific language
10 * governing permissions and limitations under the License.
11 */
12
13/* eslint-disable no-await-in-loop */
14
15const _ = require('lodash/fp');
16const callsites = require('callsites');
17const { enumerate, iter } = require('ferrum');
18const coerce = require('./utils/coerce-secrets');
19const Downloader = require('./utils/Downloader.js');
20
21const noOp = () => {};
22const nopLogger = {
23 debug: noOp,
24 warn: noOp,
25 silly: noOp,
26 log: noOp,
27 info: noOp,
28 verbose: noOp,
29 error: noOp,
30 level: 'error',
31};
32
33/**
34 * Simple wrapper to mark a function as error handler
35 * @private
36 */
37function errorWrapper(fn) {
38 const wrapper = (...args) => fn(...args);
39 wrapper.errorHandler = true;
40 return wrapper;
41}
42
43/**
44 * @typedef {Object} Context
45 */
46
47/**
48 * @typedef {Object} Action
49 */
50
51/**
52 * Pipeline function
53 *
54 * @typedef {function(context, _action)} pipelineFunction
55 * @callback pipelineFunction
56 * @param {Context} context Pipeline execution context that is passed along
57 * @param {Action} action Pipeline action define during construction
58 * @return {Promise<Context>} Promise which resolves to a parameters to be added to the context.
59*/
60
61/**
62 * Tap function
63 *
64 * @typedef {function(context, _action, index)} tapFunction
65 * @callback tapFunction
66 * @param {Context} context Pipeline execution context that is passed along
67 * @param {Action} action Pipeline action define during construction
68 * @param {number} index index of the function invocation order
69*/
70
71/**
72 * Pipeline that allows to execute a list of functions in the order they have been added.
73 * Using `when` and `unless` allows to conditionally execute the previously defined function.
74 * @class
75 */
76class Pipeline {
77 /**
78 * Creates a new pipeline.
79 * @param {Action} action Action properties that are available to all pipeline functions.
80 */
81 constructor(action = {}) {
82 this._action = action;
83 this._action.logger = action.logger || nopLogger;
84
85 this._action.logger.debug('Creating pipeline');
86
87 coerce(this._action);
88
89 // function chain that was defined last. used for `when` and `unless`
90 this._last = null;
91 // object with the custom functions to attach to the pipeline
92 this._attachments = null;
93 // step functions to execute
94 this._steps = [];
95 // functions that are executed before each step
96 this._taps = [];
97
98 /**
99 * Registers an extension to the pipeline.
100 * @param {String} name - name of the extension point (typically the function name).
101 * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
102 * @param {integer} offset - where to insert the new function (-1: before, 0: replace, 1: after)
103 */
104 const attachGeneric = (name, f, offset) => {
105 // find the index of the function where the resolved ext name
106 // matches the provided name by searching the list of step functions
107 const foundstep = this._steps
108 .findIndex((step) => step && step.ext && step.ext === name);
109
110 // if something has been found in the list, insert the
111 // new function into the list, with the correct offset
112 if (foundstep !== -1) {
113 if (offset === 0) {
114 // replace
115 this._steps.splice(foundstep, 1, f);
116 } else if (offset > 0) {
117 // insert after
118 this._steps.splice(foundstep + 1, 0, f);
119 } else {
120 // insert before (default)
121 this._steps.splice(foundstep, 0, f);
122 }
123 } else {
124 this._action.logger.warn(`Unknown extension point ${name}`);
125 }
126 };
127 /**
128 * Registers an extension to the pipeline. The function `f` will be run in
129 * the pipeline before the function called `name` will be executed. If `name`
130 * does not exist, `f` will never be executed.
131 * @param {String} name - name of the extension point (typically the function name).
132 * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
133 */
134 this.attach.before = (name, f) => attachGeneric.bind(this)(name, f, -1);
135 /**
136 * Registers an extension to the pipeline. The function `f` will be run in
137 * the pipeline after the function called `name` will be executed. If `name`
138 * does not exist, `f` will never be executed.
139 * @param {String} name - name of the extension point (typically the function name).
140 * @param {pipelineFunction} f - a new pipeline step that will be injected relative to `name`.
141 */
142 this.attach.after = (name, f) => attachGeneric.bind(this)(name, f, 1);
143 /**
144 * Registers an extension to the pipeline. The function `f` will be executed in
145 * the pipeline instead of the function called `name`. If `name` does not exist,
146 * `f` will never be executed.
147 * @param {String} name - name of the extension point (typically the function name).
148 * @param {pipelineFunction} f - a new pipeline step that will replace `name`.
149 */
150 this.attach.replace = (name, f) => attachGeneric.bind(this)(name, f, 0);
151 }
152
153 /**
154 * Adds a processing function to the `step` list of this pipeline.
155 * @param {pipelineFunction} f function to add to the `step` list
156 * @returns {Pipeline} this
157 */
158 use(f) {
159 this.describe(f);
160 this._steps.push(f);
161 this._last = this._steps;
162 // check for extensions
163 if (f && (f.before || f.replace || f.after)) {
164 if (typeof this._attachments === 'function') {
165 throw new Error(`Step '${this._attachments.alias}' already registered extensions for this pipeline, refusing to add more with '${f.alias}'.`);
166 }
167 this._attachments = f;
168 }
169 return this;
170 }
171
172 /**
173 * Adds a tap (observing) function to the pipeline. taps are executed for every
174 * single pipeline step and best used for validation, and logging. taps don't have
175 * any effect, i.e. the return value of a tap function is ignored.
176 * @param {pipelineFunction} f function to be executed in every step. Effects are ignored.
177 */
178 every(f) {
179 this._taps.push(f);
180 this._last = this._taps;
181 return this;
182 }
183
184 /**
185 * Declares the last function that has been added to be a named extension point
186 * @param {string} name - name of the new extension point
187 */
188 expose(name) {
189 this._last.slice(-1).pop().ext = name;
190 return this;
191 }
192
193 /**
194 * Adds a condition to the previously defined `step` function. The previously defined
195 * function will only be executed if the predicate evaluates to something truthy or returns a
196 * Promise that resolves to something truthy.
197 * @param {function(context)} predicate Predicate function.
198 * @callback predicate
199 * @param {Context} context
200 * @returns {Pipeline} this
201 */
202 when(predicate) {
203 if (this._last && this._last.length > 0) {
204 const lastfunc = this._last.pop();
205 const wrappedfunc = (...args) => {
206 const result = predicate(...args);
207 // check if predicate returns a promise like result
208 if (_.isFunction(result.then)) {
209 return result.then((predResult) => {
210 if (predResult) {
211 return lastfunc(...args);
212 }
213 return args[0];
214 });
215 } else if (result) {
216 return lastfunc(...args);
217 }
218 return args[0];
219 };
220 wrappedfunc.alias = lastfunc.alias;
221 wrappedfunc.title = lastfunc.title;
222 wrappedfunc.ext = lastfunc.ext;
223 this._last.push(wrappedfunc);
224 } else {
225 throw new Error('when() needs function to operate on.');
226 }
227 return this;
228 }
229
230 /**
231 * Adds a condition to the previously defined `step` function. The previously defined
232 * function will only be executed if the predicate evaluates to something not-truthy or returns a
233 * Promise that resolves to something not-truthy.
234 * @param {function(context)} predicate Predicate function.
235 * @callback predicate
236 * @returns {Pipeline} this
237 */
238 unless(predicate) {
239 const inverse = (args) => !predicate(args);
240 return this.when(inverse);
241 }
242
243 /**
244 * Attaches custom extensions to the pipeline. The expected argument is an object
245 * with a <code>before</code> object, an <code>after</code> object, and/or
246 * a <code>replace</code> object as its properties.
247 * Each of these objects can have keys that correspond to the named extension points
248 * defined for the pipeline, with the function to execute as values. For example:
249 * <pre>
250 * {
251 * before: {
252 * fetch: (context, action) => {
253 * // do something before the fetch step
254 * return context;
255 * }
256 * }
257 * replace: {
258 * html: (context, action) => {
259 * // do this instead of the default html step
260 * return context;
261 * }
262 * }
263 * after: {
264 * meta: (context, action) => {
265 * // do something after the meta step
266 * return context;
267 * }
268 * }
269 * }
270 * </pre>
271 * @param {Object} att The object containing the attachments
272 */
273 attach(att) {
274 if (att && att.before && typeof att.before === 'object') {
275 Object.keys(att.before).map((key) => this.attach.before(key, att.before[key]));
276 }
277 if (att && att.after && typeof att.after === 'object') {
278 Object.keys(att.after).map((key) => this.attach.after(key, att.after[key]));
279 }
280 if (att && att.replace && typeof att.replace === 'object') {
281 Object.keys(att.replace).map((key) => this.attach.replace(key, att.replace[key]));
282 }
283 }
284
285 /**
286 * Sets an error function. The error function is executed when an error is encountered.
287 * @param {pipelineFunction} f the error function.
288 * @return {Pipeline} this;
289 */
290 error(f) {
291 this.describe(f);
292 const wrapped = errorWrapper(f);
293 // ensure proper alias
294 wrapped.alias = f.alias;
295 wrapped.title = f.title;
296 this._last.push(wrapped);
297 return this;
298 }
299
300 /**
301 * This helper method generates a human readable name for a given function
302 * It will include:
303 * - the name of the function or "anonymous"
304 * - the name of the function that called describe
305 * - the name and code location of the function that called the function before
306 * @param {Function} f
307 */
308 // eslint-disable-next-line class-methods-use-this
309 describe(f) {
310 if (f.alias) {
311 return;
312 }
313
314 f.alias = f.name || f.ext || 'anonymous';
315 f.title = f.alias;
316
317 const [current, injector, caller] = callsites();
318 if (current.getFunctionName() === 'describe' && caller) {
319 f.title = `${injector.getFunctionName()}:${f.alias}`;
320 f.alias = `${f.title} from ${caller.getFileName()}:${caller.getLineNumber()}`;
321 }
322 }
323
324 /**
325 * Runs the pipline processor be executing the `step` functions in order.
326 * @param {Context} context Pipeline context
327 * @returns {Promise<Context>} Promise that resolves to the final result of the accumulated
328 * context.
329 */
330 async run(context = {}) {
331 const { logger } = this._action;
332
333 // register all custom attachers to the pipeline
334 this.attach(this._attachments);
335
336 // setup the download manager
337 if (!this._action.downloader) {
338 this._action.downloader = new Downloader(context, this._action);
339 }
340
341 /**
342 * Executes the taps of the current function.
343 * @param {Function[]} taps the taps
344 * @param {string} fnIdent the name of the function
345 * @param {number} fnIdx the current idx of the function
346 */
347 const execTaps = async (taps, fnIdent, fnIdx) => {
348 for (const [idx, t] of iter(taps)) {
349 const ident = `#${String(fnIdx).padStart(2, '0')}/${fnIdent}/tap-#${idx}`;
350 logger.silly(`exec ${ident}`);
351 try {
352 await t(context, this._action, fnIdx, fnIdent);
353 } catch (e) {
354 logger.error(`Exception during ${ident}:\n${e.stack}`);
355 throw e;
356 }
357 }
358 };
359
360 /**
361 * Executes the pipeline functions
362 * @param {Function[]} fns the functions
363 */
364 const execFns = async (fns) => {
365 for (const [idx, f] of enumerate(fns)) {
366 const ident = `#${String(idx).padStart(2, '0')}/${f.alias}`;
367
368 // skip if error and no error handler (or no error and error handler)
369 if ((!context.error) === (!!f.errorHandler)) {
370 logger.silly(`skip ${ident}`, {
371 function: f.alias,
372 });
373 // eslint-disable-next-line no-continue
374 continue;
375 }
376
377 try {
378 await execTaps(enumerate(this._taps), f.title, idx);
379 } catch (e) {
380 if (!context.error) {
381 context.error = e;
382 }
383 }
384 if (context.error && !f.errorHandler) {
385 // eslint-disable-next-line no-continue
386 continue;
387 }
388 try {
389 logger.silly(`exec ${ident}`, {
390 function: f.alias,
391 });
392 await f(context, this._action);
393 } catch (e) {
394 logger.error(`Exception during ${ident}:\n${e.stack}`);
395 if (!context.error) {
396 context.error = e;
397 }
398 }
399 }
400 };
401
402 try {
403 await execFns(this._steps);
404 } catch (e) {
405 logger.error(`Unexpected error during pipeline execution: \n${e.stack}`);
406 if (!context.error) {
407 context.error = e;
408 }
409 } finally {
410 this._action.downloader.destroy();
411 }
412 return context;
413 }
414}
415
416module.exports = Pipeline;