1 | 'use strict';
|
2 |
|
3 | const get = require('lodash.get');
|
4 | const isObject = require('lodash.isobject');
|
5 | const isString = require('lodash.isstring');
|
6 | const omit = require('lodash.omit');
|
7 |
|
/**
 * Wrap Cumulus message references in an additional pair of curly brackets.
 *
 * Both kes and the message adapter use Mustache for templating, so string
 * values written with the [$...] or {$...} syntax are rewritten as {[$...]}
 * and {{$...}} respectively. Nested objects (including arrays) are handled
 * recursively. The object passed in is updated in place.
 *
 * @param {Object} cumulusConfig - the CumulusConfig portion of a task definition
 * @returns {Object} the updated CumulusConfig (the same object that was
 *   passed in), or a new empty object when cumulusConfig is falsy
 */
function fixCumulusMessageSyntax(cumulusConfig) {
  if (!cumulusConfig) return {};

  // Anchored at both ends, so a match always spans the whole string
  const referencePattern = new RegExp('^([\\{]{1}|[\\[]{1})(\\$.*)([\\]]{1}|[\\}]{1})$');

  for (const key of Object.keys(cumulusConfig)) {
    const value = cumulusConfig[key];

    if (isObject(value)) {
      cumulusConfig[key] = fixCumulusMessageSyntax(value);
    } else if (isString(value) && referencePattern.test(value)) {
      cumulusConfig[key] = `{${value}}`;
    }
  }

  return cumulusConfig;
}
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
/**
 * Move the CumulusConfig of every step function state into
 * config.workflowConfigs.
 *
 * For each state of each step function, the state's CumulusConfig is passed
 * through fixCumulusMessageSyntax and stored under
 * config.workflowConfigs[<step function name>][<state name>]; the state itself
 * is replaced by a copy without the CumulusConfig key. The config object is
 * updated in place.
 *
 * A config without stepFunctions, or a step function without States, is
 * accepted and simply contributes no entries.
 *
 * @param {Object} config - configuration object, optionally with a
 *   stepFunctions map of step function definitions
 * @returns {Object} the same config object with workflowConfigs set
 */
function extractCumulusConfigFromSF(config) {
  const workflowConfigs = {};
  // A missing stepFunctions map is treated as "no workflows" instead of throwing
  const stepFunctions = config.stepFunctions || {};

  Object.keys(stepFunctions).forEach((name) => {
    const sf = stepFunctions[name];
    // A definition without States yields an empty workflow config
    const states = sf.States || {};
    workflowConfigs[name] = {};

    Object.keys(states).forEach((stateName) => {
      workflowConfigs[name][stateName] = fixCumulusMessageSyntax(states[stateName].CumulusConfig);
      states[stateName] = omit(states[stateName], ['CumulusConfig']);
    });
  });

  config.workflowConfigs = workflowConfigs;
  return config;
}
|
68 |
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
/**
 * Look up the value of an output by its key.
 *
 * @param {Array<Object>} outputs - list of output objects, each with
 *   OutputKey and OutputValue properties
 * @param {string} key - the OutputKey to search for
 * @returns {*} the OutputValue of the first output whose OutputKey equals
 *   key, or undefined when no output matches
 */
function findOutputValue(outputs, key) {
  const match = outputs.find((entry) => entry.OutputKey === key);
  return match ? match.OutputValue : undefined;
}
|
82 |
|
83 |
|
84 |
|
85 |
|
86 |
|
87 |
|
88 |
|
89 |
|
90 |
|
91 |
|
92 |
|
/**
 * Build the Cumulus message template for a single workflow.
 *
 * @param {string} name - name of the workflow
 * @param {Object} workflow - the workflow definition; the keys of its States
 *   property select which entries of config.workflowConfigs[name] are copied
 *   into the template
 * @param {Object} config - configuration object; reads cmr, launchpad,
 *   system_bucket, sqs, workflowConfigs, stepFunctions, stackName, buckets
 *   and distribution_endpoint
 * @param {Array<Object>} outputs - list of output objects, each with
 *   OutputKey and OutputValue properties
 * @returns {Object} the message template for the workflow
 */
