UNPKG

7.74 kBJavaScriptView Raw
1'use strict';
2
/**
 * Builds a Cumulus Message template for every workflow (step function)
 * defined in the stack and stores the results in the system bucket:
 * one `<workflow>.json` per workflow, a `list.json` describing all of
 * them and a `buckets.json` holding the buckets configuration.
 *
 * Nothing is generated or uploaded when the config has no step functions.
 *
 * @param {Object} config - Kes config object
 * @param {Array} outputs - a list of CloudFormation outputs
 * @param {function} uploader - called as `uploader(bucket, key, body)` for
 *   each object; every call is waited on before the next one starts
 *
 * @returns {Promise} resolves to undefined once all uploads are done
 */
const generateTemplates = (() => {
  const run = _asyncToGenerator(function* (config, outputs, uploader) {
    // without step functions in the deployment there is nothing to do
    if (!config.stepFunctions) return;

    const bucket = config.system_bucket;
    const prefix = `${config.stackName}/workflows`;

    // build every template first, so nothing is uploaded if one of them fails
    const templates = Object.keys(config.stepFunctions).map(
      workflowName => generateWorkflowTemplate(
        workflowName,
        config.stepFunctions[workflowName],
        config,
        outputs
      )
    );

    // upload the generated templates to S3, one after the other
    const workflows = [];
    console.log('Uploading Cumulus Message Templates for each Workflow ...');
    for (const template of templates) {
      const name = template.meta.workflow_name;
      const key = `${prefix}/${name}.json`;
      yield uploader(bucket, key, JSON.stringify(template));
      workflows.push({
        name,
        template: `s3://${bucket}/${key}`,
        definition: config.stepFunctions[name]
      });
    }

    // the list of workflows is what the /workflows endpoint of the API
    // reads to return the existing workflows
    yield uploader(bucket, `${prefix}/list.json`, JSON.stringify(workflows));

    // finally, the buckets config
    yield uploader(bucket, `${prefix}/buckets.json`, JSON.stringify(config.buckets));
  });

  // named three-argument wrapper keeps the function's name and arity
  return function generateTemplates(_x, _x2, _x3) {
    return run.apply(this, arguments);
  };
})();
52
/**
 * Transpiler helper that turns a generator function into a function
 * returning a Promise, emulating async/await: each yielded value is
 * awaited and its outcome is fed back into the generator.
 *
 * @param {function} fn - generator function to wrap
 * @returns {function} function that runs `fn` (same `this` and arguments)
 *   and returns a Promise settled with the generator's return value or
 *   with the first error it does not catch
 */
function _asyncToGenerator(fn) {
  return function () {
    var generator = fn.apply(this, arguments);

    return new Promise(function (resolve, reject) {
      // advances the generator with "next" or "throw" and schedules the
      // following step once the yielded value settles
      function advance(method, payload) {
        var result;
        var yielded;

        try {
          result = generator[method](payload);
          yielded = result.value;
        } catch (error) {
          // the generator threw and did not handle it
          reject(error);
          return;
        }

        if (result.done) {
          resolve(yielded);
          return;
        }

        return Promise.resolve(yielded).then(
          function (settled) {
            advance("next", settled);
          },
          function (failure) {
            advance("throw", failure);
          }
        );
      }

      return advance("next");
    });
  };
}
54
55const get = require('lodash.get');
56const isObject = require('lodash.isobject');
57const isString = require('lodash.isstring');
58const omit = require('lodash.omit');
59
/**
 * Kes and the message adapter both use Mustache for templating, so
 * string values written in the `[$...]` or `{$...}` syntax are wrapped in
 * one more pair of curly brackets, producing `{[$...]}` and `{{$...}}`.
 *
 * The given object is updated in place, nested objects included.
 *
 * @param {Object} cumulusConfig - the CumulusConfig portion of a task definition
 * @returns {Object} the updated CumulusConfig, or an empty object when
 *   none was given
 */
function fixCumulusMessageSyntax(cumulusConfig) {
  if (!cumulusConfig) return {};

  const test = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');

  for (const key of Object.keys(cumulusConfig)) {
    const value = cumulusConfig[key];

    if (isObject(value)) {
      // descend into nested configuration
      // eslint-disable-next-line no-param-reassign
      cumulusConfig[key] = fixCumulusMessageSyntax(value);
    } else if (isString(value)) {
      const match = value.match(test);
      if (match) {
        // eslint-disable-next-line no-param-reassign
        cumulusConfig[key] = `{${match[0]}}`;
      }
    }
  }

  return cumulusConfig;
}
88
/**
 * Extracts the Cumulus configuration of each Step Function workflow and
 * stores it separately on the config as `workflowConfigs`, keyed by
 * workflow name and then by state name.
 *
 * The `CumulusConfig` key is removed from every state definition
 * (CloudFormation rejects it) and the extracted values are passed
 * through `fixCumulusMessageSyntax`. The given config is updated in place.
 *
 * A config without step functions, or a workflow without `States`, yields
 * an empty entry instead of an error.
 *
 * @param {Object} config - Kes config object
 * @returns {Object} updated kes config object
 */
