UNPKG

14 kBJavaScriptView Raw
1const path = require('path')
2const fsEx = require('fs-extra')
3const jsonlint = require('jsonlint')
4const UserSettings = require('../user/UserSettings')
5const AppSettings = require('../app/AppSettings')
6const BackendProcess = require('../app/backend/BackendProcess')
7const ExtensionConfigWatcher = require('../app/ExtensionConfigWatcher')
8const AttachedExtensionsWatcher = require('../app/AttachedExtensionsWatcher')
9const logger = require('../logger')
10const DcHttpClient = require('../DcHttpClient')
11const PipelineWatcher = require('../app/backend/PipelineWatcher')
12const CliProxy = require('../app/backend/CliProxy')
13const utils = require('../utils/utils')
14const StepExecutor = require('../app/backend/extensionRuntime/StepExecutor')
15
16const { SETTINGS_FOLDER, EXTENSIONS_FOLDER, PIPELINES_FOLDER, TRUSTED_PIPELINES_FOLDER } = require('../app/Constants')
17
// Module-level flag: true while a 'reconnect' handler of the backend process is still running,
// so that further 'reconnect' events arriving in the meantime are skipped.
let reconnecting = false
19
/**
 * Implements the `backend start` command: connects the backend process, syncs the
 * pipelines of all attached extensions, and then watches pipelines, extension configs
 * and the list of attached extensions for changes.
 */
class BackendAction {
  /**
   * Registers the `backend start` command.
   * @param {Command} caporal
   */
  static register (caporal) {
    caporal
      .command('backend start')
      .option('--inspect', 'allow inspection tools to connect for the debugging of backend extensions')
      .description('Establish a connection to the development system')
      // return the promise so that a rejected run() is handed to the caller instead of floating
      .action((args, options) => new BackendAction().run(options))
  }

  constructor () {
    this.appSettings = new AppSettings().validate()
    // bookkeeping per pipeline file path: { id: <pipeline id>, queued: <change is debounced> }
    this.pipelines = {}

    this.settingsFolder = path.join(this.appSettings.getApplicationFolder(), SETTINGS_FOLDER)
    this.pipelinesFolder = path.join(this.appSettings.getApplicationFolder(), PIPELINES_FOLDER)
    this.trustedPipelinesFolder = path.join(this.appSettings.getApplicationFolder(), TRUSTED_PIPELINES_FOLDER)
    this.userSettings = new UserSettings()
    this.dcClient = new DcHttpClient(this.userSettings, logger)
  }

  /**
   * Starts the backend: refuses to run twice, writes the extension configs, connects the
   * sub process and installs the 'reconnect' and SIGINT handlers.
   * @param {Object} cliOptions
   * @param {Boolean} cliOptions.inspect
   * @throws {Error} if a process file of another backend process is found
   */
  async run (cliOptions) {
    this.userSettings.validate()

    const pid = await utils.previousProcess('backend', this.settingsFolder)
    if (pid) throw new Error(`Backend process is already running with pid: ${pid}. Please quit this process first.`)

    this.backendProcess = new BackendProcess(
      this.userSettings,
      this.appSettings,
      logger,
      new StepExecutor(logger, this.appSettings, Boolean(cliOptions.inspect))
    )

    await this.writeExtensionConfigs()

    this.extensionConfigWatcher = new ExtensionConfigWatcher(this.appSettings)
    this.pipelineWatcher = new PipelineWatcher(this.appSettings)
    this.attachedExtensionsWatcher = new AttachedExtensionsWatcher(this.appSettings)
    this.cliProxy = new CliProxy(this.appSettings, logger)

    await this._startSubProcess()

    this.backendProcess.on('reconnect', async () => {
      if (reconnecting) return logger.debug('already reconnecting ...')

      logger.debug('reconnecting ...')
      reconnecting = true
      try {
        await this.backendProcess.selectApplication(this.appSettings.getId())
        await this.backendProcess.resetPipelines()
        await this._uploadPipelinesOfAttachedExtenstions()
        await this._attachAllExtensions()

        await Promise.all([
          this._writeLocalPipelines(),
          this.backendProcess.reloadPipelineController()
        ])
      } catch (err) {
        // nobody awaits an event handler, so report the failure here
        logger.error(err)
      } finally {
        // always release the flag; otherwise one failed attempt would block every later reconnect
        reconnecting = false
      }
    })

    process.on('SIGINT', async () => {
      let exitCode = 0
      try {
        await this._stop()
      } catch (err) {
        logger.error(err)
        exitCode = 1
      }

      try {
        await utils.deleteProcessFile('backend', this.settingsFolder)
      } catch (err) {
        // a failing cleanup must not prevent the process from exiting
        logger.error(err)
        exitCode = 1
      }
      process.exit(exitCode)
    })
  }

