1 | 'use strict';
|
2 |
|
3 | const get = require('lodash.get');
|
4 | const isObject = require('lodash.isobject');
|
5 | const isString = require('lodash.isstring');
|
6 | const omit = require('lodash.omit');
|
7 | const { deprecate } = require('@cumulus/common/util');
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 | function fixCumulusMessageSyntax(cumulusConfig) {
|
18 | if (!cumulusConfig) return {};
|
19 | deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');
|
20 | const test = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');
|
21 |
|
22 | Object.keys(cumulusConfig).forEach((n) => {
|
23 | if (isObject(cumulusConfig[n])) {
|
24 |
|
25 | cumulusConfig[n] = fixCumulusMessageSyntax(cumulusConfig[n]);
|
26 | } else if (isString(cumulusConfig[n])) {
|
27 | const match = cumulusConfig[n].match(test);
|
28 | if (match) {
|
29 |
|
30 | cumulusConfig[n] = `{${match[0]}}`;
|
31 | }
|
32 | }
|
33 | });
|
34 |
|
35 | return cumulusConfig;
|
36 | }
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 | function extractCumulusConfigFromSF(config) {
|
47 | const workflowConfigs = {};
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
54 | Object.keys(config.stepFunctions).forEach((name) => {
|
55 | const sf = config.stepFunctions[name];
|
56 | workflowConfigs[name] = {};
|
57 | Object.keys(sf.States).forEach((n) => {
|
58 | workflowConfigs[name][n] = fixCumulusMessageSyntax(sf.States[n].CumulusConfig);
|
59 | sf.States[n] = omit(sf.States[n], ['CumulusConfig']);
|
60 | });
|
61 |
|
62 | config.stepFunctions[name] = sf;
|
63 | });
|
64 |
|
65 |
|
66 | config.workflowConfigs = workflowConfigs;
|
67 | if (Object.keys(workflowConfigs)) deprecate('CumulusConfig', '1.15.0', 'AWS Parameters with task_config');
|
68 | return config;
|
69 | }
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
78 |
|
79 | function findOutputValue(outputs, key) {
|
80 | const output = outputs.find((o) => (o.OutputKey === key));
|
81 | if (output) return output.OutputValue;
|
82 | return undefined;
|
83 | }
|
84 |
|
85 |
|
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
91 |
|
92 |
|
93 | function generateWorkflowTemplate(config, outputs) {
|
94 |
|
95 | const cmrPassword = findOutputValue(outputs, 'EncryptedCmrPassword');
|
96 | const cmr = Object.assign({}, config.cmr, { password: cmrPassword });
|
97 |
|
98 | const launchpadPassphrase = findOutputValue(outputs, 'EncryptedLaunchpadPassphrase');
|
99 | const launchpad = Object.assign({}, config.launchpad, { passphrase: launchpadPassphrase });
|
100 | const bucket = get(config, 'system_bucket');
|
101 |
|
102 | // add queues
|
103 | const queues = {};
|
104 | const queueExecutionLimits = {};
|
105 | if (config.sqs) {
|
106 | const queueArns = outputs.filter((o) => o.OutputKey.includes('SQSOutput'));
|
107 |
|
108 | queueArns.forEach((queue) => {
|
109 | const queueName = queue.OutputKey.replace('SQSOutput', '');
|
110 | const queueUrl = queue.OutputValue;
|
111 |
|
112 | queues[queueName] = queueUrl;
|
113 |
|
114 | const maxExecutions = get(config.sqs, `${queueName}.maxExecutions`);
|
115 | if (maxExecutions) {
|
116 | queueExecutionLimits[queueName] = maxExecutions;
|
117 | }
|
118 | });
|
119 | }
|
120 |
|
121 | const template = {
|
122 | cumulus_meta: {
|
123 | message_source: 'sfn',
|
124 | system_bucket: bucket,
|
125 | state_machine: null,
|
126 | execution_name: null,
|
127 | workflow_start_time: null
|
128 | },
|
129 | meta: {
|
130 | workflow_name: null,
|
131 | workflow_tasks: {},
|
132 | stack: config.stackName,
|
133 | buckets: config.buckets,
|
134 | cmr,
|
135 | launchpad,
|
136 | distribution_endpoint: config.distribution_endpoint,
|
137 | collection: {},
|
138 | provider: {},
|
139 | template: `s3:
|
140 | queues,
|
141 | queueExecutionLimits
|
142 | },
|
143 | payload: {},
|
144 | exception: null
|
145 | };
|
146 |
|
147 | return template;
|
148 | }
|
149 |
|
150 |
|
151 |
|
152 |
|
153 |
|
154 |
|
155 |
|
156 |
|
157 |
|
158 |
|
159 |
|
160 | async function generateTemplates(config, outputs, uploader) {
|
161 |
|
162 | if (config.stepFunctions) {
|
163 | const bucket = config.system_bucket;
|
164 | const stack = config.stackName;
|
165 |
|
166 |
|
167 | const template = generateWorkflowTemplate(config, outputs);
|
168 | console.log('Uploading Cumulus Universal Workflow Message Template ...');
|
169 | const key = `${stack}/workflow_template.json`;
|
170 | await uploader(bucket, key, JSON.stringify(template));
|
171 |
|
172 |
|
173 |
|
174 |
|
175 | const workflowUploads = Object.keys(config.stepFunctions).map((name) => {
|
176 | const arn = findOutputValue(outputs, `${name}StateMachine`);
|
177 | return uploader(bucket, `${stack}/workflows/${name}.json`, JSON.stringify({
|
178 | name,
|
179 | arn,
|
180 | definition: config.stepFunctions[name]
|
181 | }));
|
182 | });
|
183 | await Promise.all(workflowUploads);
|
184 |
|
185 |
|
186 | await uploader(bucket, `${stack}/workflows/buckets.json`, JSON.stringify(config.buckets));
|
187 | }
|
188 | }
|
189 |
|
190 | module.exports = {
|
191 | fixCumulusMessageSyntax,
|
192 | extractCumulusConfigFromSF,
|
193 | findOutputValue,
|
194 | generateWorkflowTemplate,
|
195 | generateTemplates
|
196 | };
|