UNPKG

13.7 kBJavaScriptView Raw
1const path = require('path')
2const fsEx = require('fs-extra')
3const jsonlint = require('jsonlint')
4const BackendProcess = require('../app/backend/BackendProcess')
5const ExtensionConfigWatcher = require('../app/ExtensionConfigWatcher')
6const AttachedExtensionsWatcher = require('../app/AttachedExtensionsWatcher')
7const { extensionConfigChanged } = require('../utils/EventHandler')
8const logger = require('../logger')
9const PipelineWatcher = require('../app/backend/PipelineWatcher')
10const CliProxy = require('../app/backend/CliProxy')
11const utils = require('../utils/utils')
12const StepExecutor = require('../app/backend/extensionRuntime/StepExecutor')
13// @ts-check
14
15const { SETTINGS_FOLDER, EXTENSIONS_FOLDER, PIPELINES_FOLDER, TRUSTED_PIPELINES_FOLDER } = require('../app/Constants')
16
17let reconnecting = false
18
/**
 * Implements the `backend start` CLI command: connects the local backend
 * process to the development system, syncs pipelines/hooks/extensions and
 * keeps them in sync by reacting to watcher events until SIGINT is received.
 */
class BackendAction {
  /**
   * Registers the `backend start` command (with its `--inspect` option) on the
   * given command-line program. Failures of the action are logged and end the
   * process with exit code 1.
   *
   * @param {Command} caporal
   * @param {Internal.AppSettings} appSettings
   * @param {Internal.UserSettings} userSettings
   * @param {Internal.DcHttpClient} dcHttpClient
   */
  static register (caporal, appSettings, userSettings, dcHttpClient) {
    caporal
      .command('backend start')
      .option('--inspect', 'allow inspection tools to connect for the debugging of backend extensions')
      .description('Establish a connection to the development system')
      .action(async (args, options) => {
        // Get SIGINT on windows (for CYGWIN and CMD)
        if (/^win/.test(process.platform)) {
          const rl = require('readline').createInterface({
            input: process.stdin,
            output: process.stdout
          })

          // forward the readline-level SIGINT to the process-level handler installed by run()
          rl.on('SIGINT', () => {
            process.emit('SIGINT')
          })
        }

        try {
          await new BackendAction(appSettings, userSettings, dcHttpClient).run(options)
        } catch (err) /* istanbul ignore next */ {
          logger.error(err.message)
          process.exit(1)
        }
      })
  }

  /**
   * @param {Internal.AppSettings} appSettings
   * @param {Internal.UserSettings} userSettings
   * @param {Internal.DcHttpClient} dcHttpClient
   */
  constructor (appSettings, userSettings, dcHttpClient) {
    this.appSettings = appSettings
    this.userSettings = userSettings
    this.dcHttpClient = dcHttpClient

    // per pipeline file path: { id, queued } bookkeeping used by the watcher callbacks
    this.pipelines = {}

    const applicationFolder = this.appSettings.getApplicationFolder()
    this.settingsFolder = path.join(applicationFolder, SETTINGS_FOLDER)
    this.pipelinesFolder = path.join(applicationFolder, PIPELINES_FOLDER)
    this.trustedPipelinesFolder = path.join(applicationFolder, TRUSTED_PIPELINES_FOLDER)
  }

  /**
   * Validates the settings, refuses to start when a backend process file
   * already reports a pid, then creates the backend process and watchers,
   * starts them and installs the `reconnect` and SIGINT handlers.
   *
   * @param {Object} cliOptions
   * @param {Boolean} cliOptions.inspect
   * @throws {Error} when a backend process is already registered as running
   */
  async run (cliOptions) {
    await this.appSettings.validate()
    await this.userSettings.validate()

    const pid = await utils.getProcessId('backend', this.settingsFolder)
    if (pid) throw new Error(`Backend process is already running with pid: ${pid}. Please quit this process first.`)

    this.backendProcess = new BackendProcess(
      this.userSettings,
      logger,
      new StepExecutor(logger, this.appSettings, this.dcHttpClient, Boolean(cliOptions.inspect))
    )

    await this.writeExtensionConfigs()
    this.extensionConfigWatcher = new ExtensionConfigWatcher(this.appSettings)
    this.pipelineWatcher = new PipelineWatcher(this.appSettings)
    this.attachedExtensionsWatcher = new AttachedExtensionsWatcher(this.appSettings)
    this.cliProxy = new CliProxy(this.appSettings, logger)
    await this._startSubProcess()
    this.backendProcess.on('reconnect', () => this._reconnect())

    process.on('SIGINT', async () => {
      let exitCode = 0
      try {
        await this._stop()
      } catch (err) {
        logger.error(err)
        exitCode = 1
      }

      // a failing cleanup must not prevent the process from exiting
      try {
        await utils.deleteProcessFile('backend', this.settingsFolder)
      } catch (err) {
        logger.error(err)
        exitCode = 1
      }

      process.exit(exitCode)
    })
  }