  /**
   * Reads the extension-config.json of every attached extension and regenerates the
   * config files derived from it.
   * @returns {Promise<Array>}
   */
  async writeExtensionConfigs () {
    const attachedExtensions = this.appSettings.loadAttachedExtensions()
    const extensionsFolder = path.join(this.appSettings.getApplicationFolder(), EXTENSIONS_FOLDER)
    const promises = []
    for (const extensionId in attachedExtensions) {
      const extensionPath = path.join(extensionsFolder, attachedExtensions[extensionId].path)
      const config = await fsEx.readJson(path.join(extensionPath, 'extension-config.json'))
      promises.push(this._updateExtensionConfig({file: config, path: extensionPath}))
    }
    return Promise.all(promises)
  }

  /**
   * Closes all watchers, disconnects the backend process and closes the CLI proxy.
   */
  async _stop () {
    this.pipelineWatcher.close()
    await Promise.all([
      this.extensionConfigWatcher.stop(),
      this.attachedExtensionsWatcher.stop(),
      this.backendProcess.disconnect(),
      this.cliProxy.close()
    ])
    logger.info('SDK connection closed')
  }

  /**
   * Handles a pipeline watcher event. Events for the same file are debounced: the first
   * one schedules the processing 500 ms later, further ones are dropped until it has started.
   * @param {string} event
   * @param {string} file path of the pipeline file the event is about
   */
  async _pipelineEvent (event, file) {
    const extensionFolder = path.join(this.appSettings.getApplicationFolder(), EXTENSIONS_FOLDER, path.sep)
    if (!this.pipelines[file]) this.pipelines[file] = {}
    if (this.pipelines[file].queued) return

    // check if the extension of the pipeline is attached
    const attachedExtensions = this.attachedExtensionsWatcher.attachedExtensions
    const pathOfChangedExtension = file.replace(extensionFolder, '').split(path.sep)[0]
    const changedExtension = attachedExtensions.find((extension) => extension.path === pathOfChangedExtension)
    if (!changedExtension) return logger.debug('The extension of the pipeline is not attached --> skip')

    // mark as queued only once the processing is really scheduled; marking it before the
    // check above would leave the flag set forever for extensions that are not attached
    this.pipelines[file].queued = true

    setTimeout(async () => {
      this.pipelines[file].queued = false
      try {
        if (!await fsEx.pathExists(file)) {
          await this._pipelineRemoved(file, changedExtension.trusted)
          return
        }
      } catch (err) {
        return logger.error(err)
      }

      // pipeline was changed
      try {
        await this._pipelineChanged(file, changedExtension.trusted)
      } catch (err) {
        if (err.code === 'PIPELINE_INVALID') return this._logInvalidPipeline(err)

        logger.error(`Error while uploading pipeline '${file}': ${err.message}`)
        if (err.code === 'STEP_NOT_FOUND') logger.warn('Check if the extension containing this step is attached')
      }
    }, 500)
  }

  /**
   * Logs the validation errors carried by a PIPELINE_INVALID error. The message is parsed
   * as JSON with an `errors` list; if it has another shape the raw message is logged.
   * @param {Error} err
   */
  _logInvalidPipeline (err) {
    let details = null
    try {
      details = JSON.parse(err.message)
    } catch (parseError) {
      // not JSON; fall through to logging the raw message
    }

    if (!details || !Array.isArray(details.errors)) {
      logger.error(`Pipeline invalid: ${err.message}`)
      return
    }

    details.errors.forEach((error) => {
      logger.error({field: error.field}, `Pipeline invalid: ${error.message}`)
    })
  }

