/* Raw file as served via unpkg (~18.8 kB, JavaScript); scrape header removed. */
1"use strict";
2/*
3 * Copyright © 2019 Atomist, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
// TypeScript-emitted async/await helper: drives a generator by wrapping each
// yielded value in a Promise and resuming the generator when it settles.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with a resolved value; reject on synchronous throw.
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        // Feed a rejection back into the generator as a thrown exception.
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        // Either settle the outer promise with the generator's return value,
        // or adopt the yielded value as a promise and continue stepping.
        function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
25Object.defineProperty(exports, "__esModule", { value: true });
26const automation_client_1 = require("@atomist/automation-client");
27const sdm_1 = require("@atomist/sdm");
28const k8s = require("@kubernetes/client-node");
29const cluster = require("cluster");
30const _ = require("lodash");
31const os = require("os");
32const config_1 = require("./config");
33const service_1 = require("./service");
/**
 * GoalScheduler implementation that schedules SDM goals inside k8s jobs.
 *
 * It reuses the podSpec of the deployed SDM to create a new jobSpec from.
 * Subclasses may change the spec and job creation behavior by overwriting beforeCreation
 * and/or afterCreation methods.
 */
class KubernetesGoalScheduler {
    /**
     * @param options `isolateAll: true` forces every goal to run as a k8s job
     *                (still requires the "kubernetes" scheduler to be configured
     *                in the environment); defaults to `{ isolateAll: false }`.
     */
    constructor(options = { isolateAll: false }) {
        this.options = options;
    }
    /**
     * Decide whether this scheduler should handle the given goal invocation.
     * Always false inside an already-isolated goal process: scheduled job
     * containers run with ATOMIST_ISOLATED_GOAL set (see createJobSpec).
     */
    supports(gi) {
        return __awaiter(this, void 0, void 0, function* () {
            return !process.env.ATOMIST_ISOLATED_GOAL &&
                (
                // Goal is marked as isolated and SDM is configured to use k8s jobs
                (gi.goal.definition.isolated && isConfiguredInEnv("kubernetes")) ||
                    // Force all goals to run isolated via env var
                    isConfiguredInEnv("kubernetes-all") ||
                    // Force all goals to run isolated via explicit option
                    (this.options.isolateAll && isConfiguredInEnv("kubernetes")) ||
                    // Force all goals to run isolated via explicit configuration
                    _.get(gi.configuration, "sdm.k8s.isolateAll", false) === true);
        });
    }
    /**
     * Schedule the goal by creating a k8s job derived from this SDM's cached pod spec.
     * A previously created job of the same name is deleted first; creation is retried
     * because that deletion may not have completed yet.
     * Resolves with undefined on success, or an ExecuteGoalResult with code 1 on failure.
     */
    schedule(gi) {
        return __awaiter(this, void 0, void 0, function* () {
            const { goalEvent } = gi;
            // Jobs are created in the same namespace this SDM pod runs in
            const podNs = process.env.ATOMIST_POD_NAMESPACE || process.env.ATOMIST_DEPLOYMENT_NAMESPACE || "default";
            const kc = config_1.loadKubeConfig();
            const batch = kc.makeApiClient(k8s.Batch_v1Api);
            // Deep-clone the cached pod spec so per-goal mutations don't leak between schedules
            const jobSpec = createJobSpec(_.cloneDeep(this.podSpec), podNs, gi);
            yield this.beforeCreation(gi, jobSpec);
            gi.progressLog.write(`/--`);
            gi.progressLog.write(`Scheduling k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.name} (${goalEvent.uniqueName})'`);
            gi.progressLog.write("\\--");
            try {
                // Check if this job was previously launched
                yield batch.readNamespacedJob(jobSpec.metadata.name, jobSpec.metadata.namespace);
                automation_client_1.logger.debug(`k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}' already exists. Deleting...`);
                try {
                    yield batch.deleteNamespacedJob(jobSpec.metadata.name, jobSpec.metadata.namespace, {});
                    automation_client_1.logger.debug(`k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}' deleted`);
                }
                catch (e) {
                    automation_client_1.logger.error(`Failed to delete k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}': ${JSON.stringify(e.body)}`);
                    return {
                        code: 1,
                        message: `Failed to delete k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' ` +
                            `for goal '${goalEvent.uniqueName}': ${prettyPrintError(e)}`,
                    };
                }
            }
            catch (e) {
                // This is ok to ignore as it just means the job doesn't exist
            }
            try {
                // Previous deletion might not have completed; hence the retry here
                const jobResult = (yield automation_client_1.doWithRetry(() => batch.createNamespacedJob(jobSpec.metadata.namespace, jobSpec), `Scheduling k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}'`)).body;
                yield this.afterCreation(gi, jobResult);
                automation_client_1.logger.info(`Scheduled k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}' with result: ${JSON.stringify(jobResult.status)}`);
                automation_client_1.logger.log("silly", JSON.stringify(jobResult));
            }
            catch (e) {
                automation_client_1.logger.error(`Failed to schedule k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}': ${JSON.stringify(e.body)}`);
                return {
                    code: 1,
                    message: `Failed to schedule k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' ` +
                        `for goal '${goalEvent.uniqueName}': ${prettyPrintError(e)}`,
                };
            }
            yield gi.progressLog.flush();
        });
    }
    /**
     * Extension point for sub classes to modify k8s resources or provided jobSpec before the
     * Job gets created in k8s.
     * Note: A potentially existing job with the same name has already been deleted at this point.
     * @param gi
     * @param jobSpec
     */
    beforeCreation(gi, jobSpec) {
        return __awaiter(this, void 0, void 0, function* () {
            // Intentionally left empty
        });
    }
    /**
     * Extension point for sub classes to modify k8s resources after the job has been created.
     * The provided jobSpec contains the result of the job creation API call.
     * @param gi
     * @param jobSpec
     */
    afterCreation(gi, jobSpec) {
        return __awaiter(this, void 0, void 0, function* () {
            // Intentionally left empty
        });
    }
    /**
     * Cache this SDM's own pod spec (the template for goal jobs) via the in-cluster
     * k8s config, and start a periodic cleanup of completed goal jobs on the cluster
     * master (or when clustering is disabled). Throws when the pod spec cannot be
     * read, since scheduling is impossible without it.
     */
    initialize(configuration) {
        return __awaiter(this, void 0, void 0, function* () {
            const podName = process.env.ATOMIST_POD_NAME || os.hostname();
            const podNs = process.env.ATOMIST_POD_NAMESPACE || process.env.ATOMIST_DEPLOYMENT_NAMESPACE || "default";
            try {
                const kc = new k8s.KubeConfig();
                kc.loadFromCluster();
                const core = kc.makeApiClient(k8s.Core_v1Api);
                this.podSpec = (yield core.readNamespacedPod(podName, podNs)).body;
            }
            catch (e) {
                automation_client_1.logger.error(`Failed to obtain parent pod spec from k8s: ${prettyPrintError(e)}`);
                throw new Error(`Failed to obtain parent pod spec from k8s: ${prettyPrintError(e)}`);
            }
            if (configuration.cluster.enabled === false || cluster.isMaster) {
                // Clean up on an interval (sdm.k8s.job.cleanupInterval ms, default 2h);
                // unref() so the timer does not keep the process alive on shutdown
                setInterval(() => {
                    return this.cleanUp()
                        .then(() => {
                        automation_client_1.logger.debug("Finished cleaning scheduled goal jobs");
                    });
                }, automation_client_1.configurationValue("sdm.k8s.job.cleanupInterval", 1000 * 60 * 60 * 2)).unref();
            }
        });
    }
    /**
     * Extension point to allow for custom clean up logic.
     */
    cleanUp() {
        return __awaiter(this, void 0, void 0, function* () {
            return cleanCompletedJobs();
        });
    }
}
164exports.KubernetesGoalScheduler = KubernetesGoalScheduler;
/**
 * Cleanup scheduled k8s goal jobs created by this SDM.
 *
 * Matches jobs via the creator label applied in createJobSpecWithAffinity and
 * deletes those that ran to successful completion, with propagationPolicy
 * "Foreground" so their pods are removed as well. Individual delete failures
 * are logged and skipped so one bad job does not abort the sweep.
 * @returns {Promise<void>}
 */
