Source: decider/fragment.js

/*!
 * Usher
 * Copyright(c) 2014 meltmedia <mike@meltmedia.com>
 */

'use strict';

var util = require('util'),
    events = require('events'),
    _ = require('lodash'),
    sequencify = require('@meltmedia/sequencify'),
    Activity = require('./tasks/activity'),
    ChildWorkflow = require('./tasks/child'),
    Loop = require('./tasks/loop'),
    Accumulator = require('./tasks/accumulator'),
    WhileLoop = require('./tasks/while-loop'),
    Decision = require('./tasks/decision'),
    Transform = require('./tasks/transform'),
    Result = require('./tasks/result'),
    Variable = require('./tasks/variable'),
    Terminate = require('./tasks/terminate');


// The constructor itself is the module's export. `Fragment` is a function
// declaration further down, so it is hoisted and already defined at this point.
module.exports = Fragment;

/**
 * A named workflow fragment: the container that tasks (activities, decisions,
 * loops, ...) are registered on via the chainable methods of its prototype.
 *
 * Can be called with or without `new`; a plain call builds the instance itself.
 *
 * @constructor
 * @param {string} name - The name of the fragment.
 * @throws {Error} If `name` is not a string.
 */
function Fragment(name) {
  var invokedWithNew = this instanceof Fragment;
  if (!invokedWithNew) {
    return new Fragment(name);
  }

  // Run the EventEmitter constructor against this instance.
  events.EventEmitter.call(this);

  var hasValidName = _.isString(name);
  if (!hasValidName) {
    throw new Error('A `name` is required');
  }

  this.name = name;
  // Fragment-wide defaults; replaced through `activityDefaults()`.
  this.activityOptions = {};

  /** @private Tasks in the order they were registered. */
  this._tasks = [];
  /** @private Cached, dependency-ordered view of `_tasks`. */
  this._sequencedTasks = [];
}


// Make Fragment an EventEmitter: put EventEmitter.prototype on Fragment's
// prototype chain. This pairs with the `events.EventEmitter.call(this)` call
// in the constructor, which sets up the per-instance emitter state.
util.inherits(Fragment, events.EventEmitter);


/**
 * List the tasks registered on this fragment, in registration order.
 * @returns {Array} The fragment's internal task array (the live array, not a copy).
 */
Fragment.prototype.tasks = function tasks() {
  var registered = this._tasks;
  return registered;
};


/**
 * Get the fragment's tasks ordered so that every task comes after its dependencies.
 *
 * The ordering is computed by `sequencify` and cached on the instance. Only a
 * non-empty result counts as cached, so a fragment without tasks is re-sequenced
 * on every call.
 *
 * @returns {Array} The task objects in execution order (the cached array itself).
 * @throws {Error} If sequencify reports missing tasks.
 * @throws {Error} If sequencify reports recursive dependencies.
 */
Fragment.prototype.sequencedTasks = function sequencedTasks() {
  var cached = this._sequencedTasks;
  if (cached && cached.length > 0) {
    return cached;
  }

  // sequencify takes a name -> { name, dep } lookup plus the list of names to resolve.
  // The task object rides along in the lookup so names can be mapped back afterwards.
  var byName = {};
  var requested = [];

  _.each(this.tasks(), function (task) {
    requested.push(task.name);
    byName[task.name] = { name: task.name, dep: task.deps, task: task };
  });

  var resolved = sequencify(byName, requested);
  var missing = resolved.missingTasks;
  var cycles = resolved.recursiveDependencies;

  // A dependency was named that no task defines
  if (!_.isEmpty(missing)) {
    throw new Error('The workflow is missing tasks: ' + JSON.stringify(missing));
  }

  // Tasks depend on each other in a cycle
  if (!_.isEmpty(cycles)) {
    throw new Error('The workflow has recursive dependencies: ' + JSON.stringify(cycles));
  }

  // Turn the ordered names back into task objects and remember the result
  var ordered = _.map(resolved.sequence, function (taskName) {
    return byName[taskName].task;
  });
  this._sequencedTasks = ordered;

  return ordered;
};


/**
 * Set the fragment-wide default options used when activities are added.
 *
 * The object is stored by reference (no copy is made). Anything that is not a
 * plain object is ignored and the current defaults stay in place.
 *
 * @param {Object} options - Default options for activities defined on this fragment.
 * @returns {Fragment} This fragment, so calls can be chained.
 */
Fragment.prototype.activityDefaults = function activityDefaults(options) {
  if (!_.isPlainObject(options)) {
    return this; // nothing usable supplied; still chainable
  }

  this.activityOptions = options;
  return this; // chainable
};