  /**
   * Handles a `reconnect` event of the backend process: re-selects the
   * application and replays hooks, pipelines and attached extensions.
   * Overlapping invocations are skipped via the module-level `reconnecting`
   * flag; failures are logged and the flag is always released so that a later
   * reconnect can try again.
   */
  async _reconnect () {
    if (reconnecting) {
      logger.debug('already reconnecting ...')
      return
    }

    // claim the flag before the first await, otherwise two events could both pass the guard
    reconnecting = true
    logger.debug('reconnecting ...')
    try {
      const appId = await this.appSettings.getId()
      await this.backendProcess.selectApplication(appId)
      await this.backendProcess.resetHooks()
      await this.backendProcess.resetPipelines()
      await this._uploadPipelinesOfAttachedExtenstions()
      await this.pushHooks()
      await this._attachAllExtensions()
      await Promise.all([
        utils.writeLocalPipelines(this.appSettings, this.dcHttpClient, appId, this.pipelinesFolder, this.trustedPipelinesFolder),
        this.backendProcess.reloadPipelineController()
      ])
    } catch (err) {
      logger.error(`Error while reconnecting: ${err.message}`)
    } finally {
      reconnecting = false
    }
  }

  /**
   * Delegates to `utils.writeExtensionConfigs` with this action's settings and HTTP client.
   */
  async writeExtensionConfigs () {
    return utils.writeExtensionConfigs(this.appSettings, this.dcHttpClient)
  }

  /**
   * Delegates to `utils.pushHooks` with this action's settings and HTTP client.
   */
  async pushHooks () {
    return utils.pushHooks(this.appSettings, this.dcHttpClient)
  }

  /**
   * Closes the pipeline watcher, then stops the remaining watchers,
   * disconnects the backend process and closes the CLI proxy in parallel.
   */
  async _stop () {
    this.pipelineWatcher.close()
    await Promise.all([
      this.extensionConfigWatcher.stop(),
      this.attachedExtensionsWatcher.stop(),
      this.backendProcess.disconnect(),
      this.cliProxy.close()
    ])
    logger.info('SDK connection closed')
  }

  /**
   * Reacts to a pipeline watcher event. Events for files of extensions that
   * are not attached are skipped. Otherwise the file is marked as queued and
   * processed 500 ms later (removed when it no longer exists, uploaded
   * otherwise); further events for the same file are ignored while queued.
   *
   * @param {Object} event
   * @param {string} file
   */
  async _pipelineEvent (event, file) {
    if (!this.pipelines[file]) this.pipelines[file] = {}
    if (this.pipelines[file].queued) return

    // check if the extension of the pipeline is attached
    const extensionFolder = path.join(this.appSettings.getApplicationFolder(), EXTENSIONS_FOLDER, path.sep)
    const pathOfChangedExtension = file.replace(extensionFolder, '').split(path.sep)[0]
    const changedExtension = this.attachedExtensionsWatcher.attachedExtensions
      .find((extension) => extension.path === pathOfChangedExtension)
    if (!changedExtension) {
      // nothing is scheduled, so the file must not be marked as queued here
      logger.debug('The extension of the pipeline is not attached --> skip')
      return
    }

    this.pipelines[file].queued = true
    setTimeout(async () => {
      this.pipelines[file].queued = false
      try {
        if (!await fsEx.pathExists(file)) {
          await this._pipelineRemoved(file, changedExtension.trusted)
          return
        }
      } catch (err) {
        return logger.error(err)
      }

      // pipeline was changed
      try {
        await this._pipelineChanged(file, changedExtension.trusted)
      } catch (err) {
        this._logPipelineError(file, err)
      }
    }, 500)
  }