function cleanCompletedJobs() {
    return __awaiter(this, void 0, void 0, function* () {
        // Fixed: jobs are labelled "atomist.com/creator" (see createJobSpecWithAffinity),
        // so the previous bare "creator=" selector never matched and no jobs were cleaned up.
        const selector = `atomist.com/creator=${sanitizeName(automation_client_1.configurationValue("name"))}`;
        const jobs = yield listJobs(selector);
        // Only delete jobs that completed successfully
        const completedJobs = jobs.filter(j => j.status && j.status.completionTime && j.status.succeeded && j.status.succeeded > 0);
        if (completedJobs.length > 0) {
            automation_client_1.logger.info(`Deleting the following k8s jobs: ${completedJobs.map(j => `${j.metadata.namespace}:${j.metadata.name}`).join(", ")}`);
            const kc = config_1.loadKubeConfig();
            const batch = kc.makeApiClient(k8s.Batch_v1Api);
            for (const completedSdmJob of completedJobs) {
                try {
                    yield batch.deleteNamespacedJob(completedSdmJob.metadata.name, completedSdmJob.metadata.namespace,
                    // propagationPolicy is needed so that pods of the job are also getting deleted
                    { propagationPolicy: "Foreground" });
                }
                catch (e) {
                    automation_client_1.logger.warn(`Failed to delete k8s job '${completedSdmJob.metadata.namespace}:${completedSdmJob.metadata.name}': ` +
                        prettyPrintError(e));
                }
            }
        }
    });
}
192exports.cleanCompletedJobs = cleanCompletedJobs;
/**
 * Create a jobSpec by modifying the provided podSpec.
 *
 * Names the job "<sdm-container>-job-<goalSetId[0:7]>-<goal>", forces
 * restartPolicy "Never", injects the goal-context environment variables the
 * isolated goal process reads (including ATOMIST_ISOLATED_GOAL=true, which
 * makes supports() return false inside the job), rewrites the cache volume
 * path per workspace, and merges in containers/volumes/volumeMounts from
 * registered k8s service registrations found in the goal data.
 * @param podSpec pod spec of the running SDM to derive the job from
 * @param podNs namespace to create the job in
 * @param gi goal invocation providing goal event and context
 */
