1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 | import {
|
18 | GitCommandGitProject,
|
19 | GitProject,
|
20 | guid,
|
21 | } from "@atomist/automation-client";
|
22 | import { sleep } from "@atomist/automation-client/lib/internal/util/poll";
|
23 | import {
|
24 | ExecuteGoal,
|
25 | GoalProjectListenerEvent,
|
26 | GoalProjectListenerRegistration,
|
27 | GoalScheduler,
|
28 | GoalWithFulfillment,
|
29 | ImplementationRegistration,
|
30 | minimalClone,
|
31 | ProgressLog,
|
32 | RepoContext,
|
33 | SdmGoalEvent,
|
34 | SdmGoalState,
|
35 | ServiceRegistrationGoalDataKey,
|
36 | } from "@atomist/sdm";
|
37 | import * as k8s from "@kubernetes/client-node";
|
38 | import * as fs from "fs-extra";
|
39 | import * as stringify from "json-stringify-safe";
|
40 | import * as _ from "lodash";
|
41 | import * as os from "os";
|
42 | import * as path from "path";
|
43 | import * as request from "request";
|
44 | import { Writable } from "stream";
|
45 | import {
|
46 | DeepPartial,
|
47 | Merge,
|
48 | } from "ts-essentials";
|
49 | import { loadKubeConfig } from "../../pack/k8s/config";
|
50 | import { DefaultKubernetesFulfillmentOptions } from "../../pack/k8s/KubernetesFulfillmentGoalScheduler";
|
51 | import {
|
52 | k8sJobEnv,
|
53 | KubernetesGoalScheduler,
|
54 | readNamespace,
|
55 | } from "../../pack/k8s/KubernetesGoalScheduler";
|
56 | import {
|
57 | K8sServiceRegistrationType,
|
58 | K8sServiceSpec,
|
59 | } from "../../pack/k8s/service";
|
60 | import { toArray } from "../../util/misc/array";
|
61 | import {
|
62 | CacheOutputGoalDataKey,
|
63 | cachePut,
|
64 | cacheRestore,
|
65 | } from "../cache/goalCaching";
|
66 | import {
|
67 | Container,
|
68 | ContainerInput,
|
69 | ContainerOutput,
|
70 | ContainerProjectHome,
|
71 | ContainerRegistration,
|
72 | ContainerRegistrationGoalDataKey,
|
73 | ContainerScheduler,
|
74 | GoalContainer,
|
75 | GoalContainerVolume,
|
76 | } from "./container";
|
77 | import { prepareSecrets } from "./provider";
|
78 | import {
|
79 | containerEnvVars,
|
80 | prepareInputAndOutput,
|
81 | processResult,
|
82 | } from "./util";
|
83 |
|
84 |
|
85 |
|
86 |
|
87 | export type K8sGoalContainer =
|
88 | Merge<GoalContainer, DeepPartial<k8s.V1Container>>
|
89 | & Pick<GoalContainer, "name" | "image">;
|
90 |
|
91 | export type K8sGoalContainerVolume = Merge<k8s.V1Volume, GoalContainerVolume>;
|
92 |
|
93 |
|
94 | export type K8sGoalContainerSpec = Omit<K8sContainerRegistration, "callback">;
|
95 |
|
96 |
|
97 |
|
98 |
|
99 |
|
100 | export type K8sContainerSpecCallback =
|
101 | (r: K8sContainerRegistration, p: GitProject, g: Container, e: SdmGoalEvent, ctx: RepoContext) =>
|
102 | Promise<Omit<K8sContainerRegistration, "callback">>;
|
103 |
|
104 |
|
105 |
|
106 |
|
107 | export interface K8sContainerRegistration extends ContainerRegistration {
|
108 | |
109 |
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 |
|
124 | containers: K8sGoalContainer[];
|
125 | |
126 |
|
127 |
|
128 |
|
129 | callback?: K8sContainerSpecCallback;
|
130 | |
131 |
|
132 |
|
133 |
|
134 |
|
135 | initContainers?: k8s.V1Container[];
|
136 | |
137 |
|
138 |
|
139 |
|
140 | volumes?: K8sGoalContainerVolume[];
|
141 | }
|
142 |
|
143 | export const k8sContainerScheduler: ContainerScheduler = (goal, registration: K8sContainerRegistration) => {
|
144 | goal.addFulfillment({
|
145 | goalExecutor: executeK8sJob(),
|
146 | ...registration as ImplementationRegistration,
|
147 | });
|
148 |
|
149 | goal.addFulfillmentCallback({
|
150 | goal,
|
151 | callback: k8sFulfillmentCallback(goal, registration),
|
152 | });
|
153 | };
|
154 |
|
155 |
|
156 |
|
157 |
|
158 |
|
159 | export function k8sFulfillmentCallback(
|
160 | goal: Container,
|
161 | registration: K8sContainerRegistration,
|
162 | ): (sge: SdmGoalEvent, rc: RepoContext) => Promise<SdmGoalEvent> {
|
163 |
|
164 | return async (goalEvent, repoContext) => {
|
165 | let spec: K8sContainerRegistration = _.cloneDeep(registration);
|
166 | if (registration.callback) {
|
167 | spec = await repoContext.configuration.sdm.projectLoader.doWithProject({
|
168 | ...repoContext,
|
169 | readOnly: true,
|
170 | cloneOptions: minimalClone(goalEvent.push, { detachHead: true }),
|
171 | }, async p => {
|
172 | return {
|
173 | ...spec,
|
174 | ...(await registration.callback(_.cloneDeep(registration), p, goal, goalEvent, repoContext)) || {},
|
175 | };
|
176 | });
|
177 | }
|
178 |
|
179 | if (!spec.containers || spec.containers.length < 1) {
|
180 | throw new Error("No containers defined in K8sGoalContainerSpec");
|
181 | }
|
182 |
|
183 |
|
184 | let data = parseGoalEventData(goalEvent);
|
185 | let newData: any = {};
|
186 | delete spec.callback;
|
187 | _.set<any>(newData, ContainerRegistrationGoalDataKey, spec);
|
188 | goalEvent.data = JSON.stringify(_.merge(data, newData));
|
189 |
|
190 | if (spec.containers[0].workingDir === "") {
|
191 | delete spec.containers[0].workingDir;
|
192 | } else if (!spec.containers[0].workingDir) {
|
193 | spec.containers[0].workingDir = ContainerProjectHome;
|
194 | }
|
195 |
|
196 | const goalSchedulers: GoalScheduler[] = toArray(repoContext.configuration.sdm.goalScheduler) || [];
|
197 | const k8sScheduler = goalSchedulers.find(gs => gs instanceof KubernetesGoalScheduler) as KubernetesGoalScheduler;
|
198 | if (!k8sScheduler) {
|
199 | throw new Error("Failed to find KubernetesGoalScheduler in goal schedulers");
|
200 | }
|
201 | if (!k8sScheduler.podSpec) {
|
202 | throw new Error("KubernetesGoalScheduler has no podSpec defined");
|
203 | }
|
204 |
|
205 | const containerEnvs = await containerEnvVars(goalEvent, repoContext);
|
206 | const projectVolume = `project-${guid().split("-")[0]}`;
|
207 | const inputVolume = `input-${guid().split("-")[0]}`;
|
208 | const outputVolume = `output-${guid().split("-")[0]}`;
|
209 | const ioVolumes = [
|
210 | {
|
211 | name: projectVolume,
|
212 | emptyDir: {},
|
213 | },
|
214 | {
|
215 | name: inputVolume,
|
216 | emptyDir: {},
|
217 | },
|
218 | {
|
219 | name: outputVolume,
|
220 | emptyDir: {},
|
221 | },
|
222 | ];
|
223 | const ioVolumeMounts = [
|
224 | {
|
225 | mountPath: ContainerProjectHome,
|
226 | name: projectVolume,
|
227 | },
|
228 | {
|
229 | mountPath: ContainerInput,
|
230 | name: inputVolume,
|
231 | },
|
232 | {
|
233 | mountPath: ContainerOutput,
|
234 | name: outputVolume,
|
235 | },
|
236 | ];
|
237 |
|
238 | const copyContainer = _.cloneDeep(k8sScheduler.podSpec.spec.containers[0]);
|
239 | delete copyContainer.lifecycle;
|
240 | delete copyContainer.livenessProbe;
|
241 | delete copyContainer.readinessProbe;
|
242 | copyContainer.name = `container-goal-init-${guid().split("-")[0]}`;
|
243 | copyContainer.env = [
|
244 | ...(copyContainer.env || []),
|
245 | ...k8sJobEnv(k8sScheduler.podSpec, goalEvent, repoContext.context as any),
|
246 | ...containerEnvs,
|
247 | {
|
248 | name: "ATOMIST_ISOLATED_GOAL_INIT",
|
249 | value: "true",
|
250 | },
|
251 | {
|
252 | name: "ATOMIST_CONFIG",
|
253 | value: JSON.stringify({
|
254 | cluster: {
|
255 | enabled: false,
|
256 | },
|
257 | ws: {
|
258 | enabled: false,
|
259 | },
|
260 | }),
|
261 | },
|
262 | ];
|
263 | spec.initContainers = spec.initContainers || [];
|
264 |
|
265 | const parameters = JSON.parse((goalEvent as any).parameters || "{}");
|
266 | const secrets = await prepareSecrets(
|
267 | _.merge({}, registration.containers[0], (parameters["@atomist/sdm/secrets"] || {})), repoContext);
|
268 | delete spec.containers[0].secrets;
|
269 | [...spec.containers, ...spec.initContainers].forEach(c => {
|
270 | c.env = [
|
271 | ...(secrets.env || []),
|
272 | ...containerEnvs,
|
273 | ...(c.env || []),
|
274 | ];
|
275 | });
|
276 | if (!!secrets?.files) {
|
277 | for (const file of secrets.files) {
|
278 | const fileName = path.basename(file.mountPath);
|
279 | const dirname = path.dirname(file.mountPath);
|
280 | let secretName = `secret-${guid().split("-")[0]}`;
|
281 |
|
282 | const vm = (copyContainer.volumeMounts || [])
|
283 | .find(m => m.mountPath === dirname);
|
284 | if (!!vm) {
|
285 | secretName = vm.name;
|
286 | } else {
|
287 | copyContainer.volumeMounts = [
|
288 | ...(copyContainer.volumeMounts || []),
|
289 | {
|
290 | mountPath: dirname,
|
291 | name: secretName,
|
292 | },
|
293 | ];
|
294 | spec.volumes = [
|
295 | ...(spec.volumes || []),
|
296 | {
|
297 | name: secretName,
|
298 | emptyDir: {},
|
299 | } as any,
|
300 | ];
|
301 | }
|
302 | [...spec.containers, ...spec.initContainers].forEach((c: k8s.V1Container) => {
|
303 | c.volumeMounts = [
|
304 | ...(c.volumeMounts || []),
|
305 | {
|
306 | mountPath: file.mountPath,
|
307 | name: secretName,
|
308 | subPath: fileName,
|
309 | },
|
310 | ];
|
311 | });
|
312 | }
|
313 | }
|
314 | spec.initContainers = [
|
315 | copyContainer,
|
316 | ...spec.initContainers,
|
317 | ];
|
318 |
|
319 | const serviceSpec: { type: string, spec: K8sServiceSpec } = {
|
320 | type: K8sServiceRegistrationType.K8sService,
|
321 | spec: {
|
322 | container: spec.containers,
|
323 | initContainer: spec.initContainers,
|
324 | volume: [
|
325 | ...ioVolumes,
|
326 | ...(spec.volumes || []),
|
327 | ],
|
328 | volumeMount: ioVolumeMounts,
|
329 | },
|
330 | };
|
331 |
|
332 |
|
333 | data = JSON.parse(goalEvent.data || "{}");
|
334 | newData = {};
|
335 | _.set<any>(newData, `${ServiceRegistrationGoalDataKey}.${registration.name}`, serviceSpec);
|
336 | goalEvent.data = JSON.stringify(_.merge(data, newData));
|
337 | return goalEvent;
|
338 | };
|
339 | }
|
340 |
|
341 |
|
342 |
|
343 |
|
344 |
|
345 |
|
346 |
|
347 | export const scheduleK8sJob: ExecuteGoal = async gi => {
|
348 | const { goalEvent } = gi;
|
349 | const { uniqueName } = goalEvent;
|
350 | const data = parseGoalEventData(goalEvent);
|
351 | const containerReg: K8sContainerRegistration = data["@atomist/sdm/container"];
|
352 | if (!containerReg) {
|
353 | throw new Error(`Goal ${uniqueName} event data has no container spec: ${goalEvent.data}`);
|
354 | }
|
355 |
|
356 | const goalSchedulers: GoalScheduler[] = toArray(gi.configuration.sdm.goalScheduler) || [];
|
357 | const k8sScheduler = goalSchedulers.find(gs => gs instanceof KubernetesGoalScheduler) as KubernetesGoalScheduler;
|
358 | if (!k8sScheduler) {
|
359 | throw new Error(`Failed to find KubernetesGoalScheduler in goal schedulers: ${stringify(goalSchedulers)}`);
|
360 | }
|
361 |
|
362 |
|
363 | delete data[ServiceRegistrationGoalDataKey];
|
364 | goalEvent.data = JSON.stringify(data);
|
365 |
|
366 | try {
|
367 | const schedulableGoalEvent = await k8sFulfillmentCallback(gi.goal as Container, containerReg)(goalEvent, gi);
|
368 | const scheduleResult = await k8sScheduler.schedule({ ...gi, goalEvent: schedulableGoalEvent });
|
369 | if (scheduleResult.code) {
|
370 | return { ...scheduleResult, message: `Failed to schedule container goal ${uniqueName}: ${scheduleResult.message}` };
|
371 | }
|
372 | schedulableGoalEvent.state = SdmGoalState.in_process;
|
373 | return schedulableGoalEvent;
|
374 | } catch (e) {
|
375 | const message = `Failed to schedule container goal ${uniqueName} as Kubernetes job: ${e.message}`;
|
376 | gi.progressLog.write(message);
|
377 | return { code: 1, message };
|
378 | }
|
379 | };
|
380 |
|
381 |
|
382 | interface K8sContainer {
|
383 |
|
384 | config: k8s.KubeConfig;
|
385 |
|
386 | name: string;
|
387 |
|
388 | pod: string;
|
389 |
|
390 | ns: string;
|
391 |
|
392 | log: ProgressLog;
|
393 | }
|
394 |
|
395 |
|
396 |
|
397 |
|
398 |
|
399 | export function executeK8sJob(): ExecuteGoal {
|
400 |
|
401 | return async gi => {
|
402 | const { goalEvent, progressLog, configuration, id, credentials } = gi;
|
403 |
|
404 | const projectDir = process.env.ATOMIST_PROJECT_DIR || ContainerProjectHome;
|
405 | const inputDir = process.env.ATOMIST_INPUT_DIR || ContainerInput;
|
406 | const outputDir = process.env.ATOMIST_OUTPUT_DIR || ContainerOutput;
|
407 |
|
408 | const data = parseGoalEventData(goalEvent);
|
409 | if (!data[ContainerRegistrationGoalDataKey]) {
|
410 | throw new Error("Failed to read k8s ContainerRegistration from goal data");
|
411 | }
|
412 | if (!data[ContainerRegistrationGoalDataKey]) {
|
413 | throw new Error(`Goal ${gi.goal.uniqueName} has no Kubernetes container registration: ${gi.goalEvent.data}`);
|
414 | }
|
415 | const registration: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];
|
416 |
|
417 | if (process.env.ATOMIST_ISOLATED_GOAL_INIT === "true") {
|
418 | return configuration.sdm.projectLoader.doWithProject({
|
419 | ...gi,
|
420 | readOnly: false,
|
421 | cloneDir: projectDir,
|
422 | cloneOptions: minimalClone(goalEvent.push, { detachHead: true }),
|
423 | }, async project => {
|
424 | try {
|
425 | await prepareInputAndOutput(inputDir, outputDir, gi);
|
426 | } catch (e) {
|
427 | const message = `Failed to prepare input and output for goal ${goalEvent.name}: ${e.message}`;
|
428 | progressLog.write(message);
|
429 | return { code: 1, message };
|
430 | }
|
431 | const secrets = await prepareSecrets(
|
432 | _.merge({}, registration.containers[0], ((gi.parameters || {})["@atomist/sdm/secrets"] || {})), gi);
|
433 | if (!!secrets?.files) {
|
434 | for (const file of secrets.files) {
|
435 | await fs.writeFile(file.mountPath, file.value);
|
436 | }
|
437 | }
|
438 |
|
439 | goalEvent.state = SdmGoalState.in_process;
|
440 | return goalEvent;
|
441 |
|
442 | });
|
443 | }
|
444 |
|
445 | let containerName: string = _.get(registration, "containers[0].name");
|
446 | if (!containerName) {
|
447 | const msg = `Failed to get main container name from goal registration: ${stringify(registration)}`;
|
448 | progressLog.write(msg);
|
449 | let svcSpec: K8sServiceSpec;
|
450 | try {
|
451 | svcSpec = _.get(data, `${ServiceRegistrationGoalDataKey}.${registration.name}.spec`);
|
452 | } catch (e) {
|
453 | const message = `Failed to parse Kubernetes spec from goal data '${goalEvent.data}': ${e.message}`;
|
454 | progressLog.write(message);
|
455 | return { code: 1, message };
|
456 | }
|
457 | containerName = _.get(svcSpec, "container[1].name");
|
458 | if (!containerName) {
|
459 | const message = `Failed to get main container name from either goal registration or data: '${goalEvent.data}'`;
|
460 | progressLog.write(message);
|
461 | return { code: 1, message };
|
462 | }
|
463 | }
|
464 | const ns = await readNamespace();
|
465 | const podName = os.hostname();
|
466 |
|
467 | let kc: k8s.KubeConfig;
|
468 | try {
|
469 | kc = loadKubeConfig();
|
470 | } catch (e) {
|
471 | const message = `Failed to load Kubernetes configuration: ${e.message}`;
|
472 | progressLog.write(message);
|
473 | return { code: 1, message };
|
474 | }
|
475 |
|
476 | const container: K8sContainer = {
|
477 | config: kc,
|
478 | name: containerName,
|
479 | pod: podName,
|
480 | ns,
|
481 | log: progressLog,
|
482 | };
|
483 |
|
484 | try {
|
485 | await containerStarted(container);
|
486 | } catch (e) {
|
487 | progressLog.write(e.message);
|
488 | return { code: 1, message: e.message };
|
489 | }
|
490 |
|
491 | const log = followK8sLog(container);
|
492 |
|
493 | const status = { code: 0, message: `Container '${containerName}' completed successfully` };
|
494 | try {
|
495 | const podStatus = await containerWatch(container);
|
496 | progressLog.write(`Container '${containerName}' exited: ${stringify(podStatus)}`);
|
497 | } catch (e) {
|
498 | const message = `Container '${containerName}' failed: ${e.message}`;
|
499 | progressLog.write(message);
|
500 | status.code++;
|
501 | status.message = message;
|
502 | } finally {
|
503 |
|
504 | await sleep(1000);
|
505 | log.abort();
|
506 | }
|
507 |
|
508 | const outputFile = path.join(outputDir, "result.json");
|
509 | let outputResult;
|
510 | if ((await fs.pathExists(outputFile)) && status.code === 0) {
|
511 | try {
|
512 | outputResult = await processResult(await fs.readJson(outputFile), gi);
|
513 | } catch (e) {
|
514 | const message = `Failed to read output from container: ${e.message}`;
|
515 | progressLog.write(message);
|
516 | status.code++;
|
517 | status.message += ` but f${message.slice(1)}`;
|
518 | }
|
519 | }
|
520 |
|
521 | const cacheEntriesToPut = [
|
522 | ...(registration.output || []),
|
523 | ...((gi.parameters || {})[CacheOutputGoalDataKey] || []),
|
524 | ];
|
525 | if (cacheEntriesToPut.length > 0) {
|
526 | try {
|
527 | const project = GitCommandGitProject.fromBaseDir(id, projectDir, credentials, async () => {
|
528 | });
|
529 | const cp = cachePut({ entries: cacheEntriesToPut });
|
530 | await cp.listener(project, gi, GoalProjectListenerEvent.after);
|
531 | } catch (e) {
|
532 | const message = `Failed to put cache output from container: ${e.message}`;
|
533 | progressLog.write(message);
|
534 | status.code++;
|
535 | status.message += ` but f${message.slice(1)}`;
|
536 | }
|
537 | }
|
538 |
|
539 | return outputResult || status;
|
540 | };
|
541 | }
|
542 |
|
543 |
|
544 |
|
545 |
|
546 | function parseGoalEventData(goalEvent: SdmGoalEvent): any {
|
547 | const goalName = goalEvent.uniqueName;
|
548 | if (!goalEvent || !goalEvent.data) {
|
549 | return {};
|
550 | }
|
551 | let data: any;
|
552 | try {
|
553 | data = JSON.parse(goalEvent.data);
|
554 | } catch (e) {
|
555 | e.message = `Failed to parse goal event data for ${goalName} as JSON '${goalEvent.data}': ${e.message}`;
|
556 | throw e;
|
557 | }
|
558 | return data;
|
559 | }
|
560 |
|
561 |
|
562 |
|
563 |
|
564 |
|
565 |
|
566 | const containerExecutor: ExecuteGoal = gi => (process.env.ATOMIST_ISOLATED_GOAL) ? executeK8sJob()(gi) : scheduleK8sJob(gi);
|
567 |
|
568 |
|
569 |
|
570 |
|
571 | const containerFulfillerCacheRestore: GoalProjectListenerRegistration = {
|
572 | name: "ContainerFulfillerCacheRestore",
|
573 | events: [GoalProjectListenerEvent.before],
|
574 | listener: async (project, gi, event) => {
|
575 | const data = parseGoalEventData(gi.goalEvent);
|
576 | if (!data[ContainerRegistrationGoalDataKey]) {
|
577 | throw new Error(`Goal ${gi.goal.uniqueName} has no Kubernetes container registration: ${gi.goalEvent.data}`);
|
578 | }
|
579 | const registration: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];
|
580 | if (registration.input && registration.input.length > 0) {
|
581 | try {
|
582 | const cp = cacheRestore({ entries: registration.input });
|
583 | return cp.listener(project, gi, GoalProjectListenerEvent.before);
|
584 | } catch (e) {
|
585 | const message = `Failed to restore cache input to container for goal ${gi.goal.uniqueName}: ${e.message}`;
|
586 | gi.progressLog.write(message);
|
587 | return { code: 1, message };
|
588 | }
|
589 | } else {
|
590 | return { code: 0, message: "No container input cache entries to restore" };
|
591 | }
|
592 | },
|
593 | };
|
594 |
|
595 |
|
596 |
|
597 |
|
598 |
|
599 | export const K8sContainerFulfiller = new GoalWithFulfillment({
|
600 | displayName: "Kubernetes Container Goal Fulfiller",
|
601 | uniqueName: DefaultKubernetesFulfillmentOptions.name,
|
602 | })
|
603 | .with({
|
604 | goalExecutor: containerExecutor,
|
605 | name: `${DefaultKubernetesFulfillmentOptions.name}-executor`,
|
606 | })
|
607 | .withProjectListener(containerFulfillerCacheRestore);
|
608 |
|
609 |
|
610 |
|
611 |
|
612 |
|
613 |
|
614 |
|
615 | async function containerStarted(container: K8sContainer, attempts: number = 240): Promise<void> {
|
616 | let core: k8s.CoreV1Api;
|
617 | try {
|
618 | core = container.config.makeApiClient(k8s.CoreV1Api);
|
619 | } catch (e) {
|
620 | e.message = `Failed to create Kubernetes core API client: ${e.message}`;
|
621 | container.log.write(e.message);
|
622 | throw e;
|
623 | }
|
624 |
|
625 | const sleepTime = 500;
|
626 | for (let i = 0; i < attempts; i++) {
|
627 | await sleep(500);
|
628 | const pod = (await core.readNamespacedPod(container.pod, container.ns)).body;
|
629 | const containerStatus = pod.status.containerStatuses.find(c => c.name === container.name);
|
630 | if (containerStatus && (!!_.get(containerStatus, "state.running.startedAt") || !!_.get(containerStatus, "state.terminated"))) {
|
631 | const message = `Container '${container.name}' started`;
|
632 | container.log.write(message);
|
633 | return;
|
634 | }
|
635 | }
|
636 |
|
637 | const errMsg = `Container '${container.name}' failed to start within ${attempts * sleepTime} ms`;
|
638 | container.log.write(errMsg);
|
639 | throw new Error(errMsg);
|
640 | }
|
641 |
|
642 |
|
643 |
|
644 |
|
645 |
|
646 |
|
647 |
|
648 |
|
649 |
|
650 |
|
651 | function containerWatch(container: K8sContainer): Promise<k8s.V1PodStatus> {
|
652 | return new Promise((resolve, reject) => {
|
653 | let watch: k8s.Watch;
|
654 | try {
|
655 | watch = new k8s.Watch(container.config);
|
656 | } catch (e) {
|
657 | e.message = `Failed to create Kubernetes watch client: ${e.message}`;
|
658 | container.log.write(e.message);
|
659 | reject(e);
|
660 | }
|
661 | const watchPath = `/api/v1/watch/namespaces/${container.ns}/pods/${container.pod}`;
|
662 | let watcher: any;
|
663 | watcher = watch.watch(watchPath, {}, (phase, obj) => {
|
664 | const pod = obj as k8s.V1Pod;
|
665 | if (pod && pod.status && pod.status.containerStatuses) {
|
666 | const containerStatus = pod.status.containerStatuses.find(c => c.name === container.name);
|
667 | if (containerStatus && containerStatus.state && containerStatus.state.terminated) {
|
668 | const exitCode: number = _.get(containerStatus, "state.terminated.exitCode");
|
669 | if (exitCode === 0) {
|
670 | const msg = `Container '${container.name}' exited with status 0`;
|
671 | container.log.write(msg);
|
672 | resolve(pod.status);
|
673 | } else {
|
674 | const msg = `Container '${container.name}' exited with status ${exitCode}`;
|
675 | container.log.write(msg);
|
676 | const err = new Error(msg);
|
677 | (err as any).podStatus = pod.status;
|
678 | reject(err);
|
679 | }
|
680 | if (watcher) {
|
681 | watcher.abort();
|
682 | }
|
683 | return;
|
684 | }
|
685 | }
|
686 | container.log.write(`Container '${container.name}' still running`);
|
687 | }, err => {
|
688 | err.message = `Container watcher failed: ${err.message}`;
|
689 | container.log.write(err.message);
|
690 | reject(err);
|
691 | });
|
692 | });
|
693 | }
|
694 |
|
695 |
|
696 |
|
697 |
|
698 | function followK8sLog(container: K8sContainer): request.Request {
|
699 | const k8sLog = new k8s.Log(container.config);
|
700 | const logStream = new Writable({
|
701 | write: (chunk, encoding, callback) => {
|
702 | container.log.write(chunk.toString());
|
703 | callback();
|
704 | },
|
705 | });
|
706 | const doneCallback = e => {
|
707 | if (e) {
|
708 | if (e.message) {
|
709 | container.log.write(e.message);
|
710 | } else {
|
711 | container.log.write(stringify(e));
|
712 | }
|
713 | }
|
714 | };
|
715 | const logOptions: k8s.LogOptions = { follow: true };
|
716 | return k8sLog.log(container.ns, container.pod, container.name, logStream, doneCallback, logOptions);
|
717 | }
|