UNPKG

7.07 kBJavaScriptView Raw
1'use strict';
2
3const get = require('lodash.get');
4const isObject = require('lodash.isobject');
5const isString = require('lodash.isstring');
6const omit = require('lodash.omit');
7
/**
 * Prepares a CumulusConfig object for Mustache templating.
 *
 * Both kes and the message adapter use Mustache, so every string value
 * written in the `{$...}` or `[$...]` syntax is wrapped in one extra pair
 * of curly brackets, producing `{{$...}}` and `{[$...]}` respectively.
 * Nested objects (including arrays) are processed recursively, and the
 * object that was passed in is modified in place.
 *
 * @param {Object} cumulusConfig - the CumulusConfig portion of a task definition
 * @returns {Object} the same CumulusConfig object after the update, or a new
 *   empty object when `cumulusConfig` is falsy
 */
function fixCumulusMessageSyntax(cumulusConfig) {
  if (!cumulusConfig) {
    return {};
  }

  // An opening `{` or `[`, then `$` and anything else, then a closing `]` or `}`.
  const referencePattern = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');

  Object.entries(cumulusConfig).forEach(([key, value]) => {
    if (isObject(value)) {
      // eslint-disable-next-line no-param-reassign
      cumulusConfig[key] = fixCumulusMessageSyntax(value);
      return;
    }

    if (!isString(value)) {
      return;
    }

    const matched = value.match(referencePattern);
    if (matched) {
      // eslint-disable-next-line no-param-reassign
      cumulusConfig[key] = `{${matched[0]}}`;
    }
  });

  return cumulusConfig;
}
36
37
/**
 * Extracts the Cumulus configuration from each Step Function workflow
 * and stores it separately on the config object.
 *
 * For every state of every workflow in `config.stepFunctions`, the state's
 * `CumulusConfig` is passed through `fixCumulusMessageSyntax` and saved at
 * `config.workflowConfigs[workflowName][stateName]`. The `CumulusConfig`
 * key is then removed from the state definition, because CloudFormation
 * rejects it.
 *
 * A config without `stepFunctions`, or a workflow without `States`, is
 * treated as having nothing to extract instead of raising a TypeError.
 *
 * @param {Object} config - Kes config object (modified in place)
 * @returns {Object} the same config object with `workflowConfigs` set
 */
function extractCumulusConfigFromSF(config) {
  const workflowConfigs = {};
  const stepFunctions = config.stepFunctions || {};

  Object.keys(stepFunctions).forEach((name) => {
    const sf = stepFunctions[name];
    const states = (sf && sf.States) || {};

    workflowConfigs[name] = {};
    Object.keys(states).forEach((stateName) => {
      workflowConfigs[name][stateName] = fixCumulusMessageSyntax(
        states[stateName].CumulusConfig
      );
      // Replace the state with a copy that no longer carries CumulusConfig.
      states[stateName] = omit(states[stateName], ['CumulusConfig']);
    });
  });

  // eslint-disable-next-line no-param-reassign
  config.workflowConfigs = workflowConfigs;
  return config;
}
68
/**
 * Returns the OutputValue of a CloudFormation Output.
 *
 * @param {Array<Object>} outputs - list of CloudFormation Outputs, each with
 *   `OutputKey` and `OutputValue` properties
 * @param {string} key - the OutputKey to look up
 *
 * @returns {string|undefined} the matching output value, or undefined when
 *   `outputs` is not an array or no output has the given key
 */
function findOutputValue(outputs, key) {
  // A missing outputs list is reported as "not found" rather than throwing.
  if (!Array.isArray(outputs)) return undefined;

  const output = outputs.find((o) => Boolean(o) && o.OutputKey === key);
  return output ? output.OutputValue : undefined;
}
82
/**
 * Generates a Cumulus Message template for a Cumulus Workflow.
 *
 * The template combines values from the Kes config (buckets, cmr, launchpad,
 * distribution endpoint, per-state workflow configs) with values read from
 * the CloudFormation outputs (encrypted CMR password, encrypted Launchpad
 * passphrase, the workflow's state machine ARN and the SQS queue URLs).
 *
 * @param {string} name - name of the workflow
 * @param {Object} workflow - the Step Function workflow object
 * @param {Object} config - Kes config object
 * @param {Array} outputs - a list of CloudFormation outputs; a missing list
 *   is treated as empty
 *
 * @returns {Object} a Cumulus Message template
 */