function createJobSpec(podSpec, podNs, gi) {
    const { goalEvent, context } = gi;
    // uniqueName may carry a "#<suffix>" discriminator; strip it for the job name
    const goalName = goalEvent.uniqueName.split("#")[0].toLowerCase();
    const jobSpec = createJobSpecWithAffinity(podSpec, gi);
    jobSpec.metadata.name = `${podSpec.spec.containers[0].name}-job-${goalEvent.goalSetId.slice(0, 7)}-${goalName}`;
    jobSpec.metadata.namespace = podNs;
    jobSpec.spec.template.spec.restartPolicy = "Never";
    jobSpec.spec.template.spec.containers[0].name = jobSpec.metadata.name;
    // Pass the goal context into the job container; ATOMIST_ISOLATED_GOAL marks
    // the process as an isolated goal executor
    jobSpec.spec.template.spec.containers[0].env.push({
        name: "ATOMIST_JOB_NAME",
        value: jobSpec.metadata.name,
    }, {
        name: "ATOMIST_REGISTRATION_NAME",
        value: `${automation_client_1.automationClientInstance().configuration.name}-job-${goalEvent.goalSetId.slice(0, 7)}-${goalName}`,
    }, {
        name: "ATOMIST_GOAL_TEAM",
        value: context.workspaceId,
    }, {
        name: "ATOMIST_GOAL_TEAM_NAME",
        value: context.context.workspaceName,
    }, {
        name: "ATOMIST_GOAL_ID",
        value: goalEvent.id,
    }, {
        name: "ATOMIST_GOAL_SET_ID",
        value: goalEvent.goalSetId,
    }, {
        name: "ATOMIST_GOAL_UNIQUE_NAME",
        value: goalEvent.uniqueName,
    }, {
        name: "ATOMIST_CORRELATION_ID",
        value: context.correlationId,
    }, {
        name: "ATOMIST_ISOLATED_GOAL",
        value: "true",
    });
    // Isolate the cache host path per workspace
    rewriteCachePath(jobSpec, context.workspaceId);
    // Add additional specs from registered services to the job spec
    if (_.get(gi.configuration, "sdm.k8s.service.enabled", true)) {
        if (!!goalEvent.data) {
            let data = {};
            try {
                data = JSON.parse(goalEvent.data);
            }
            catch (e) {
                // Best-effort: unparseable goal data just means no services are merged
                automation_client_1.logger.warn(`Failed to parse goal data on '${goalEvent.uniqueName}'`);
            }
            if (!!data[sdm_1.ServiceRegistrationGoalDataKey]) {
                _.forEach(data[sdm_1.ServiceRegistrationGoalDataKey], (v, k) => {
                    automation_client_1.logger.debug(`Service with name '${k}' and type '${v.type}' found for goal '${goalEvent.uniqueName}'`);
                    if (v.type === service_1.K8sServiceRegistrationType.K8sService) {
                        const spec = v.spec;
                        // container/volume/volumeMount may each be a single object or an array
                        if (!!spec.container) {
                            const c = Array.isArray(spec.container) ? spec.container : [spec.container];
                            jobSpec.spec.template.spec.containers.push(...c);
                        }
                        if (!!spec.volume) {
                            const vo = Array.isArray(spec.volume) ? spec.volume : [spec.volume];
                            jobSpec.spec.template.spec.volumes.push(...vo);
                        }
                        if (!!spec.volumeMount) {
                            const vm = Array.isArray(spec.volumeMount) ? spec.volumeMount : [spec.volumeMount];
                            // NOTE(review): assumes every container already has a volumeMounts
                            // array — a container without one would throw here; confirm upstream
                            jobSpec.spec.template.spec.containers.forEach(c => c.volumeMounts.push(...vm));
                        }
                    }
                });
            }
        }
    }
    return jobSpec;
}
271exports.createJobSpec = createJobSpec;
/**
 * Build the base k8s Job spec from this SDM's pod spec, preferring to
 * co-locate jobs of the same goal set on one node via pod affinity.
 * @param podSpec pod spec of the running SDM (mutated: affinity set, nodeName removed)
 * @param gi goal invocation providing goal event, configuration and context
 */