function generateWorkflowTemplate(name, workflow, config, outputs) {
  // cmr and launchpad settings are copied with the secrets taken from the outputs
  const cmrPassword = findOutputValue(outputs, 'EncryptedCmrPassword');
  const cmr = Object.assign({}, config.cmr, { password: cmrPassword });

  const launchpadPassphrase = findOutputValue(outputs, 'EncryptedLaunchpadPassphrase');
  const launchpad = Object.assign({}, config.launchpad, { passphrase: launchpadPassphrase });
  const bucket = get(config, 'system_bucket');

  // add the current workflows' state machine arn
  const stateMachineArn = findOutputValue(outputs, `${name}StateMachine`);

  // add queues
  const queues = {};
  const queueExecutionLimits = {};
  if (config.sqs) {
    const queueArns = outputs.filter((o) => o.OutputKey.includes('SQSOutput'));

    queueArns.forEach((queue) => {
      const queueName = queue.OutputKey.replace('SQSOutput', '');
      const queueUrl = queue.OutputValue;

      queues[queueName] = queueUrl;

      // only queues with a truthy maxExecutions get an execution limit
      const maxExecutions = get(config.sqs, `${queueName}.maxExecutions`);
      if (maxExecutions) {
        queueExecutionLimits[queueName] = maxExecutions;
      }
    });
  }

  // add the cumulus message config of the current workflow
  const workflowConfig = {};
  const states = get(workflow, 'States', {});
  Object.keys(states).forEach((state) => {
    workflowConfig[state] = config.workflowConfigs[name][state];
  });

  // add the s3 uri to all the workflow templates for the current stack;
  // the uri uses the same <stack>/workflows/<name>.json key layout that
  // generateTemplates uploads the templates to
  const templatesUris = {};
  const stepFunctions = get(config, 'stepFunctions', {});
  Object.keys(stepFunctions).forEach((sf) => {
    templatesUris[sf] = `s3://${bucket}/${config.stackName}/workflows/${sf}.json`;
  });

  const template = {
    cumulus_meta: {
      message_source: 'sfn',
      system_bucket: bucket,
      state_machine: stateMachineArn,
      execution_name: null,
      workflow_start_time: null
    },
    meta: {
      workflow_name: name,
      workflow_tasks: {},
      stack: config.stackName,
      buckets: config.buckets,
      cmr,
      launchpad,
      distribution_endpoint: config.distribution_endpoint,
      collection: {},
      provider: {},
      templates: templatesUris,
      queues,
      queueExecutionLimits
    },
    workflow_config: workflowConfig,
    payload: {},
    exception: null
  };

  return template;
}
|
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 |
|
177 |
|
/**
 * Generate the message template of every workflow and upload them, followed
 * by a list of all workflows and the bucket configuration.
 *
 * Nothing is generated or uploaded when config.stepFunctions is not set.
 * Uploads are awaited one after another: one <stack>/workflows/<name>.json
 * per workflow, then <stack>/workflows/list.json, then
 * <stack>/workflows/buckets.json.
 *
 * @param {Object} config - configuration object; reads stepFunctions,
 *   system_bucket, stackName and buckets
 * @param {Array<Object>} outputs - outputs passed on to generateWorkflowTemplate
 * @param {Function} uploader - called as uploader(bucket, key, body) with a
 *   JSON string body; its result is awaited
 * @returns {Promise<undefined>} resolves once all uploads have finished
 */
async function generateTemplates(config, outputs, uploader) {
  if (!config.stepFunctions) return;

  const { system_bucket: bucket, stackName: stack } = config;

  // every template is built before the first upload starts
  const templates = Object.keys(config.stepFunctions).map(
    (workflowName) => generateWorkflowTemplate(
      workflowName,
      config.stepFunctions[workflowName],
      config,
      outputs
    )
  );

  console.log('Uploading Cumulus Message Templates for each Workflow ...');

  const workflows = [];
  for (const template of templates) {
    const name = template.meta.workflow_name;
    const key = `${stack}/workflows/${name}.json`;

    await uploader(bucket, key, JSON.stringify(template));

    workflows.push({
      name,
      template: `s3://${bucket}/${key}`,
      definition: config.stepFunctions[name]
    });
  }

  // list of all workflows with their template uri and definition
  await uploader(bucket, `${stack}/workflows/list.json`, JSON.stringify(workflows));

  // the bucket configuration of the stack
  await uploader(bucket, `${stack}/workflows/buckets.json`, JSON.stringify(config.buckets));
}
|
210 |
|
211 | module.exports = {
|
212 | fixCumulusMessageSyntax,
|
213 | extractCumulusConfigFromSF,
|
214 | findOutputValue,
|
215 | generateWorkflowTemplate,
|
216 | generateTemplates
|
217 | };
|