  /**
   * Logs a failure of `_pipelineChanged`. For `PIPELINE_INVALID` errors whose
   * message is a JSON document with an `errors` array, every entry is logged
   * separately; any other error is logged with its plain message.
   *
   * @param {string} file
   * @param {Error} err
   */
  _logPipelineError (file, err) {
    if (err.code === 'PIPELINE_INVALID') {
      let details = null
      try {
        details = JSON.parse(err.message)
      } catch (parseError) {
        // message is not JSON; fall through to the generic log below
      }

      if (details && Array.isArray(details.errors)) {
        details.errors.forEach((error) => {
          logger.error({ field: error.field }, `Pipeline invalid: ${error.message}`)
        })
        return
      }
    }

    logger.error(`Error while uploading pipeline '${file}': ${err.message}`)
    if (err.code === 'STEP_NOT_FOUND') logger.warn('Check if the extension containing this step is attached')
  }

  /**
   * Connects the backend process, performs the initial sync (hooks,
   * pipelines, attached extensions), starts the watchers and the CLI proxy
   * and finally writes the process file with the current pid.
   *
   * @throws {Error} when the CLI proxy cannot be started or the process file cannot be written
   */
  async _startSubProcess () {
    const applicationId = await this.appSettings.getId()
    await this.backendProcess.connect()
    await this.backendProcess.selectApplication(applicationId)
    await this.backendProcess.resetHooks()
    await this.backendProcess.resetPipelines()
    await this._uploadPipelinesOfAttachedExtenstions()
    await this.pushHooks()
    await this.backendProcess.startStepExecutor()
    await this._attachAllExtensions()

    await utils.writeLocalPipelines(this.appSettings, this.dcHttpClient, applicationId, this.pipelinesFolder, this.trustedPipelinesFolder)
    await this.backendProcess.reloadPipelineController()
    logger.info('Pipelines are now downloaded locally')

    // watcher
    await this.pipelineWatcher.start()
    await this.extensionConfigWatcher.start('backend')
    await this.attachedExtensionsWatcher.start()
    this.pipelineWatcher.on('all', (event, file) => this._pipelineEvent(event, file))

    this.extensionConfigWatcher.on('configChange', async (config) => {
      // both awaits live inside the try block so that no rejection escapes the listener
      try {
        const valid = await extensionConfigChanged(config, this.appSettings, this.dcHttpClient, this.backendProcess)
        if (valid) await this.backendProcess.reloadPipelineController()
      } catch (err) {
        logger.error('Error while activating local pipelines on the remote server')
        logger.debug(err)
      }
    })

    this.attachedExtensionsWatcher.on('attach', (extensionInfo) => this.backendProcess.attachExtension(extensionInfo))
    this.attachedExtensionsWatcher.on('detach', (extensionInfo) => this.backendProcess.detachExtension(extensionInfo))

    try {
      await this.cliProxy.start()
      await utils.setProcessFile('backend', this.settingsFolder, process.pid)
      logger.debug('Backend ready')
    } catch (err) {
      throw new Error(`Could not start CLI-Proxy ${err}`)
    }
  }

  /**
   * Removes the pipeline that was previously registered for `file` and
   * rewrites the local pipeline files. Does nothing when no pipeline ID is
   * known for the file.
   *
   * @param {string} file
   * @param {boolean} trusted
   * @throws {Error} when the removal or the subsequent local rewrite fails
   */
  async _pipelineRemoved (file, trusted) {
    if (!this.pipelines[file] || !this.pipelines[file].id) return
    const pipelineId = this.pipelines[file].id
    logger.info(`Start removing pipeline: ${pipelineId}`)

    const appId = await this.appSettings.getId()

    try {
      await this.dcHttpClient.removePipeline(pipelineId, appId, trusted)
      logger.info(`Removed pipeline '${pipelineId}'`)
      await utils.writeLocalPipelines(this.appSettings, this.dcHttpClient, appId, this.pipelinesFolder, this.trustedPipelinesFolder)
    } catch (err) {
      throw new Error(`Could not remove pipeline '${pipelineId}': ${err.message}`)
    }
  }

  /**
   * Parses the pipeline file, checks that its ID is present and equals the
   * file name, uploads it once, rewrites the local pipeline files and reloads
   * the pipeline controller.
   *
   * @param {string} file
   * @param {boolean} trusted
   * @throws {Error} when the pipeline has no ID or the ID differs from the file name
   */
  async _pipelineChanged (file, trusted) {
    const pipelineFileContent = await fsEx.readFile(file, 'utf8')
    const pipeline = jsonlint.parse(pipelineFileContent)

    if (!pipeline || !pipeline.pipeline || !pipeline.pipeline.id) throw new Error(`invalid pipeline; check the pipeline.id property in ${file}`)

    const fileName = path.basename(file, '.json')
    if (fileName !== pipeline.pipeline.id) throw new Error(`Pipeline ID "${pipeline.pipeline.id}" should match its file name!`)

    if (!this.pipelines[file]) this.pipelines[file] = {}
    this.pipelines[file].id = pipeline.pipeline.id

    const appId = await this.appSettings.getId()

    logger.debug('Upload pipeline')
    await this.dcHttpClient.uploadPipeline(pipeline, appId, trusted)

    logger.debug('Get updated pipelines')
    await utils.writeLocalPipelines(this.appSettings, this.dcHttpClient, appId, this.pipelinesFolder, this.trustedPipelinesFolder)

    logger.debug('Reload pipelines in the PLC ')
    await this.backendProcess.reloadPipelineController()
    logger.info(`Updated pipeline '${file}'`)
  }