function createJobSpecWithAffinity(podSpec, gi) {
    const { goalEvent, configuration, context } = gi;
    // Soft (preferred) affinity towards nodes already running jobs of this goal set
    podSpec.spec.affinity = {
        podAffinity: {
            preferredDuringSchedulingIgnoredDuringExecution: [
                {
                    weight: 100,
                    podAffinityTerm: {
                        labelSelector: {
                            matchExpressions: [
                                {
                                    key: "atomist.com/goal-set-id",
                                    operator: "In",
                                    values: [goalEvent.goalSetId],
                                },
                            ],
                        },
                        topologyKey: "kubernetes.io/hostname",
                    },
                },
            ],
        },
    };
    // Clean up podSpec
    // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.13/#pod-v1-core note on nodeName
    delete podSpec.spec.nodeName;
    // Labels used for affinity matching and for the cleanup selector
    const jobLabels = {
        "atomist.com/goal-set-id": goalEvent.goalSetId,
        "atomist.com/goal-id": goalEvent.id,
        "atomist.com/creator": sanitizeName(configuration.name),
        "atomist.com/workspace-id": context.workspaceId,
    };
    // Human-readable provenance, stored as a JSON annotation
    const sdmAnnotation = JSON.stringify({
        sdm: {
            name: configuration.name,
            version: configuration.version,
        },
        goal: {
            goalId: goalEvent.id,
            goalSetId: goalEvent.goalSetId,
            uniqueName: goalEvent.uniqueName,
        },
    });
    return {
        kind: "Job",
        apiVersion: "batch/v1",
        metadata: {
            labels: jobLabels,
            annotations: {
                "atomist.com/sdm": sdmAnnotation,
            },
        },
        spec: {
            template: {
                metadata: {
                    labels: jobLabels,
                },
                spec: podSpec.spec,
            },
        },
    };
}
/**
 * Rewrite the cache volume host path to include the workspace id to prevent
 * cross-workspace content ending up in the same directory.
 *
 * Finds every volume mounted at the configured cache path (sdm.cache.path,
 * default "/opt/data") and suffixes the workspace id onto its hostPath,
 * mutating jobSpec in place.
 * @param jobSpec job spec to mutate
 * @param workspaceId workspace id to append to the host path
 */
