UNPKG

7.03 kBJavaScriptView Raw
1'use strict';
2
/**
 * Build the Cumulus workflow message template plus one definition file per
 * workflow in the stack, and upload them (together with the buckets config)
 * through the supplied uploader.
 *
 * Nothing is generated or uploaded when the config defines no step functions.
 *
 * @param {Object} config - Kes config object
 * @param {Array} outputs - a list of CloudFormation outputs
 * @param {function} uploader - an uploader function, invoked as
 *   `uploader(bucket, key, body)`
 *
 * @returns {Promise} resolves to undefined after the last upload was yielded
 */
const generateTemplates = (() => {
  const uploadAll = _asyncToGenerator(function* (config, outputs, uploader) {
    // there is nothing to describe unless the deployment has step functions
    if (!config.stepFunctions) return;

    const { system_bucket: bucket, stackName: stack } = config;

    // the universal workflow message template is uploaded first
    const messageTemplate = generateWorkflowTemplate(config, outputs);
    console.log('Uploading Cumulus Universal Workflow Message Template ...');
    yield uploader(bucket, `${stack}/workflow_template.json`, JSON.stringify(messageTemplate));

    // one JSON document per workflow; the /workflows endpoint of the API
    // reads these to return the list of existing workflows
    const pendingUploads = [];
    for (const name of Object.keys(config.stepFunctions)) {
      const body = JSON.stringify({
        name,
        arn: findOutputValue(outputs, `${name}StateMachine`),
        definition: config.stepFunctions[name]
      });
      pendingUploads.push(uploader(bucket, `${stack}/workflows/${name}.json`, body));
    }
    yield Promise.all(pendingUploads);

    // finally, the buckets config
    yield uploader(bucket, `${stack}/workflows/buckets.json`, JSON.stringify(config.buckets));
  });

  // keep the three-argument signature (and `this` forwarding) of the wrapper
  return function generateTemplates(_x, _x2, _x3) {
    return uploadAll.apply(this, arguments);
  };
})();
48
/**
 * Transpiler helper: turns a generator function into a function that
 * returns a Promise, treating every `yield` like an `await`.
 *
 * Each yielded value is wrapped with `Promise.resolve`; its fulfilment value
 * is sent back into the generator via `next`, its rejection reason via
 * `throw`. The returned promise resolves with the generator's return value
 * and rejects with any error the generator lets escape.
 *
 * @param {function} fn - a generator function
 * @returns {function} a function forwarding `this` and its arguments to `fn`
 *   and returning a Promise
 */
function _asyncToGenerator(fn) {
  return function () {
    const gen = fn.apply(this, arguments);

    return new Promise((resolve, reject) => {
      const step = (key, arg) => {
        let info;
        let value;
        try {
          info = gen[key](arg);
          value = info.value;
        } catch (error) {
          // the generator threw (or re-threw) - the whole call fails
          reject(error);
          return undefined;
        }

        if (info.done) {
          resolve(value);
          return undefined;
        }

        // not finished yet: wait for the yielded value, then resume
        return Promise.resolve(value).then(
          (settled) => {
            step('next', settled);
          },
          (err) => {
            step('throw', err);
          }
        );
      };

      return step('next');
    });
  };
}
50
51const get = require('lodash.get');
52const isObject = require('lodash.isobject');
53const isString = require('lodash.isstring');
54const omit = require('lodash.omit');
55const { deprecate } = require('@cumulus/common/util');
56
/**
 * Both kes and the message adapter use Mustache for templating, so string
 * values written with the `{$...}` / `[$...]` syntax get an extra pair of
 * curly brackets, producing `{{$...}}` and `{[$...]}`.
 *
 * The given object is updated in place (nested objects included) and a
 * deprecation notice for `CumulusConfig` is issued on every call that
 * receives a config.
 *
 * @param {Object} cumulusConfig - the CumulusConfig portion of a task definition
 * @returns {Object} the updated CumulusConfig, or an empty object when none
 *   was given
 */
function fixCumulusMessageSyntax(cumulusConfig) {
  if (!cumulusConfig) return {};
  deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');

  // whole-string match: an opening { or [, a $-prefixed body, a closing ] or }
  const messageSyntax = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');

  for (const key of Object.keys(cumulusConfig)) {
    const value = cumulusConfig[key];

    if (isObject(value)) {
      // eslint-disable-next-line no-param-reassign
      cumulusConfig[key] = fixCumulusMessageSyntax(value);
    } else if (isString(value) && messageSyntax.test(value)) {
      // eslint-disable-next-line no-param-reassign
      cumulusConfig[key] = `{${value}}`;
    }
  }

  return cumulusConfig;
}
85
/**
 * Extracts the Cumulus configuration (`CumulusConfig`) from every state of
 * every Step Function workflow and stores it separately on the config object
 * as `config.workflowConfigs[workflowName][stateName]`.
 *
 * The `CumulusConfig` key is removed from each state definition, otherwise
 * CloudFormation will be mad at us. The passed config object is modified in
 * place. A config without `stepFunctions` yields an empty `workflowConfigs`.
 *
 * @param {Object} config - Kes config object
 * @returns {Object} updated kes config object
 */