  /**
   * Attaches every extension returned by `appSettings.loadAttachedExtensions()`
   * to the backend process in parallel. Each entry gets its key assigned as `id`.
   */
  async _attachAllExtensions () {
    const attachedExtensions = await this.appSettings.loadAttachedExtensions()
    const promises = Object.keys(attachedExtensions || {}).map((extensionId) => {
      const extension = attachedExtensions[extensionId]
      extension.id = extensionId
      return this.backendProcess.attachExtension(extension)
    })
    await Promise.all(promises)
  }

  /**
   * Reads all `*.json` files (dot files excluded) from the `pipelines` folder
   * of every attached extension, remembers their IDs in `this.pipelines` and
   * uploads them in two batches: untrusted and trusted.
   *
   * @throws {Error} when a pipeline has no ID or its ID differs from its file name
   */
  async _uploadPipelinesOfAttachedExtenstions () {
    logger.info('Start syncing all local pipelines')
    const attachedExtensions = await this.appSettings.loadAttachedExtensions()
    const extensionsFolder = path.join(this.appSettings.getApplicationFolder(), EXTENSIONS_FOLDER)

    let pipelines = []
    let trustedPipelines = []
    for (const extensionId of Object.keys(attachedExtensions || {})) {
      const extension = attachedExtensions[extensionId]

      // get list of pipelines of the extension
      const pipelineFolder = path.join(extensionsFolder, extension.path, 'pipelines')

      let pipelineFiles
      try {
        pipelineFiles = await fsEx.readdir(pipelineFolder)
      } catch (e) {
        logger.debug(`Skipping pipeline upload of ${extensionId}: no pipelines folder found`)
        continue
      }

      pipelineFiles = pipelineFiles.filter((pipelineFile) => pipelineFile.endsWith('.json') && !pipelineFile.startsWith('.'))
      if (pipelineFiles.length < 1) {
        logger.debug(`Skipping pipeline upload of ${extensionId}: no pipelines in pipeline folder`)
        continue
      }

      // read pipelines
      const pipelinePaths = pipelineFiles.map((pipelineFile) => path.join(pipelineFolder, pipelineFile))
      const loadedPipelines = await Promise.all(pipelinePaths.map((pipelinePath) => fsEx.readJSON(pipelinePath)))

      // once loaded, validate pipeline IDs vs pipeline file names; should be the same for each
      const problems = []
      loadedPipelines.forEach((pipeline, index) => {
        const pipelinePath = pipelinePaths[index]
        if (!pipeline || !pipeline.pipeline || !pipeline.pipeline.id) {
          problems.push(`invalid pipeline; check the pipeline.id property in ${pipelinePath}`)
          return
        }

        const expectedId = path.basename(pipelineFiles[index], '.json')
        if (pipeline.pipeline.id !== expectedId) {
          problems.push(
            `Pipeline ID "${pipeline.pipeline.id}" and file name "${expectedId}" mismatch! ` +
            'The ID of a pipeline and its file name should be the same.'
          )
        }

        if (!this.pipelines[pipelinePath]) this.pipelines[pipelinePath] = {}
        this.pipelines[pipelinePath].id = pipeline.pipeline.id
      })

      if (problems.length > 0) throw new Error(problems.join('\n'))

      // group pipelines by trusted / untrusted
      if (extension.trusted === true) {
        trustedPipelines = trustedPipelines.concat(loadedPipelines)
      } else {
        pipelines = pipelines.concat(loadedPipelines)
      }
    }

    const appId = await this.appSettings.getId()

    await Promise.all([
      this.dcHttpClient.uploadMultiplePipelines(pipelines, appId, false),
      this.dcHttpClient.uploadMultiplePipelines(trustedPipelines, appId, true)
    ])
    logger.info('All pipelines are in sync now')
  }
}
352
353module.exports = BackendAction