UNPKG

6.38 kBJavaScriptView Raw
1'use strict';
2
3const get = require('lodash.get');
4const isObject = require('lodash.isobject');
5const isString = require('lodash.isstring');
6const omit = require('lodash.omit');
7const { deprecate } = require('@cumulus/common/util');
8
9/**
10 * Because both kes and message adapter use Mustache for templating,
11 * we add curly brackets to items that are using the [$] and {$} syntax
12 * to produce {{$}} and {[$]}
13 *
14 * @param {Object} cumulusConfig - the CumulusConfig portion of a task definition
15 * @returns {Object} updated CumulusConfig
16 */
17function fixCumulusMessageSyntax(cumulusConfig) {
18 if (!cumulusConfig) return {};
19 deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');
20 const test = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');
21
22 Object.keys(cumulusConfig).forEach((n) => {
23 if (isObject(cumulusConfig[n])) {
24 // eslint-disable-next-line no-param-reassign
25 cumulusConfig[n] = fixCumulusMessageSyntax(cumulusConfig[n]);
26 } else if (isString(cumulusConfig[n])) {
27 const match = cumulusConfig[n].match(test);
28 if (match) {
29 // eslint-disable-next-line no-param-reassign
30 cumulusConfig[n] = `{${match[0]}}`;
31 }
32 }
33 });
34
35 return cumulusConfig;
36}
37
38
39/**
40 * Extracts Cumulus Configuration from each Step Function Workflow
41 * and returns it as a separate object
42 *
43 * @param {Object} config - Kes config object
44 * @returns {Object} updated kes config object
45 */
46function extractCumulusConfigFromSF(config) {
47 const workflowConfigs = {};
48
49 // loop through the message adapter config of each step of
50 // the step function, add curly brackets to values
51 // with dollar sign and remove config key from the
52 // definition, otherwise CloudFormation will be mad
53 // at us.
54 Object.keys(config.stepFunctions).forEach((name) => {
55 const sf = config.stepFunctions[name];
56 workflowConfigs[name] = {};
57 Object.keys(sf.States).forEach((n) => {
58 workflowConfigs[name][n] = fixCumulusMessageSyntax(sf.States[n].CumulusConfig);
59 sf.States[n] = omit(sf.States[n], ['CumulusConfig']);
60 });
61 // eslint-disable-next-line no-param-reassign
62 config.stepFunctions[name] = sf;
63 });
64
65 // eslint-disable-next-line no-param-reassign
66 config.workflowConfigs = workflowConfigs;
67 if (Object.keys(workflowConfigs)) deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');
68 return config;
69}
70
71/**
72 * Returns the OutputValue of a CloudFormation Output
73 *
74 * @param {Object} outputs - list of CloudFormation Outputs
75 * @param {string} key - the key to return the value of
76 *
77 * @returns {string} the output value
78 */
79function findOutputValue(outputs, key) {
80 const output = outputs.find((o) => (o.OutputKey === key));
81 if (output) return output.OutputValue;
82 return undefined;
83}
84
85/**
86 * Generates a universal Cumulus Message template for a Cumulus Workflow
87 *
88 * @param {Object} config - Kes config object
89 * @param {Array} outputs - an list of CloudFormation outputs
90 *
91 * @returns {Object} a Cumulus Message template
92 */
93function generateWorkflowTemplate(config, outputs) {
94 // get cmr password from outputs
95 const cmrPassword = findOutputValue(outputs, 'EncryptedCmrPassword');
96 const cmr = Object.assign({}, config.cmr, { password: cmrPassword });
97 // get launchpad passphrase from outputs
98 const launchpadPassphrase = findOutputValue(outputs, 'EncryptedLaunchpadPassphrase');
99 const launchpad = Object.assign({}, config.launchpad, { passphrase: launchpadPassphrase });
100 const bucket = get(config, 'system_bucket');
101
102 // add queues
103 const queues = {};
104 const queueExecutionLimits = {};
105 if (config.sqs) {
106 const queueArns = outputs.filter((o) => o.OutputKey.includes('SQSOutput'));
107
108 queueArns.forEach((queue) => {
109 const queueName = queue.OutputKey.replace('SQSOutput', '');
110 const queueUrl = queue.OutputValue;
111
112 queues[queueName] = queueUrl;
113
114 const maxExecutions = get(config.sqs, `${queueName}.maxExecutions`);
115 if (maxExecutions) {
116 queueExecutionLimits[queueName] = maxExecutions;
117 }
118 });
119 }
120
121 const template = {
122 cumulus_meta: {
123 message_source: 'sfn',
124 system_bucket: bucket,
125 state_machine: null,
126 execution_name: null,
127 workflow_start_time: null
128 },
129 meta: {
130 workflow_name: null,
131 workflow_tasks: {},
132 stack: config.stackName,
133 buckets: config.buckets,
134 cmr,
135 launchpad,
136 distribution_endpoint: config.distribution_endpoint,
137 collection: {},
138 provider: {},
139 template: `s3://${bucket}/${config.stack}/workflow_template.json`,
140 queues,
141 queueExecutionLimits
142 },
143 payload: {},
144 exception: null
145 };
146
147 return template;
148}
149
150/**
151 * Generate a Cumulus Message templates for all the workflows
152 * in the stack and upload to s3
153 *
154 * @param {Object} config - Kes config object
155 * @param {Array} outputs - an list of CloudFormation outputs
156 * @param {function} uploader - an uploader function
157 *
158 * @returns {Promise} undefined
159 */
160async function generateTemplates(config, outputs, uploader) {
161 // this function only works if there are step functions defined in the deployment
162 if (config.stepFunctions) {
163 const bucket = config.system_bucket;
164 const stack = config.stackName;
165
166 // generate workflow message template and upload it to s3.
167 const template = generateWorkflowTemplate(config, outputs);
168 console.log('Uploading Cumulus Universal Workflow Message Template ...');
169 const key = `${stack}/workflow_template.json`;
170 await uploader(bucket, key, JSON.stringify(template));
171
172 // generate list of workflows and upload it to S3
173 // this is used by the /workflows endpoint of the API to return list
174 // of existing workflows
175 const workflowUploads = Object.keys(config.stepFunctions).map((name) => {
176 const arn = findOutputValue(outputs, `${name}StateMachine`);
177 return uploader(bucket, `${stack}/workflows/${name}.json`, JSON.stringify({
178 name,
179 arn,
180 definition: config.stepFunctions[name]
181 }));
182 });
183 await Promise.all(workflowUploads);
184
185 // upload the buckets config
186 await uploader(bucket, `${stack}/workflows/buckets.json`, JSON.stringify(config.buckets));
187 }
188}
189
190module.exports = {
191 fixCumulusMessageSyntax,
192 extractCumulusConfigFromSF,
193 findOutputValue,
194 generateWorkflowTemplate,
195 generateTemplates
196};