  /**
   * Connects the backend process, uploads the local pipelines, attaches the extensions,
   * starts the watchers and finally the CLI proxy.
   * @throws {Error} if the CLI proxy cannot be started or the process file cannot be written
   */
  async _startSubProcess () {
    const applicationId = this.appSettings.getId()
    await this.backendProcess.connect()
    await this.backendProcess.selectApplication(applicationId)
    await this.backendProcess.resetPipelines()
    await this._uploadPipelinesOfAttachedExtenstions()
    await this.backendProcess.startStepExecutor()
    await this._attachAllExtensions()

    await Promise.all([
      this._writeLocalPipelines(),
      this.backendProcess.reloadPipelineController()
    ])
    logger.info('Pipelines are now downloaded locally')

    // watcher; the listeners are attached after the watchers have been started
    await this.pipelineWatcher.start()
    await this.extensionConfigWatcher.start()
    await this.attachedExtensionsWatcher.start()
    this.pipelineWatcher.on('all', (event, file) => this._pipelineEvent(event, file))
    this.extensionConfigWatcher.on('configChange', (config) => this._updateExtensionConfig(config))
    this.attachedExtensionsWatcher.on('attach', (extensionInfo) => this.backendProcess.attachExtension(extensionInfo))
    this.attachedExtensionsWatcher.on('detach', (extensionInfo) => this.backendProcess.detachExtension(extensionInfo))

    try {
      await this.cliProxy.start()
      await utils.setProcessFile('backend', this.settingsFolder, process.pid)
    } catch (err) {
      throw new Error(`Could not start CLI-Proxy ${err}`)
    }
  }

  /**
   * Removes the pipeline that belonged to a deleted file and refreshes the local pipeline
   * folders. Does nothing if no pipeline ID is known for the file.
   * @param {string} file
   * @param {boolean} trusted currently unused
   * @throws {Error} if the removal or the refresh fails
   */
  async _pipelineRemoved (file, trusted) {
    if (!this.pipelines[file] || !this.pipelines[file].id) return
    const pipelineId = this.pipelines[file].id
    logger.info(`Start removing pipeline: ${pipelineId}`)

    try {
      await this.dcClient.removePipeline(pipelineId, this.appSettings.getId())
      logger.info(`Removed pipeline '${pipelineId}'`)
      await this._writeLocalPipelines()
    } catch (err) {
      throw new Error(`Could not remove pipeline '${pipelineId}': ${err.message}`)
    }
  }

  /**
   * Downloads the untrusted and the trusted pipelines in parallel and writes each set
   * into its local folder.
   */
  async _writeLocalPipelines () {
    const applicationId = this.appSettings.getId()
    const download = async (trusted, folder) => {
      const pipelines = await this.dcClient.downloadPipelines(applicationId, trusted)
      await this._writePipelines(pipelines, folder)
    }

    await Promise.all([
      download(false, this.pipelinesFolder),
      download(true, this.trustedPipelinesFolder)
    ])
  }

  /**
   * Empties the folder and writes every pipeline to `<folder>/<pipeline id>.json`.
   * Resolves once all files are written.
   * @param {Object[]} pipelines objects carrying their ID in `pipeline.id`
   * @param {string} folder
   */
  async _writePipelines (pipelines, folder) {
    await fsEx.emptyDir(folder)
    // wait for the write operations themselves, not for the input list
    await Promise.all(pipelines.map((pipeline) => {
      const file = path.join(folder, `${pipeline.pipeline.id}.json`)
      return fsEx.writeJson(file, pipeline, {spaces: 2})
    }))
  }

  /**
   * Validates and uploads a changed pipeline file, mirrors it into the local pipelines
   * folder and reloads the pipeline controller.
   * @param {string} file
   * @param {boolean} trusted
   * @throws {Error} if the file is not a valid pipeline or its ID differs from the file name
   */
  async _pipelineChanged (file, trusted) {
    const pipelineFileContent = await fsEx.readFile(file, 'utf8')
    const pipeline = jsonlint.parse(pipelineFileContent)

    if (!pipeline || !pipeline.pipeline || !pipeline.pipeline.id) throw new Error(`invalid pipeline; check the pipeline.id property in ${file}`)

    const fileName = path.basename(file, '.json')
    if (fileName !== pipeline.pipeline.id) throw new Error(`Pipeline ID "${pipeline.pipeline.id}" should match its file name!`)

    if (!this.pipelines[file]) this.pipelines[file] = {}
    this.pipelines[file].id = pipeline.pipeline.id
    logger.debug('Upload pipeline')
    await this.dcClient.uploadPipeline(pipeline, this.appSettings.getId(), trusted)

    // update pipeline in local pipelines folder
    logger.debug('Update pipeline file in pipelines folder')
    const pipelinePath = path.join(this.appSettings.getApplicationFolder(), (trusted) ? TRUSTED_PIPELINES_FOLDER : PIPELINES_FOLDER, `${pipeline.pipeline.id}.json`)
    await fsEx.writeJSON(pipelinePath, pipeline, {spaces: 2})

    logger.debug('Reload pipelines in the PLC ')
    await this.backendProcess.reloadPipelineController()
    logger.info(`Updated pipeline '${file}'`)
  }