/**
 * Add an activity to the workflow.
 *
 * `deps` may be omitted: `activity(name, options)` and `activity(name)` are both accepted.
 * The fragment-wide defaults set via `activityDefaults()` fill in any option the
 * caller did not supply. The caller's `options` object is never modified.
 *
 * @param {string} name - The unique name of the activity.
 * @param {Array} [deps] - The names of the dependencies that must be met before this activity can execute.
 * @param {Object} [options] - Custom options for this activity.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.activity = function activity(name, deps, options) {
  // No third argument and no array in the `deps` slot: the second argument is `options`.
  if (!options && !_.isArray(deps)) {
    options = deps;
    deps = undefined;
  }
  deps = deps || [];

  // Merge into a fresh object. `_.defaults` writes into its first argument, so
  // merging straight into `options` would leak this fragment's defaults into an
  // object the caller may reuse for other activities or fragments.
  var merged = _.defaults({}, options || {}, this.activityOptions);

  var task = new Activity(name, deps, merged);
  this._tasks.push(task); // Add the activity
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add a child workflow execution to the workflow.
 *
 * `deps` may be omitted, in which case the remaining arguments move one
 * position to the left: `child(name, workflowName, workflowVersion, [options])`.
 *
 * @param {string} name - The unique name of the child workflow.
 * @param {Array} [deps] - The names of the dependencies that must be met before this child workflow can execute.
 * @param {string} workflowName - The name of the workflow to execute.
 * @param {string} workflowVersion - The version of the workflow to execute.
 * @param {Object} [options] - Custom options for this child workflow. Defaults to `{}`.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.child = function child(name, deps, workflowName, workflowVersion, options) {
  // Nothing in the last slot and no array in the `deps` slot means `deps` was left out.
  var depsOmitted = !options && !_.isArray(deps);
  if (depsOmitted) {
    options = workflowVersion;
    workflowVersion = workflowName;
    workflowName = deps;
    deps = undefined;
  }

  var childTask = new ChildWorkflow(name, deps || [], workflowName, workflowVersion, options || {});

  this._tasks.push(childTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add a looping workflow execution to the workflow.
 *
 * Batch size and the delay between batches are configured through `options`
 * (`itemsPerBatch`, `batchDelay`), which is handed to the Loop task unchanged.
 * `deps` may be omitted: `loop(name, fragment, loopFn, [options])`.
 *
 * @param {string} name - The unique name of the loop fragment.
 * @param {Array} [deps] - The names of the dependencies that must be met before this loop can execute.
 * @param {Fragment} fragment - The workflow fragment to loop over. Generated by `usher.fragment()`
 * @param {Function} loopFn - A function given the task's input that returns an array. For every item in the array
 *                            an execution of the `fragment` workflow will execute.
 * @param {Object} [options] - Custom options for this loop. [itemsPerBatch, batchDelay]
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.loop = function loop(name, deps, fragment, loopFn, options) {
  // No trailing `options` and no array in the `deps` slot: the caller skipped `deps`,
  // so every remaining argument sits one position too far left.
  var depsSkipped = !(options || _.isArray(deps));
  if (depsSkipped) {
    options = loopFn;
    loopFn = fragment;
    fragment = deps;
    deps = [];
  }

  this._tasks.push(new Loop(name, deps || [], fragment, loopFn, options));
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add a while loop workflow execution to the workflow.
 *
 * The loop executes until `doneFn` returns a truthy value.
 * `deps` may be omitted: `whileLoop(name, fragment, doneFn, [options])`.
 *
 * @param {string} name - The unique name of the while loop fragment.
 * @param {Array} [deps] - The names of the dependencies that must be met before this loop can execute.
 * @param {Fragment} fragment - The workflow fragment to loop over. Generated by `usher.fragment()`
 * @param {Function} doneFn - Indicates when the loop is complete by returning a truthy value.
 * @param {Object} [options] - Custom options for this loop; passed to the task as given.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.whileLoop = function whileLoop(name, deps, fragment, doneFn, options) {
  var depsGiven = Boolean(options) || _.isArray(deps);

  if (!depsGiven) {
    // `deps` was left out: realign the arguments with their parameter names.
    options = doneFn;
    doneFn = fragment;
    fragment = deps;
    deps = undefined;
  }

  var dependencies = deps || [];
  var whileTask = new WhileLoop(name, dependencies, fragment, doneFn, options);

  this._tasks.push(whileTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add an accumulator execution to the workflow.
 *
 * The accumulator executes a fragment until `doneFn` returns a truthy value, taking
 * the results of each iteration and making them available to any dependents.
 * `deps` may be omitted: `accumulator(name, fragment, doneFn, [options])`.
 *
 * @param {string} name - The unique name of the accumulator fragment.
 * @param {Array} [deps] - The names of the dependencies that must be met before this accumulator can execute.
 * @param {Fragment} fragment - The workflow fragment to iterate over. Generated by `usher.fragment()`
 * @param {Function} doneFn - Indicates when the accumulator is complete by returning a truthy value.
 * @param {Object} [options] - Custom options for this accumulator; passed to the task as given.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.accumulator = function accumulator(name, deps, fragment, doneFn, options) {
  if (options || _.isArray(deps)) {
    // Full call shape; only guard against a missing dependency list.
    deps = deps || [];
  } else {
    // `deps` was left out, so each argument belongs one parameter to the right.
    options = doneFn;
    doneFn = fragment;
    fragment = deps;
    deps = [];
  }

  var accumulatorTask = new Accumulator(name, deps, fragment, doneFn, options);
  this._tasks.push(accumulatorTask); // Add the accumulator
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add a decision to the workflow.
 *
 * `deps` may be omitted (`decision(name, decisionFn)`). When no decision
 * function is supplied, one that always returns `true` is used.
 *
 * @param {string} name - The unique name of the decision.
 * @param {Array} [deps] - The names of the dependencies that must be met before this decision can execute.
 * @param {Fragment~decisionLogic} [decisionFn] - The logic for this decision.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.decision = function decision(name, deps, decisionFn) {
  var alwaysProceed = function () { return true; };

  // Two-argument form: the second argument is the decision logic, not the dependency list.
  var depsOmitted = !(decisionFn || _.isArray(deps));
  if (depsOmitted) {
    decisionFn = deps;
    deps = undefined;
  }

  var decisionTask = new Decision(name, deps || [], decisionFn || alwaysProceed);
  this._tasks.push(decisionTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};

/**
 * The decision logic to execute when evaluating the given named decision
 * @callback Fragment~decisionLogic
 * @param {Object} input - The results of all dependencies for this decision
 * @return {Boolean} Should dependents of this decision execute
 */


