/*!
* Usher
* Copyright(c) 2014 meltmedia <mike@meltmedia.com>
*/
'use strict';
var winston = require('winston'),
os = require('os'),
_ = require('lodash'),
async = require('async'),
swf = require('aws-swf'),
AWS = require('aws-sdk'),
backoff = require('backoff'),
semver = require('semver'),
usherUtil = require('../util'),
WorkflowVersion = require('./version');
module.exports = DecisionPoller;
/**
 * Represents a single, named decision poller from which all workflow versions are created. Decision tasks are
 * automatically routed to the first workflow that satisfies the version requested. If no valid workflow is found,
 * the task is marked as a failure.
 *
 * Can be called with or without `new`.
 *
 * @constructor
 * @param {string} name - The name of the workflow.
 * @param {string} domain - The AWS SWF domain name to use when listening for decision tasks.
 * @param {object} [options] - Additional SWF options used when creating and executing this workflow
 *                             (taskList, tagList, childPolicy, executionStartToCloseTimeout, taskStartToCloseTimeout).
 *                             `taskList` may be given as a string or as `{ name: '' }`; when omitted it
 *                             defaults to `<name>-tasklist`. The object passed in is copied, never modified.
 * @throws {Error} If `name` or `domain` is not a string.
 */
function DecisionPoller(name, domain, options) {
  if (!(this instanceof DecisionPoller)) {
    return new DecisionPoller(name, domain, options);
  }

  if (!_.isString(name)) {
    throw new Error('A `name` is required');
  }

  if (!_.isString(domain)) {
    throw new Error('A `domain` is required');
  }

  this.name = name;
  this.domain = domain;

  // Shallow copy so the taskList normalization below never writes into the caller's object
  this.options = _.assign({}, options);

  // Make sure taskList is an object of { name: '' }
  if (this.options.taskList && _.isString(this.options.taskList)) {
    this.options.taskList = { name: this.options.taskList };
  }

  // Set default taskList if not defined
  if (!this.options.taskList) {
    this.options.taskList = { name: this.name + '-tasklist' };
  }

  /** @private */
  this._registrations = [];
  this._workflows = [];
  this._poller = undefined;
}
/**
 * Get the WorkflowVersion registered for `ver`, creating it on first use.
 *
 * Newly created versions are added to the internal list, which is then re-sorted with `semver.compare`
 * so that lookups pick the first match in that order.
 *
 * NOTE(review): `ver` is documented as possibly covering several versions; `semver.compare` is applied to
 * the raw value, so confirm how it behaves when a range rather than a concrete version is supplied.
 *
 * @param {string} ver - Version(s) this workflow can handle (conforms to v2.0 of http://semver.org)
 * @returns {WorkflowVersion} This workflow version so you can configure the actual decider.
 */
DecisionPoller.prototype.version = function version(ver) {
  var existing = _.find(this._workflows, { version: ver });

  // Already known: hand back the same instance
  if (existing) {
    return existing;
  }

  var created = new WorkflowVersion(this.name, ver, this.domain, this.options);
  this._workflows.push(created);

  // Keep the list ordered by version so the best first match can be picked
  this._workflows.sort(function byVersion(left, right) {
    return semver.compare(left.version, right.version);
  });

  return created;
};
/**
 * Queue a SWF workflow type for registration. The entry is only recorded here.
 *
 * @param {string} name - The name of the workflow.
 * @param {string} version - The AWS version of the workflow being registered.
 * @returns {DecisionPoller} This Poller so you can chain commands.
 */
DecisionPoller.prototype.register = function register(name, version) {
  var registration = { name: name, version: version };

  this._registrations.push(registration);

  return this;
};
/**
 * Execute a new run of this workflow.
 *
 * The workflow configuration is built from this poller's name and options, the given tags are appended to any
 * globally configured `tagList`, and the workflow is started with the JSON-serialized `input`.
 *
 * NOTE(review): the error message states that both `input` and `version` are required, but the guard only
 * throws when both are missing - confirm which is intended.
 *
 * @param {Object} input - Input to the workflow
 * @param {string} version - Version of the workflow to execute (conforms to v2.0 of http://semver.org)
 * @param {Array} [tags] - Tags to mark the workflow with (may be omitted, with `cb` passed in its place)
 * @param {Function} [cb] - Called with `(err)` on failure or `(null, runId, workflowId)` once started
 * @returns {DecisionPoller} This poller so you can chain commands.
 * @throws {Error} When neither `input` nor `version` is supplied.
 */