function extractCumulusConfigFromSF(config) {
  const workflowConfigs = {};
  const stepFunctions = config.stepFunctions || {};

  // loop through the message adapter config of each step of
  // the step function, add curly brackets to values
  // with dollar sign and remove config key from the
  // definition, otherwise CloudFormation will be mad
  // at us.
  Object.keys(stepFunctions).forEach((name) => {
    const sf = stepFunctions[name];
    const states = (sf && sf.States) || {};

    workflowConfigs[name] = {};
    Object.keys(states).forEach((n) => {
      workflowConfigs[name][n] = fixCumulusMessageSyntax(states[n].CumulusConfig);
      // states is sf.States itself, so the definition is updated in place
      states[n] = omit(states[n], ['CumulusConfig']);
    });
  });

  // eslint-disable-next-line no-param-reassign
  config.workflowConfigs = workflowConfigs;
  return config;
}
119
/**
 * Returns the OutputValue of a CloudFormation Output
 *
 * @param {Array} outputs - list of CloudFormation Outputs, each an object
 *   with `OutputKey` and `OutputValue` properties
 * @param {string} key - the key to return the value of
 *
 * @returns {string|undefined} the output value, or undefined when there
 *   is no output with that key (including when no list of outputs is given)
 */
function findOutputValue(outputs, key) {
  // a missing list is treated like a list without the key
  if (!Array.isArray(outputs)) return undefined;

  // skip empty entries rather than failing on them
  const output = outputs.find(o => o && o.OutputKey === key);
  return output ? output.OutputValue : undefined;
}
133
/**
 * Generates a Cumulus Message template for a Cumulus Workflow
 *
 * @param {string} name - name of the workflow
 * @param {Object} workflow - the Step Function workflow object
 * @param {Object} config - Kes config object; `config.workflowConfigs[name]`
 *   must exist (it is set by `extractCumulusConfigFromSF`)
 * @param {Array} outputs - a list of CloudFormation outputs
 *
 * @returns {Object} a Cumulus Message template
 */
function generateWorkflowTemplate(name, workflow, config, outputs) {
  // get cmr password from outputs
  const cmrPassword = findOutputValue(outputs, 'EncryptedCmrPassword');
  const cmr = Object.assign({}, config.cmr, { password: cmrPassword });
  // get launchpad passphrase from outputs
  const launchpadPassphrase = findOutputValue(outputs, 'EncryptedLaunchpadPassphrase');
  const launchpad = Object.assign({}, config.launchpad, { passphrase: launchpadPassphrase });
  const bucket = get(config, 'system_bucket');

  // generateTemplates uploads the templates under `${config.stackName}/workflows/`
  // and meta.stack below reports config.stackName, so the template URIs use
  // the same name; config.stack is only a fallback when stackName is not set
  const stack = get(config, 'stackName', config.stack);

  // add the current workflows' state machine arn
  const stateMachineArn = findOutputValue(outputs, `${name}StateMachine`);

  // add queues: every output whose key contains "SQSOutput" is a queue, named
  // by the rest of the key and holding the queue url as its value
  const queues = {};
  const queueExecutionLimits = {};
  if (config.sqs) {
    const queueOutputs = outputs.filter(o => o.OutputKey.includes('SQSOutput'));

    queueOutputs.forEach((queue) => {
      const queueName = queue.OutputKey.replace('SQSOutput', '');
      const queueUrl = queue.OutputValue;

      queues[queueName] = queueUrl;

      // only queues configured with a maxExecutions value get a limit
      const maxExecutions = get(config.sqs, `${queueName}.maxExecutions`);
      if (maxExecutions) {
        queueExecutionLimits[queueName] = maxExecutions;
      }
    });
  }

  // add the cumulus message config of the current workflow
  const workflowConfig = {};
  const states = get(workflow, 'States', {});
  Object.keys(states).forEach((state) => {
    workflowConfig[state] = config.workflowConfigs[name][state];
  });

  // add the s3 uri to all the workflow templates for the current stack
  const templatesUris = {};
  const stepFunctions = get(config, 'stepFunctions', {});
  Object.keys(stepFunctions).forEach((sf) => {
    templatesUris[sf] = `s3://${bucket}/${stack}/workflows/${sf}.json`;
  });

  const template = {
    cumulus_meta: {
      message_source: 'sfn',
      system_bucket: bucket,
      state_machine: stateMachineArn,
      execution_name: null,
      workflow_start_time: null
    },
    meta: {
      workflow_name: name,
      workflow_tasks: {},
      stack: config.stackName,
      buckets: config.buckets,
      cmr,
      launchpad,
      distribution_endpoint: config.distribution_endpoint,
      collection: {},
      provider: {},
      templates: templatesUris,
      queues,
      queueExecutionLimits
    },
    workflow_config: workflowConfig,
    payload: {},
    exception: null
  };

  return template;
}
218
219module.exports = {
220 fixCumulusMessageSyntax,
221 extractCumulusConfigFromSF,
222 findOutputValue,
223 generateWorkflowTemplate,
224 generateTemplates
225};
\No newline at end of file