/*
 * Copyright © 2019 Atomist, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// tslint:disable:max-file-line-count

import {
    automationClientInstance,
    AutomationContextAware,
    Configuration,
    configurationValue,
    doWithRetry,
    HandlerContext,
    logger,
} from "@atomist/automation-client";
import {
    ExecuteGoalResult,
    GoalInvocation,
    GoalScheduler,
    SdmGoalEvent,
    ServiceRegistrationGoalDataKey,
} from "@atomist/sdm";
import * as k8s from "@kubernetes/client-node";
import * as cluster from "cluster";
import * as fs from "fs-extra";
import * as stringify from "json-stringify-safe";
import * as _ from "lodash";
import * as os from "os";
import { toArray } from "../../util/misc/array";
import {
    loadKubeClusterConfig,
    loadKubeConfig,
} from "./config";
import {
    K8sServiceRegistrationType,
    K8sServiceSpec,
} from "./service";

/**
 * Options to configure the k8s goal scheduling support
 */
export interface KubernetesGoalSchedulerOptions {
    /** Schedule every goal in an isolated k8s job, not only goals marked as isolated */
    isolateAll?: boolean;
    /** Fallback pod spec to use when the spec of the running SDM pod cannot be read from the cluster */
    podSpec?: k8s.V1Pod;
}
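
// Illustrative construction (not part of the original source; the fallback pod spec shown is hypothetical):
//
//   const scheduler = new KubernetesGoalScheduler({
//       isolateAll: true,
//       podSpec: { spec: { containers: [{ name: "my-sdm", image: "example/my-sdm:1.0.0" }] } } as k8s.V1Pod,
//   });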

/**
 * GoalScheduler implementation that schedules SDM goals inside k8s jobs.
 *
 * It reuses the podSpec of the deployed SDM to create the new jobSpec from.
 * Subclasses may change the spec and job creation behavior by overriding the beforeCreation
 * and/or afterCreation methods.
 */
export class KubernetesGoalScheduler implements GoalScheduler {

    public podSpec: k8s.V1Pod;

    constructor(private readonly options: KubernetesGoalSchedulerOptions = { isolateAll: false }) {
    }

    public async supports(gi: GoalInvocation): Promise<boolean> {
        return !process.env.ATOMIST_ISOLATED_GOAL &&
            (
                // Goal is marked as isolated and SDM is configured to use k8s jobs
                (gi.goal.definition.isolated && isConfiguredInEnv("kubernetes")) ||
                // Force all goals to run isolated via env var
                isConfiguredInEnv("kubernetes-all") ||
                // Force all goals to run isolated via explicit option
                (this.options.isolateAll && isConfiguredInEnv("kubernetes")) ||
                // Force all goals to run isolated via explicit configuration
                _.get(gi.configuration, "sdm.k8s.isolateAll", false) === true
            );
    }

    public async schedule(gi: GoalInvocation): Promise<ExecuteGoalResult> {
        const { goalEvent } = gi;

        const podNs = await readNamespace();

        const kc = loadKubeConfig();
        const batch = kc.makeApiClient(k8s.BatchV1Api);

        const jobSpec = createJobSpec(_.cloneDeep(this.podSpec), podNs, gi);
        const jobDesc = `k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.uniqueName}'`;
        await this.beforeCreation(gi, jobSpec);

        gi.progressLog.write(`/--`);
        gi.progressLog.write(
            `Scheduling k8s job '${jobSpec.metadata.namespace}:${jobSpec.metadata.name}' for goal '${goalEvent.name} (${goalEvent.uniqueName})'`);
        gi.progressLog.write("\\--");

        try {
            // Check if this job was previously launched
            await batch.readNamespacedJob(jobSpec.metadata.name, jobSpec.metadata.namespace);
            logger.debug(`${jobDesc} already exists. Deleting...`);
            try {
                await batch.deleteNamespacedJob(jobSpec.metadata.name, jobSpec.metadata.namespace, {} as any);
                logger.debug(`${jobDesc} deleted`);
            } catch (e) {
                logger.error(`Failed to delete ${jobDesc}: ${stringify(e.body)}`);
                return {
                    code: 1,
                    message: `Failed to delete ${jobDesc}: ${prettyPrintError(e)}`,
                };
            }
        } catch (e) {
            // This is ok to ignore as it just means the job doesn't exist
        }

        try {
            logger.debug(`Job spec for ${jobDesc}: ${JSON.stringify(jobSpec)}`);
            // Previous deletion might not have completed; hence the retry here
            const jobResult = (await doWithRetry<{ body: k8s.V1Job }>(
                () => batch.createNamespacedJob(jobSpec.metadata.namespace, jobSpec),
                `Scheduling ${jobDesc}`)).body;

            await this.afterCreation(gi, jobResult);

            logger.info(`Scheduled ${jobDesc} with result: ${stringify(jobResult.status)}`);
            logger.log("silly", stringify(jobResult));
        } catch (e) {
            logger.error(`Failed to schedule ${jobDesc}: ${stringify(e.body)}`);
            return {
                code: 1,
                message: `Failed to schedule ${jobDesc}: ${prettyPrintError(e)}`,
            };
        }
        await gi.progressLog.flush();
        return {
            code: 0,
            message: `Scheduled ${jobDesc}`,
        };
    }

    /**
     * Extension point for subclasses to modify k8s resources or the provided jobSpec before the
     * job gets created in k8s.
     * Note: A potentially existing job with the same name has already been deleted at this point.
     * @param gi goal invocation the job is scheduled for
     * @param jobSpec job spec that is about to be created
     */
    protected async beforeCreation(gi: GoalInvocation, jobSpec: k8s.V1Job): Promise<void> {
        // Intentionally left empty
    }

    /**
     * Extension point for subclasses to modify k8s resources after the job has been created.
     * The provided jobSpec contains the result of the job creation API call.
     * @param gi goal invocation the job was scheduled for
     * @param jobSpec job resource returned by the creation API call
     */
    protected async afterCreation(gi: GoalInvocation, jobSpec: k8s.V1Job): Promise<void> {
        // Intentionally left empty
    }

    public async initialize(configuration: Configuration): Promise<void> {
        const podName = process.env.ATOMIST_POD_NAME || os.hostname();
        const podNs = await readNamespace();

        try {
            const kc = loadKubeClusterConfig();
            const core = kc.makeApiClient(k8s.CoreV1Api);

            this.podSpec = (await core.readNamespacedPod(podName, podNs)).body;
        } catch (e) {
            logger.error(`Failed to obtain parent pod spec from k8s: ${prettyPrintError(e)}`);

            if (!!this.options.podSpec) {
                this.podSpec = this.options.podSpec;
            } else {
                throw new Error(`Failed to obtain parent pod spec from k8s: ${prettyPrintError(e)}`);
            }
        }

        if (configuration.cluster.enabled === false || cluster.isMaster) {
            setInterval(() => {
                return this.cleanUp()
                    .then(() => {
                        logger.debug("Finished cleaning scheduled goal jobs");
                    });
            }, _.get(configuration, "sdm.k8s.job.cleanupInterval", 1000 * 60 * 60 * 2)).unref();
        }
    }

    /**
     * Extension point to allow for custom clean up logic.
     */
    protected async cleanUp(): Promise<void> {
        return cleanCompletedJobs();
    }
}
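
// Illustrative sketch (not part of the original source): one way a downstream SDM might extend this
// scheduler via the beforeCreation extension point. The subclass name and the label key used here
// are hypothetical, and how the scheduler is registered varies by SDM version.
//
//   class LabellingGoalScheduler extends KubernetesGoalScheduler {
//       protected async beforeCreation(gi: GoalInvocation, jobSpec: k8s.V1Job): Promise<void> {
//           // Add a custom label before the job is submitted to the cluster
//           jobSpec.metadata.labels = { ...jobSpec.metadata.labels, "example.org/team": "platform" };
//       }
//   }
//
//   // Possible wiring in the SDM configuration (assumption, verify against your SDM version):
//   // configuration.sdm.goalScheduler = [new LabellingGoalScheduler({ isolateAll: true })];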

/**
 * Delete completed k8s goal jobs and their pods.
 * @returns {Promise<void>}
 */
export async function cleanCompletedJobs(): Promise<void> {
    const selector = `atomist.com/creator=${sanitizeName(configurationValue<string>("name"))}`;

    const jobs = await listJobs(selector);
    const completedJobs =
        jobs.filter(j => j.status && j.status.completionTime && j.status.succeeded && j.status.succeeded > 0);

    if (completedJobs.length > 0) {
        logger.debug(`Deleting the following k8s jobs: ${
            completedJobs.map(j => `${j.metadata.namespace}:${j.metadata.name}`).join(", ")}`);

        for (const completedSdmJob of completedJobs) {
            const job = { name: completedSdmJob.metadata.name, namespace: completedSdmJob.metadata.namespace };
            logger.debug(`Deleting k8s job '${job.namespace}:${job.name}'`);
            await deleteJob(job);

            logger.debug(`Deleting k8s pods for job '${job.namespace}:${job.name}'`);
            await deletePods(job);
        }
    }
}

/** Unique name for goal to use in k8s job spec. */
function k8sJobGoalName(goalEvent: SdmGoalEvent): string {
    return goalEvent.uniqueName.split("#")[0].toLowerCase();
}

/** Unique name for job to use in k8s job spec. */
export function k8sJobName(podSpec: k8s.V1Pod, goalEvent: SdmGoalEvent): string {
    const goalName = k8sJobGoalName(goalEvent);
    return `${podSpec.spec.containers[0].name}-job-${goalEvent.goalSetId.slice(0, 7)}-${goalName}`
        .slice(0, 63).replace(/[^a-z0-9]*$/, "");
}
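
// Worked example (hypothetical values, not part of the original source):
//
//   const name = k8sJobName(podSpec, goalEvent);
//   // with containers[0].name === "my-sdm", a goalSetId starting "abcdef0..." and
//   // uniqueName "build#goals.ts:42" this yields "my-sdm-job-abcdef0-build";
//   // the result is cut to 63 characters and trailing non-alphanumerics are stripped.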

/**
 * Kubernetes container spec environment variables that specify an SDM
 * running in single-goal mode.
 */
export function k8sJobEnv(podSpec: k8s.V1Pod, goalEvent: SdmGoalEvent, context: HandlerContext): k8s.V1EnvVar[] {
    const goalName = k8sJobGoalName(goalEvent);
    const jobName = k8sJobName(podSpec, goalEvent);
    const envVars: k8s.V1EnvVar[] = [
        {
            name: "ATOMIST_JOB_NAME",
            value: jobName,
        },
        {
            name: "ATOMIST_REGISTRATION_NAME",
            value: `${automationClientInstance().configuration.name}-job-${goalEvent.goalSetId.slice(0, 7)}-${goalName}`,
        },
        {
            name: "ATOMIST_GOAL_TEAM",
            value: context.workspaceId,
        },
        {
            name: "ATOMIST_GOAL_TEAM_NAME",
            value: (context as any as AutomationContextAware).context.workspaceName,
        },
        {
            name: "ATOMIST_GOAL_ID",
            value: (goalEvent as any).id,
        },
        {
            name: "ATOMIST_GOAL_SET_ID",
            value: goalEvent.goalSetId,
        },
        {
            name: "ATOMIST_GOAL_UNIQUE_NAME",
            value: goalEvent.uniqueName,
        },
        {
            name: "ATOMIST_CORRELATION_ID",
            value: context.correlationId,
        },
        {
            name: "ATOMIST_ISOLATED_GOAL",
            value: "true",
        },
    ];
    return envVars;
}
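
// Editorial note (not in the original source): ATOMIST_ISOLATED_GOAL is the variable that
// KubernetesGoalScheduler.supports() checks, so the SDM copy running inside the job will not
// schedule further k8s jobs itself. A sketch of reading the variables inside the job container
// (names taken from the array above, values hypothetical):
//
//   const goalSetId = process.env.ATOMIST_GOAL_SET_ID;        // e.g. "abcdef0-1234-..."
//   const uniqueName = process.env.ATOMIST_GOAL_UNIQUE_NAME;  // e.g. "build#goals.ts:42"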

/**
 * Create a jobSpec by modifying the provided podSpec
 * @param podSpec pod spec of the running SDM to base the job on
 * @param podNs namespace to create the job in
 * @param gi current goal invocation
 */
export function createJobSpec(podSpec: k8s.V1Pod, podNs: string, gi: GoalInvocation): k8s.V1Job {
    const { goalEvent, context } = gi;

    const jobSpec = createJobSpecWithAffinity(podSpec, gi);

    jobSpec.metadata.name = k8sJobName(podSpec, goalEvent);
    jobSpec.metadata.namespace = podNs;

    jobSpec.spec.backoffLimit = 1;
    jobSpec.spec.template.spec.restartPolicy = "Never";
    jobSpec.spec.template.spec.containers[0].name = jobSpec.metadata.name;

    jobSpec.spec.template.spec.containers[0].env.push(...k8sJobEnv(podSpec, goalEvent, context));

    rewriteCachePath(jobSpec, context.workspaceId);

    // Add additional specs from registered services to the job spec
    if (_.get(gi.configuration, "sdm.k8s.service.enabled", true)) {
        if (!!goalEvent.data) {
            let data: any = {};
            try {
                data = JSON.parse(goalEvent.data);
            } catch (e) {
                logger.warn(`Failed to parse goal data on '${goalEvent.uniqueName}'`);
            }
            if (!!data[ServiceRegistrationGoalDataKey]) {
                _.forEach(data[ServiceRegistrationGoalDataKey], (v, k) => {
                    logger.debug(`Service with name '${k}' and type '${v.type}' found for goal '${goalEvent.uniqueName}'`);
                    if (v.type === K8sServiceRegistrationType.K8sService) {
                        const spec = v.spec as K8sServiceSpec;
                        if (!!spec.container) {
                            const c = toArray<k8s.V1Container>(spec.container as any);
                            jobSpec.spec.template.spec.containers.push(...c);
                        }

                        if (!!spec.initContainer) {
                            const ic = toArray<k8s.V1Container>(spec.initContainer as any);
                            jobSpec.spec.template.spec.initContainers = [
                                ...(jobSpec.spec.template.spec.initContainers || []),
                                ...ic,
                            ];
                        }

                        if (!!spec.volume) {
                            const vo = toArray<k8s.V1Volume>(spec.volume as any);
                            jobSpec.spec.template.spec.volumes = [
                                ...(jobSpec.spec.template.spec.volumes || []),
                                ...vo,
                            ];
                        }

                        if (!!spec.volumeMount) {
                            const vm = toArray<k8s.V1VolumeMount>(spec.volumeMount as any);
                            [...jobSpec.spec.template.spec.containers, ...jobSpec.spec.template.spec.initContainers].forEach(c => {
                                c.volumeMounts = [
                                    ...(c.volumeMounts || []),
                                    ...vm,
                                ];
                            });
                        }

                        if (!!spec.imagePullSecret) {
                            const ips = toArray<k8s.V1LocalObjectReference>(spec.imagePullSecret as any);
                            jobSpec.spec.template.spec.imagePullSecrets = [
                                ...(jobSpec.spec.template.spec.imagePullSecrets || []),
                                ...ips,
                            ];
                        }
                    }
                });
            }
        }
    }

    return jobSpec;
}
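
// Illustrative shape (an assumption for documentation, not from the original source) of the goal data
// payload consumed above: goalEvent.data is a JSON string whose ServiceRegistrationGoalDataKey entry
// maps a service name to a registration with a type and a K8sServiceSpec. Literal key strings are the
// values of the exported constants and are elided here; container and volume details are hypothetical.
//
//   {
//       [ServiceRegistrationGoalDataKey]: {
//           "scanner": {
//               "type": K8sServiceRegistrationType.K8sService,
//               "spec": {
//                   "container": { "name": "scanner", "image": "example/scanner:1.0.0" },
//                   "volume": { "name": "scratch", "emptyDir": {} },
//                   "volumeMount": { "name": "scratch", "mountPath": "/scratch" }
//               }
//           }
//       }
//   }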

/**
 * Create a k8s Job spec with affinity to jobs for the same goal set
 * @param podSpec pod spec of the running SDM to base the job on
 * @param gi current goal invocation
 */
function createJobSpecWithAffinity(podSpec: k8s.V1Pod, gi: GoalInvocation): k8s.V1Job {
    const { goalEvent, configuration, context } = gi;

    _.defaultsDeep(podSpec.spec.affinity, {
        podAffinity: {
            preferredDuringSchedulingIgnoredDuringExecution: [
                {
                    weight: 100,
                    podAffinityTerm: {
                        labelSelector: {
                            matchExpressions: [
                                {
                                    key: "atomist.com/goal-set-id",
                                    operator: "In",
                                    values: [
                                        goalEvent.goalSetId,
                                    ],
                                },
                            ],
                        },
                        topologyKey: "kubernetes.io/hostname",
                    },
                },
            ],
        },
    });

    // Clean up podSpec
    // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.13/#pod-v1-core note on nodeName
    delete podSpec.spec.nodeName;

    const labels = {
        "atomist.com/goal-set-id": goalEvent.goalSetId,
        "atomist.com/goal-id": (goalEvent as any).id,
        "atomist.com/creator": sanitizeName(configuration.name),
        "atomist.com/workspace-id": context.workspaceId,
    };

    const detail = {
        sdm: {
            name: configuration.name,
            version: configuration.version,
        },
        goal: {
            goalId: (goalEvent as any).id,
            goalSetId: goalEvent.goalSetId,
            uniqueName: goalEvent.uniqueName,
        },
    };

    const annotations = {
        "atomist.com/sdm": JSON.stringify(detail),
    };

    return {
        kind: "Job",
        apiVersion: "batch/v1",
        metadata: {
            labels,
            annotations,
        },
        spec: {
            template: {
                metadata: {
                    labels,
                },
                spec: podSpec.spec,
            },
        },
    } as any;
}

/**
 * Rewrite the volume host path to include the workspace id to prevent cross-workspace content from
 * ending up in the same directory.
 * @param jobSpec job spec to modify
 * @param workspaceId workspace id to append to the cache host path
 */
function rewriteCachePath(jobSpec: k8s.V1Job, workspaceId: string): void {
    const cachePath = configurationValue("sdm.cache.path", "/opt/data");
    const containers: k8s.V1Container[] = _.get(jobSpec, "spec.template.spec.containers", []);

    const cacheVolumeNames: string[] = [];
    containers.forEach(c => {
        cacheVolumeNames.push(...c.volumeMounts.filter(vm => vm.mountPath === cachePath).map(cm => cm.name));
    });

    _.uniq(cacheVolumeNames).forEach(vn => {
        const volume: k8s.V1Volume = _.get(jobSpec, "spec.template.spec.volumes", []).find(v => v.name === vn);
        if (!!volume && !!volume.hostPath && !!volume.hostPath.path) {
            const path = volume.hostPath.path;
            if (!path.endsWith(workspaceId) && !path.endsWith(`${workspaceId}/`)) {
                if (path.endsWith("/")) {
                    volume.hostPath.path = `${path}${workspaceId}`;
                } else {
                    volume.hostPath.path = `${path}/${workspaceId}`;
                }
            }
        }
    });
}
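
// Worked example (hypothetical values, not from the original source): with the default cache path
// "/opt/data" backed by a hostPath volume and workspaceId "A1B2C3D4E", rewriteCachePath() turns
// hostPath.path "/opt/data" into "/opt/data/A1B2C3D4E", so goal caches of different workspaces
// land in separate host directories.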

/**
 * Checks if one of the provided values is configured in ATOMIST_GOAL_SCHEDULER or,
 * for backwards compatibility, ATOMIST_GOAL_LAUNCHER.
 * @param values values to look for in the environment variable
 */
export function isConfiguredInEnv(...values: string[]): boolean {
    const value = process.env.ATOMIST_GOAL_SCHEDULER || process.env.ATOMIST_GOAL_LAUNCHER;
    if (!!value) {
        try {
            const json = JSON.parse(value);
            if (Array.isArray(json)) {
                return json.some(v => values.includes(v));
            } else {
                return values.includes(json);
            }
        } catch (e) {
            if (typeof value === "string") {
                return values.includes(value);
            }
        }
    }
    return false;
}
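
// Worked examples (hypothetical environment values, not from the original source):
//
//   ATOMIST_GOAL_SCHEDULER=kubernetes            -> isConfiguredInEnv("kubernetes") === true (plain string)
//   ATOMIST_GOAL_SCHEDULER='"kubernetes"'        -> parsed as a JSON string, also matches
//   ATOMIST_GOAL_SCHEDULER='["kubernetes-all"]'  -> parsed as a JSON array, matches "kubernetes-all"
//   unset or any other value                     -> false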

/**
 * Strip out any characters that aren't allowed in a k8s label value
 * @param name name to sanitize
 */
export function sanitizeName(name: string): string {
    return name.replace(/@/g, "").replace(/\//g, ".");
}
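
// Worked example (hypothetical name, not from the original source):
//
//   sanitizeName("@atomist/my-sdm");  // -> "atomist.my-sdm"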

/**
 * List k8s jobs for a single namespace or cluster-wide depending on env configuration
 * @param labelSelector label selector to filter the jobs by
 */
export async function listJobs(labelSelector?: string): Promise<k8s.V1Job[]> {
    const kc = loadKubeConfig();
    const batch = kc.makeApiClient(k8s.BatchV1Api);

    if (configurationValue<boolean>("sdm.k8s.job.singleNamespace", true)) {
        const podNs = await readNamespace();
        return (await batch.listNamespacedJob(
            podNs,
            undefined,
            undefined,
            undefined,
            undefined,
            labelSelector,
        )).body.items;
    } else {
        return (await batch.listJobForAllNamespaces(
            undefined,
            undefined,
            undefined,
            labelSelector,
        )).body.items;
    }
}

export const K8sNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";

/**
 * Read the namespace of the deployment from environment and k8s service account files.
 * Falls back to the default namespace if no other configuration can be found.
 */
export async function readNamespace(): Promise<string> {
    let podNs = process.env.ATOMIST_POD_NAMESPACE || process.env.ATOMIST_DEPLOYMENT_NAMESPACE;
    if (!!podNs) {
        return podNs;
    }

    if (await fs.pathExists(K8sNamespaceFile)) {
        podNs = (await fs.readFile(K8sNamespaceFile)).toString().trim();
    }

    if (!!podNs) {
        return podNs;
    }

    return "default";
}
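
// Resolution order used above, summarized for reference (values hypothetical, not from the original source):
//
//   process.env.ATOMIST_POD_NAMESPACE = "sdm";
//   await readNamespace();  // -> "sdm"
//   // otherwise: ATOMIST_DEPLOYMENT_NAMESPACE, then the service account namespace file,
//   // then the literal "default"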

export function prettyPrintError(e: any): string {
    if (!!e.body) {
        return e.body.message;
    } else {
        return e.message;
    }
}

export async function deleteJob(job: { name: string, namespace: string }): Promise<void> {
    try {
        const kc = loadKubeConfig();
        const batch = kc.makeApiClient(k8s.BatchV1Api);

        await batch.readNamespacedJob(job.name, job.namespace);
        try {
            await batch.deleteNamespacedJob(
                job.name,
                job.namespace,
                { propagationPolicy: "Foreground" } as any);
        } catch (e) {
            logger.warn(`Failed to delete k8s job '${job.namespace}:${job.name}': ${
                prettyPrintError(e)}`);
        }
    } catch (e) {
        // This is ok to ignore because the job doesn't exist any more
    }
}

export async function deletePods(job: { name: string, namespace: string }): Promise<void> {
    try {
        const kc = loadKubeConfig();
        const core = kc.makeApiClient(k8s.CoreV1Api);

        const selector = `job-name=${job.name}`;
        const pods = await core.listNamespacedPod(
            job.namespace,
            undefined,
            undefined,
            undefined,
            undefined,
            selector);
        if (pods.body && pods.body.items) {
            for (const pod of pods.body.items) {
                try {
                    await core.deleteNamespacedPod(pod.metadata.name, pod.metadata.namespace, {} as any);
                } catch (e) {
                    // Probably ok because pod might be gone already
                    logger.debug(
                        `Failed to delete k8s pod '${pod.metadata.namespace}:${pod.metadata.name}': ${
                        prettyPrintError(e)}`);
                }
            }
        }
    } catch (e) {
        logger.warn(`Failed to list pods for k8s job '${job.namespace}:${job.name}': ${
            prettyPrintError(e)}`);
    }
}