  /**
   * Generates the backend and frontend config of an extension and writes them below the
   * extension folder. Failures are logged, never thrown.
   * @param {Object} config
   * @param {Object} config.file content of the extension-config.json
   * @param {string} config.path folder of the extension
   */
  async _updateExtensionConfig (config) {
    try {
      const extConfig = await this.dcClient.generateExtensionConfig(config.file, this.appSettings.getId())

      if (extConfig.backend) {
        await fsEx.outputJson(path.join(config.path, 'extension', 'config.json'), extConfig.backend, {spaces: 2})
        logger.info(`Updated extension config for backend ${config.file.id}`)
      }
      if (extConfig.frontend) {
        await fsEx.outputJson(path.join(config.path, 'frontend', 'config.json'), extConfig.frontend, {spaces: 2})
        logger.info(`Updated extension config for frontend ${config.file.id}`)
      }
    } catch (err) {
      logger.error(new Error(`Could not generate config for '${config.file.id}': ${err.message}`))
      logger.debug(err)
    }
  }

  /**
   * Attaches every extension listed in the app settings to the backend process.
   */
  async _attachAllExtensions () {
    const attachedExtensions = this.appSettings.loadAttachedExtensions()
    const promises = []
    for (const extensionId in attachedExtensions) {
      attachedExtensions[extensionId].id = extensionId
      promises.push(this.backendProcess.attachExtension(attachedExtensions[extensionId]))
    }
    await Promise.all(promises)
  }

  /**
   * Reads the pipeline files of all attached extensions, checks that every pipeline ID
   * equals its file name and uploads them, grouped into trusted and untrusted pipelines.
   * @throws {Error} if a pipeline file has no ID or the ID differs from the file name
   */
  async _uploadPipelinesOfAttachedExtenstions () {
    logger.info('Start syncing all local pipelines')
    const attachedExtensions = this.appSettings.loadAttachedExtensions()
    const extensionsFolder = path.join(this.appSettings.getApplicationFolder(), EXTENSIONS_FOLDER)

    let pipelines = []
    let trustedPipelines = []
    for (const extensionId in attachedExtensions) {
      const extension = attachedExtensions[extensionId]

      // get list of pipelines of the extension
      const pipelineFolder = path.join(extensionsFolder, extension.path, 'pipelines')

      let folderEntries
      try {
        folderEntries = await fsEx.readdir(pipelineFolder)
      } catch (e) {
        logger.debug(`Skipping pipeline upload of ${extensionId}: no pipelines folder found`)
        continue
      }

      const pipelinesFiles = folderEntries.filter((pipelineFile) => pipelineFile.endsWith('.json') && !pipelineFile.startsWith('.'))
      if (pipelinesFiles.length < 1) {
        logger.debug(`Skipping pipeline upload of ${extensionId}: no pipelines in pipeline folder`)
        continue
      }

      // read pipelines
      const loadedPipelines = await Promise.all(
        pipelinesFiles.map((pipelineFile) => fsEx.readJSON(path.join(pipelineFolder, pipelineFile)))
      )

      // once loaded, validate pipeline IDs vs pipeline file names; should be the same for each
      const problems = []
      loadedPipelines.forEach((pipeline, index) => {
        const pipelinePath = path.join(pipelineFolder, pipelinesFiles[index])
        if (!pipeline || !pipeline.pipeline || !pipeline.pipeline.id) {
          problems.push(`invalid pipeline; check the pipeline.id property in ${pipelinePath}`)
          return
        }

        const fileName = path.basename(pipelinesFiles[index], '.json')
        if (pipeline.pipeline.id !== fileName) {
          problems.push(
            `Pipeline ID "${pipeline.pipeline.id}" and file name "${fileName}" mismatch! ` +
            'The ID of a pipeline and its file name should be the same.'
          )
        }
        if (!this.pipelines[pipelinePath]) this.pipelines[pipelinePath] = {}
        this.pipelines[pipelinePath].id = pipeline.pipeline.id
      })

      if (problems.length > 0) throw new Error(problems.join('\n'))

      // group pipelines by trusted / untrusted
      if (extension.trusted === true) {
        trustedPipelines = trustedPipelines.concat(loadedPipelines)
      } else {
        pipelines = pipelines.concat(loadedPipelines)
      }
    }

    await Promise.all([
      this.dcClient.uploadMultiplePipelines(pipelines, this.appSettings.getId(), false),
      this.dcClient.uploadMultiplePipelines(trustedPipelines, this.appSettings.getId(), true)
    ])
    logger.info('All pipelines are in sync now')
  }
}
353
// Export the command class; its static register() adds the `backend start` command to caporal.
module.exports = BackendAction