function generateWorkflowTemplate(name, workflow, config, outputs) {
  const stackOutputs = Array.isArray(outputs) ? outputs : [];

  // secrets are taken from the stack outputs, not from the config
  const cmr = Object.assign({}, config.cmr, {
    password: findOutputValue(stackOutputs, 'EncryptedCmrPassword')
  });
  const launchpad = Object.assign({}, config.launchpad, {
    passphrase: findOutputValue(stackOutputs, 'EncryptedLaunchpadPassphrase')
  });
  const bucket = config.system_bucket;

  // Templates are uploaded under `config.stackName` (see generateTemplates),
  // so the URIs must use the same prefix; `config.stack` is only a fallback.
  const stack = config.stackName || config.stack;

  // the current workflow's state machine arn
  const stateMachineArn = findOutputValue(stackOutputs, `${name}StateMachine`);

  // queue urls, plus execution limits for the queues that define one
  const queues = {};
  const queueExecutionLimits = {};
  if (config.sqs) {
    stackOutputs
      .filter((o) => o.OutputKey.includes('SQSOutput'))
      .forEach((queue) => {
        const queueName = queue.OutputKey.replace('SQSOutput', '');
        queues[queueName] = queue.OutputValue;

        const maxExecutions = get(config.sqs, [queueName, 'maxExecutions']);
        if (maxExecutions) {
          queueExecutionLimits[queueName] = maxExecutions;
        }
      });
  }

  // the cumulus message config of each state of the current workflow;
  // a state without an extracted config yields undefined instead of throwing
  const workflowConfig = {};
  const states = get(workflow, 'States') || {};
  Object.keys(states).forEach((state) => {
    workflowConfig[state] = get(config, ['workflowConfigs', name, state]);
  });

  // the s3 uri of every workflow template of the current stack
  const templatesUris = {};
  const stepFunctions = get(config, 'stepFunctions') || {};
  Object.keys(stepFunctions).forEach((sf) => {
    templatesUris[sf] = `s3://${bucket}/${stack}/workflows/${sf}.json`;
  });

  return {
    cumulus_meta: {
      message_source: 'sfn',
      system_bucket: bucket,
      state_machine: stateMachineArn,
      execution_name: null,
      workflow_start_time: null
    },
    meta: {
      workflow_name: name,
      workflow_tasks: {},
      stack: config.stackName,
      buckets: config.buckets,
      cmr,
      launchpad,
      distribution_endpoint: config.distribution_endpoint,
      collection: {},
      provider: {},
      templates: templatesUris,
      queues,
      queueExecutionLimits
    },
    workflow_config: workflowConfig,
    payload: {},
    exception: null
  };
}
167
/**
 * Generates the Cumulus Message templates for all the workflows
 * in the stack and uploads them to S3.
 *
 * Besides one `<workflow>.json` per workflow, two more files are uploaded
 * under `<stackName>/workflows/`: `list.json` (name, template URI and
 * definition of every workflow) and `buckets.json` (the buckets config).
 * Nothing is generated or uploaded when the config has no step functions.
 *
 * @param {Object} config - Kes config object
 * @param {Array} outputs - a list of CloudFormation outputs
 * @param {function} uploader - an uploader function, called as
 *   `uploader(bucket, key, body)` and awaited
 *
 * @returns {Promise} undefined
 */
async function generateTemplates(config, outputs, uploader) {
  // nothing to do for deployments that define no step functions
  if (!config.stepFunctions) return;

  const bucket = config.system_bucket;
  const stack = config.stackName;

  // build every template before the first upload starts
  const workflowNames = Object.keys(config.stepFunctions);
  const templates = workflowNames.map(
    (workflowName) => generateWorkflowTemplate(
      workflowName,
      config.stepFunctions[workflowName],
      config,
      outputs
    )
  );

  console.log('Uploading Cumulus Message Templates for each Workflow ...');

  // uploads run one after another, in workflow order
  const workflows = [];
  for (const template of templates) {
    const name = template.meta.workflow_name;
    const key = `${stack}/workflows/${name}.json`;
    await uploader(bucket, key, JSON.stringify(template)); // eslint-disable-line no-await-in-loop
    workflows.push({
      name,
      template: `s3://${bucket}/${key}`,
      definition: config.stepFunctions[name]
    });
  }

  // the workflow list is used by the /workflows endpoint of the API
  // to return the existing workflows
  await uploader(bucket, `${stack}/workflows/list.json`, JSON.stringify(workflows));

  // the buckets config
  await uploader(bucket, `${stack}/workflows/buckets.json`, JSON.stringify(config.buckets));
}
210
// Public API of this module.
const publicApi = {
  fixCumulusMessageSyntax,
  extractCumulusConfigFromSF,
  findOutputValue,
  generateWorkflowTemplate,
  generateTemplates
};

module.exports = publicApi;