DecisionPoller.prototype.execute = function execute(input, version, tags, cb) {
  if (!input && !version) {
    throw new Error('Missing paramaters for workflow execution: input and version required');
  }

  // `tags` is optional, so the callback may arrive in its position
  var tagsIsCallback = _.isFunction(tags);
  var done = cb || (tagsIsCallback ? tags : function () {});
  var localTags = tagsIsCallback ? [] : (tags || []);

  var config = usherUtil.workflowConfig(this.name, version, input, this.options);

  // Merge local tags with global tags
  if (_.isArray(config.tagList)) {
    config.tagList = config.tagList.concat(localTags);
  } else {
    config.tagList = localTags;
  }

  // The domain is not always needed so it's not set by the workflow config
  config.domain = this.domain;

  var workflow = new swf.Workflow(config, new AWS.SWF());
  var payload = { input: JSON.stringify(input) };

  var execution = workflow.start(payload, function onStarted(err, runId) {
    if (err) {
      return done(err);
    }

    done(null, runId, execution.workflowId);
  });

  return this; // chainable
};
/**
 * Start listening for decision tasks from SWF.
 *
 * Nothing is set up when a poller already exists, or when any configured workflow version fails validation
 * (the reason is logged). Otherwise a decider is created, a fibonacci backoff is wired up to restart it after
 * errors, the queued workflow types are registered, and polling begins.
 *
 * @returns {DecisionPoller} This workflow poller. Returned on every path so calls can always be chained.
 */
DecisionPoller.prototype.start = function start() {
  var self = this;

  // If we already have a poller, skip setup
  if (!_.isEmpty(this._poller)) {
    return this; // chainable
  }

  // Before starting, ensure each workflow version is valid
  var valid = _.every(this._workflows, function (workflow) {
    try {
      workflow.validate();
      return true;
    } catch (err) {
      winston.log('error', 'The workflow: %s for version: %s is invalid due to:', workflow.name, workflow.version, err);
      return false;
    }
  });

  if (!valid) {
    winston.log('error', 'Unable to start workflow: %s', this.name);
    return this; // chainable
  }

  var config = {
    'domain': this.domain,
    'taskList': this.options.taskList,
    'identity': this.name + '-' + os.hostname() + '-' + process.pid,
    'maximumPageSize': 100,
    'reverseOrder': false
  };

  this._poller = new swf.Decider(config, new AWS.SWF());
  this._poller.on('decisionTask', this._onDecisionTask.bind(this));

  // Setup poller restart backoff strategy
  this._pollerBackoff = backoff.fibonacci({
    randomisationFactor: 0,
    initialDelay: 10,
    maxDelay: 30000
  });

  this._pollerBackoff.on('backoff', function (number, delay) {
    winston.log('warn', 'Poller: %s failed %d time(s), restarting in %d ms', self.name, number, delay, {});
  });

  this._pollerBackoff.on('ready', function () {
    winston.log('info', 'Restarting poller: %s', self.name);
    self._poller.start();
  });

  this._pollerBackoff.on('fail', function (number) {
    winston.log('error', 'Poller: %s failed too many times [%d], will not restart', self.name, number, {});
  });

  // For debug purposes only
  this._poller.on('poll', function () {
    winston.log('silly', 'Polling for decision tasks in poller: %s ...', self.name);
  });

  this._poller.on('error', function (err) {
    winston.log('error', 'An error occured in poller: %s due to: ', self.name, err);
    // Attempt to start polling again
    self._pollerBackoff.backoff();
  });

  winston.log('debug', 'Registering %s workflows', this._registrations.length);

  this._registerWorkflows(function () {
    // The poller is removed by stop(), so only start it if it is still present
    if (!_.isEmpty(self._poller)) {
      // Start the poller
      winston.log('info', 'Starting poller: %s', self.name);
      self._poller.start();
    }
  });

  return this; // chainable
};
/**
 * Stop listening for decision tasks from SWF.
 *
 * Stops and discards the active poller (if any), then resets and discards its restart backoff (if any).
 * Calling this when nothing is running is a no-op.
 *
 * @returns {DecisionPoller} This workflow poller
 */