function extractCumulusConfigFromSF(config) {
  const workflowConfigs = {};
  const stepFunctions = config.stepFunctions || {};
  let usesCumulusConfig = false;

  // loop through the message adapter config of each step of the step
  // function, add curly brackets to values with dollar sign and strip the
  // config key from the definition
  Object.keys(stepFunctions).forEach((name) => {
    const sf = stepFunctions[name];
    workflowConfigs[name] = {};

    Object.keys(sf.States).forEach((n) => {
      const state = sf.States[n];
      if (state.CumulusConfig) usesCumulusConfig = true;
      workflowConfigs[name][n] = fixCumulusMessageSyntax(state.CumulusConfig);
      // states are swapped on the workflow object itself, so the definition
      // held by config.stepFunctions is updated without reassigning it
      sf.States[n] = omit(state, ['CumulusConfig']);
    });
  });

  // eslint-disable-next-line no-param-reassign
  config.workflowConfigs = workflowConfigs;

  // only warn when the deprecated key is really in use; testing the array
  // returned by Object.keys() directly would always be truthy
  if (usesCumulusConfig) deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');
  return config;
}
117
/**
 * Looks up a CloudFormation Output by its OutputKey and returns its
 * OutputValue.
 *
 * @param {Array<Object>} outputs - list of CloudFormation Outputs
 * @param {string} key - the OutputKey to look for
 *
 * @returns {string|undefined} the OutputValue of the first output whose
 *   OutputKey equals `key`, or undefined when there is none
 */
function findOutputValue(outputs, key) {
  const match = outputs.find((candidate) => candidate.OutputKey === key);
  return match ? match.OutputValue : undefined;
}
131
/**
 * Generates a universal Cumulus Message template for a Cumulus Workflow
 *
 * The CMR password and the Launchpad passphrase are taken from the
 * `EncryptedCmrPassword` / `EncryptedLaunchpadPassphrase` CloudFormation
 * outputs. When `config.sqs` is set, every output whose key contains
 * `SQSOutput` is registered as a queue, together with its `maxExecutions`
 * limit if the sqs config defines one.
 *
 * @param {Object} config - Kes config object
 * @param {Array} outputs - a list of CloudFormation outputs
 *
 * @returns {Object} a Cumulus Message template
 */
function generateWorkflowTemplate(config, outputs) {
  // get cmr password from outputs
  const cmrPassword = findOutputValue(outputs, 'EncryptedCmrPassword');
  const cmr = Object.assign({}, config.cmr, { password: cmrPassword });
  // get launchpad passphrase from outputs
  const launchpadPassphrase = findOutputValue(outputs, 'EncryptedLaunchpadPassphrase');
  const launchpad = Object.assign({}, config.launchpad, { passphrase: launchpadPassphrase });
  const bucket = get(config, 'system_bucket');
  // The template is uploaded under the stackName prefix, so its URI has to be
  // built from stackName as well; `config.stack` is kept as a fallback for
  // configs that only provide that key.
  const stack = config.stackName || config.stack;

  // add queues
  const queues = {};
  const queueExecutionLimits = {};
  if (config.sqs) {
    const queueArns = outputs.filter((o) => o.OutputKey.includes('SQSOutput'));

    queueArns.forEach((queue) => {
      // the queue name is the output key without its SQSOutput marker
      const queueName = queue.OutputKey.replace('SQSOutput', '');
      const queueUrl = queue.OutputValue;

      queues[queueName] = queueUrl;

      const maxExecutions = get(config.sqs, `${queueName}.maxExecutions`);
      if (maxExecutions) {
        queueExecutionLimits[queueName] = maxExecutions;
      }
    });
  }

  const template = {
    cumulus_meta: {
      message_source: 'sfn',
      system_bucket: bucket,
      state_machine: null,
      execution_name: null,
      workflow_start_time: null
    },
    meta: {
      workflow_name: null,
      workflow_tasks: {},
      stack: config.stackName,
      buckets: config.buckets,
      cmr,
      launchpad,
      distribution_endpoint: config.distribution_endpoint,
      collection: {},
      provider: {},
      template: `s3://${bucket}/${stack}/workflow_template.json`,
      queues,
      queueExecutionLimits
    },
    payload: {},
    exception: null
  };

  return template;
}
196
// Public API of this module: the message-syntax and config helpers plus the
// template generators defined above.
module.exports = {
  fixCumulusMessageSyntax,
  extractCumulusConfigFromSF,
  findOutputValue,
  generateWorkflowTemplate,
  generateTemplates
};
\No newline at end of file