function rewriteCachePath(jobSpec, workspaceId) {
    const cachePath = automation_client_1.configurationValue("sdm.cache.path", "/opt/data");
    const containers = _.get(jobSpec, "spec.template.spec.containers", []);
    const cacheVolumeNames = [];
    containers.forEach(c => {
        // Guard added: a container without volumeMounts previously threw here
        cacheVolumeNames.push(...(c.volumeMounts || []).filter(vm => vm.mountPath === cachePath).map(cm => cm.name));
    });
    _.uniq(cacheVolumeNames).forEach(vn => {
        const volume = _.get(jobSpec, "spec.template.spec.volumes", []).find(v => v.name === vn);
        if (!!volume && !!volume.hostPath && !!volume.hostPath.path) {
            const path = volume.hostPath.path;
            // Fixed: the original used `||`, which is true for EVERY path (no string
            // can end with both "<id>" and "<id>/"), so the workspace id got appended
            // again on each call. Only append when it is not already the suffix.
            if (!path.endsWith(workspaceId) && !path.endsWith(`${workspaceId}/`)) {
                if (path.endsWith("/")) {
                    volume.hostPath.path = `${path}${workspaceId}`;
                }
                else {
                    volume.hostPath.path = `${path}/${workspaceId}`;
                }
            }
        }
    });
}
/**
 * Checks if one of the provided values is configured in ATOMIST_GOAL_SCHEDULER or -
 * for backwards compatibility reasons - ATOMIST_GOAL_LAUNCHER.
 * The variable may hold either a plain string or a JSON value (string or array).
 * @param values values to look for in the configured scheduler(s)
 */
function isConfiguredInEnv(...values) {
    const configured = process.env.ATOMIST_GOAL_SCHEDULER || process.env.ATOMIST_GOAL_LAUNCHER;
    if (!configured) {
        return false;
    }
    try {
        const parsed = JSON.parse(configured);
        return Array.isArray(parsed)
            ? parsed.some(candidate => values.includes(candidate))
            : values.includes(parsed);
    }
    catch (e) {
        // Not JSON: treat the raw env string as a single scheduler name
        if (typeof configured === "string") {
            return values.includes(configured);
        }
    }
}
396exports.isConfiguredInEnv = isConfiguredInEnv;
/**
 * Strip out any characters that aren't allowed in a k8s label value:
 * removes every "@" and turns every "/" into ".".
 * @param name registration name, e.g. "@atomist/my-sdm"
 */
function sanitizeName(name) {
    return name
        .split("@").join("")
        .split("/").join(".");
}
404exports.sanitizeName = sanitizeName;
/**
 * List k8s jobs for a single namespace or cluster-wide depending on env configuration.
 * When sdm.k8s.job.singleNamespace is true (the default), only the namespace this
 * SDM pod runs in is queried; otherwise jobs of all namespaces are listed.
 * @param labelSelector k8s label selector to filter the jobs with
 */
async function listJobs(labelSelector) {
    const kc = config_1.loadKubeConfig();
    const batch = kc.makeApiClient(k8s.Batch_v1Api);
    const singleNamespace = automation_client_1.configurationValue("sdm.k8s.job.singleNamespace", true);
    if (singleNamespace) {
        const podNs = process.env.ATOMIST_POD_NAMESPACE || process.env.ATOMIST_DEPLOYMENT_NAMESPACE || "default";
        const response = await batch.listNamespacedJob(podNs, undefined, undefined, undefined, undefined, labelSelector);
        return response.body.items;
    }
    const response = await batch.listJobForAllNamespaces(undefined, undefined, undefined, labelSelector);
    return response.body.items;
}
422exports.listJobs = listJobs;
/**
 * Extract a readable message from an error, preferring the k8s API
 * response payload (`e.body.message`) over the generic `e.message`.
 * @param e error raised by a k8s client call (or any Error)
 */
function prettyPrintError(e) {
    return e.body ? e.body.message : e.message;
}
431exports.prettyPrintError = prettyPrintError;
432//# sourceMappingURL=KubernetesGoalScheduler.js.map
\No newline at end of file