/**
 * Add a termination point to the workflow.
 * @param {string} name - The unique name that represents this termination point.
 * @param {Array} [deps] - The names of the dependencies that must be met before the workflow can terminate.
 *                         Defaults to an empty list.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.terminate = function terminate(name, deps) {
  var terminateTask = new Terminate(name, deps || []);

  this._tasks.push(terminateTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Add a transformation to the workflow. Transformations are good for manipulating the results of
 * prior activities into new representations for future dependents.
 *
 * `deps` may be omitted (`transform(name, transformFn)`). Without a transform
 * function the input is passed through unchanged.
 *
 * @param {string} name - The unique name of the transformation.
 * @param {Array} [deps] - The names of the dependencies that must be met before this transformation can execute.
 * @param {Fragment~transformationLogic} [transformFn] - The function that will perform the transformation.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.transform = function transform(name, deps, transformFn) {
  var passThrough = function (input) { return input; };

  // Two-argument form: the second argument is the transform function.
  if (!transformFn && !_.isArray(deps)) {
    transformFn = deps;
    deps = undefined;
  }

  var transformTask = new Transform(name, deps || [], transformFn || passThrough);
  this._tasks.push(transformTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};

/**
 * The transformation to execute
 * @callback Fragment~transformationLogic
 * @param {Object} input - The results of all dependencies for this transformation
 * @return {*} The transformed input
 */


/**
 * Add a result transformation to the workflow. This sets the fragment's results based on `transformFn`.
 *
 * `deps` may be omitted (`result(name, transformFn)`). Without a transform
 * function the input is passed through unchanged.
 *
 * @param {string} name - The unique name of the result task.
 * @param {Array} [deps] - The names of the dependencies that must be met before this result can be computed.
 * @param {Fragment~transformationLogic} [transformFn] - The function that will perform the transformation.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.result = function result(name, deps, transformFn) {
  var depsProvided = Boolean(transformFn) || _.isArray(deps);

  if (!depsProvided) {
    // Two-argument form: the second argument is the transform function.
    transformFn = deps;
    deps = undefined;
  }

  if (!deps) {
    deps = [];
  }
  if (!transformFn) {
    transformFn = function (input) { return input; };
  }

  this._tasks.push(new Result(name, deps, transformFn)); // Add the result task
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};


/**
 * Set a variable accessible to all future scheduled activities.
 *
 * `deps` may be omitted (`variable(name, valueFn)`). Without a value function
 * the input itself is used as the value.
 *
 * @param {string} name - The unique name of the variable.
 * @param {Array} [deps] - The names of the dependencies that must be met before this variable can be set.
 * @param {Fragment~variableValueFunction} [valueFn] - The function that will calculate the variable's value.
 * @returns {Fragment} This workflow so you can chain commands.
 */
Fragment.prototype.variable = function variable(name, deps, valueFn) {
  var identity = function (input) { return input; };
  var depsOmitted = !(valueFn || _.isArray(deps));

  if (depsOmitted) {
    // Two-argument form: the second argument is the value function.
    valueFn = deps;
    deps = undefined;
  }

  var variableTask = new Variable(name, deps ? deps : [], valueFn ? valueFn : identity);
  this._tasks.push(variableTask);
  this._sequencedTasks = null; // cached ordering is stale now

  return this; // chainable
};

/**
 * A function to determine the value of the variable
 * @callback Fragment~variableValueFunction
 * @param {Object} input - The results of all dependencies for this variable
 * @return {*} The variable value
 */