DecisionPoller.prototype.stop = function stop() {
  var hasPoller = !_.isEmpty(this._poller);

  if (hasPoller) {
    this._poller.stop();
    // Drop the instance so config changes can be made between start/stop cycles
    delete this._poller;
  }

  var hasBackoff = !_.isEmpty(this._pollerBackoff);

  if (hasBackoff) {
    // Resetting cancels any restart that is currently pending
    this._pollerBackoff.reset();
    delete this._pollerBackoff;
  }

  return this; // chainable
};
/**
 * Register every workflow type queued through `register()` with SWF, then call `done`.
 *
 * Registration is best-effort: a failed registration never stops the remaining ones and is never passed
 * on to `done`. Failures are logged instead of being silently discarded. A `TypeAlreadyExistsFault`
 * (the type was registered previously) is only logged at debug level.
 *
 * @private
 * @param {Function} done - Called without an error once every registration has been attempted
 */
DecisionPoller.prototype._registerWorkflows = function _registerWorkflows(done) {
  var aws = new AWS.SWF(),
      self = this;

  async.each(this._registrations, function (item, next) {
    // Registrations made without a version fall back to 1.0.0
    var version = item.version || '1.0.0';

    aws.registerWorkflowType({
      domain: self.domain,
      name: item.name,
      version: version,
      defaultTaskList: self.options.taskList,
      defaultExecutionStartToCloseTimeout: self.options.defaultExecutionStartToCloseTimeout,
      defaultTaskStartToCloseTimeout: self.options.defaultTaskStartToCloseTimeout,
      defaultTaskPriority: self.options.defaultTaskPriority,
      defaultChildPolicy: self.options.defaultChildPolicy
    },
    function (err) {
      if (err) {
        if (err.code === 'TypeAlreadyExistsFault') {
          winston.log('debug', 'Workflow: %s version: %s is already registered', item.name, version);
        } else {
          winston.log('error', 'Unable to register workflow: %s version: %s due to: %s',
            item.name, version, err.message || JSON.stringify(err));
        }
      }

      // Best-effort: keep going regardless of the outcome
      next();
    });
  }, done);
};
/**
 * Route an incoming decision task to the first configured workflow whose version is satisfied by the
 * (padded) version on the task. When none matches, the task is failed with `NoValidVersionFound`.
 *
 * @private
 * @param {Object} task - Decision task received from the poller; read here via `task.config.workflowType`
 *                        and answered via `task.response.fail`
 */
DecisionPoller.prototype._onDecisionTask = function _onDecisionTask(task) {
  // Receiving a task means polling works again, so the restart backoff can be cleared
  if (this._pollerBackoff) {
    this._pollerBackoff.reset();
  }

  var workflowType = task.config.workflowType,
      name = workflowType.name,
      version = usherUtil.semverPad(workflowType.version);

  // Lookup workflow...
  var handler = _.find(this._workflows, function satisfiesRequest(candidate) {
    return semver.satisfies(version, candidate.version);
  });

  // A matching workflow takes over from here
  if (handler) {
    handler._handleTask(task);
    return;
  }

  // Nothing satisfies the requested version, so fail the workflow
  var failure = {
    name: 'NoValidVersionFound',
    retriable: false,
    message: 'No workflow satisfies version: ' + version
  };

  task.response.fail(failure.name, failure, function onFailed(err) {
    if (err) {
      winston.log('error', 'Unable to mark workflow: %s as failed due to: %s', name, JSON.stringify(err));
    }
  });
};