/*
 * Copyright © 2019 Atomist, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import {
    GitCommandGitProject,
    GitProject,
    guid,
} from "@atomist/automation-client";
import { sleep } from "@atomist/automation-client/lib/internal/util/poll";
import {
    ExecuteGoal,
    GoalProjectListenerEvent,
    GoalProjectListenerRegistration,
    GoalScheduler,
    GoalWithFulfillment,
    ImplementationRegistration,
    minimalClone,
    ProgressLog,
    RepoContext,
    SdmGoalEvent,
    SdmGoalState,
    ServiceRegistrationGoalDataKey,
} from "@atomist/sdm";
import * as k8s from "@kubernetes/client-node";
import * as fs from "fs-extra";
import * as stringify from "json-stringify-safe";
import * as _ from "lodash";
import * as os from "os";
import * as path from "path";
import * as request from "request";
import { Writable } from "stream";
import {
    DeepPartial,
    Merge,
} from "ts-essentials";
import { loadKubeConfig } from "../../pack/k8s/config";
import { DefaultKubernetesFulfillmentOptions } from "../../pack/k8s/KubernetesFulfillmentGoalScheduler";
import {
    k8sJobEnv,
    KubernetesGoalScheduler,
    readNamespace,
} from "../../pack/k8s/KubernetesGoalScheduler";
import {
    K8sServiceRegistrationType,
    K8sServiceSpec,
} from "../../pack/k8s/service";
import { toArray } from "../../util/misc/array";
import {
    CacheOutputGoalDataKey,
    cachePut,
    cacheRestore,
} from "../cache/goalCaching";
import {
    Container,
    ContainerInput,
    ContainerOutput,
    ContainerProjectHome,
    ContainerRegistration,
    ContainerRegistrationGoalDataKey,
    ContainerScheduler,
    GoalContainer,
    GoalContainerVolume,
} from "./container";
import { prepareSecrets } from "./provider";
import {
    containerEnvVars,
    prepareInputAndOutput,
    processResult,
} from "./util";

// tslint:disable:max-file-line-count

/** Merge of base and Kubernetes goal container interfaces. */
export type K8sGoalContainer =
    Merge<GoalContainer, DeepPartial<k8s.V1Container>>
    & Pick<GoalContainer, "name" | "image">;
/** Merge of base and Kubernetes goal container volume interfaces. */
export type K8sGoalContainerVolume = Merge<k8s.V1Volume, GoalContainerVolume>;

/** @deprecated use K8sContainerSpecCallback */
export type K8sGoalContainerSpec = Omit<K8sContainerRegistration, "callback">;

/**
 * Function signature for callback that can modify and return the
 * [[ContainerRegistration]] object.
 */
export type K8sContainerSpecCallback =
    (r: K8sContainerRegistration, p: GitProject, g: Container, e: SdmGoalEvent, ctx: RepoContext) =>
        Promise<Omit<K8sContainerRegistration, "callback">>;
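
// Illustrative sketch, not part of the original file: a [[K8sContainerSpecCallback]]
// can tailor the registration per repository before the goal is scheduled. Only the
// callback signature comes from the type above; the environment variable below is a
// made-up example.
//
// const addRepoSlug: K8sContainerSpecCallback = async (r, p, g, e, ctx) => {
//     r.containers[0].env = [
//         ...(r.containers[0].env || []),
//         { name: "REPO_SLUG", value: `${e.repo.owner}/${e.repo.name}` },
//     ];
//     return r;
// };
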
103
104/**
105 * Additional options for Kubernetes implementation of container goals.
106 */
107export interface K8sContainerRegistration extends ContainerRegistration {
108 /**
109 * Replace generic containers in [[ContainerRegistration]] with
110 * Kubernetes containers.
111 *
112 * Containers to run for this goal. The goal result is based on
113 * the exit status of the first element of the `containers` array.
114 * The other containers are considered "sidecar" containers
115 * provided functionality that the main container needs to
116 * function. If not set, the working directory of the first
117 * container is set to [[ContainerProjectHome]], which contains
118 * the project upon which the goal should operate. If
119 * `workingDir` is set, it is not changed. If `workingDir` is set
120 * to the empty string, the `workingDir` property is deleted from
121 * the main container spec, meaning the container default working
122 * directory will be used.
123 */
124 containers: K8sGoalContainer[];
125 /**
126 * Replace generic callback in [[ContainerRegistration]] with
127 * Kubernetes-specific callback.
128 */
129 callback?: K8sContainerSpecCallback;
130 /**
131 * Init containers to run for this goal. Any containers provided
132 * here will run after the one inserted by the SDM to manage the
133 * cloned repository.
134 */
135 initContainers?: k8s.V1Container[];
136 /**
137 * Replace generic volumes in [[ContainerRegistration]] with
138 * Kubernetes volumes available to mount in containers.
139 */
140 volumes?: K8sGoalContainerVolume[];
141}
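
// Illustrative sketch, not from the original source: a minimal registration for a
// goal that runs a single container against the checked-out project. The image,
// command, and args are placeholders.
//
// const buildRegistration: K8sContainerRegistration = {
//     containers: [{
//         name: "build",
//         image: "node:12",
//         command: ["npm"],
//         args: ["run", "build"],
//     }],
// };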

export const k8sContainerScheduler: ContainerScheduler = (goal, registration: K8sContainerRegistration) => {
    goal.addFulfillment({
        goalExecutor: executeK8sJob(),
        ...registration as ImplementationRegistration,
    });

    goal.addFulfillmentCallback({
        goal,
        callback: k8sFulfillmentCallback(goal, registration),
    });
};
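
// Illustrative sketch, assumed wiring rather than documented API: a machine
// definition would typically pass this scheduler when constructing a
// [[Container]] goal and supply a registration like the one above via `with`;
// the exact constructor options are defined in "./container".
//
// const k8sBuild = new Container({ scheduler: k8sContainerScheduler })
//     .with({
//         name: "k8s-build",
//         containers: [{ name: "build", image: "node:12", command: ["npm"], args: ["run", "build"] }],
//     });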

/**
 * Add Kubernetes job scheduling information to SDM goal event data
 * for use by the [[KubernetesGoalScheduler]].
 */
export function k8sFulfillmentCallback(
    goal: Container,
    registration: K8sContainerRegistration,
): (sge: SdmGoalEvent, rc: RepoContext) => Promise<SdmGoalEvent> {
    // tslint:disable-next-line:cyclomatic-complexity
    return async (goalEvent, repoContext) => {
        let spec: K8sContainerRegistration = _.cloneDeep(registration);
        if (registration.callback) {
            spec = await repoContext.configuration.sdm.projectLoader.doWithProject({
                ...repoContext,
                readOnly: true,
                cloneOptions: minimalClone(goalEvent.push, { detachHead: true }),
            }, async p => {
                return {
                    ...spec,
                    ...(await registration.callback(_.cloneDeep(registration), p, goal, goalEvent, repoContext)) || {},
                };
            });
        }

        if (!spec.containers || spec.containers.length < 1) {
            throw new Error("No containers defined in K8sGoalContainerSpec");
        }

        // Preserve the container registration in the goal data before it gets munged with internals
        let data = parseGoalEventData(goalEvent);
        let newData: any = {};
        delete spec.callback;
        _.set<any>(newData, ContainerRegistrationGoalDataKey, spec);
        goalEvent.data = JSON.stringify(_.merge(data, newData));

        if (spec.containers[0].workingDir === "") {
            delete spec.containers[0].workingDir;
        } else if (!spec.containers[0].workingDir) {
            spec.containers[0].workingDir = ContainerProjectHome;
        }

        const goalSchedulers: GoalScheduler[] = toArray(repoContext.configuration.sdm.goalScheduler) || [];
        const k8sScheduler = goalSchedulers.find(gs => gs instanceof KubernetesGoalScheduler) as KubernetesGoalScheduler;
        if (!k8sScheduler) {
            throw new Error("Failed to find KubernetesGoalScheduler in goal schedulers");
        }
        if (!k8sScheduler.podSpec) {
            throw new Error("KubernetesGoalScheduler has no podSpec defined");
        }

        const containerEnvs = await containerEnvVars(goalEvent, repoContext);
        const projectVolume = `project-${guid().split("-")[0]}`;
        const inputVolume = `input-${guid().split("-")[0]}`;
        const outputVolume = `output-${guid().split("-")[0]}`;
        const ioVolumes = [
            {
                name: projectVolume,
                emptyDir: {},
            },
            {
                name: inputVolume,
                emptyDir: {},
            },
            {
                name: outputVolume,
                emptyDir: {},
            },
        ];
        const ioVolumeMounts = [
            {
                mountPath: ContainerProjectHome,
                name: projectVolume,
            },
            {
                mountPath: ContainerInput,
                name: inputVolume,
            },
            {
                mountPath: ContainerOutput,
                name: outputVolume,
            },
        ];

        const copyContainer = _.cloneDeep(k8sScheduler.podSpec.spec.containers[0]);
        delete copyContainer.lifecycle;
        delete copyContainer.livenessProbe;
        delete copyContainer.readinessProbe;
        copyContainer.name = `container-goal-init-${guid().split("-")[0]}`;
        copyContainer.env = [
            ...(copyContainer.env || []),
            ...k8sJobEnv(k8sScheduler.podSpec, goalEvent, repoContext.context as any),
            ...containerEnvs,
            {
                name: "ATOMIST_ISOLATED_GOAL_INIT",
                value: "true",
            },
            {
                name: "ATOMIST_CONFIG",
                value: JSON.stringify({
                    cluster: {
                        enabled: false,
                    },
                    ws: {
                        enabled: false,
                    },
                }),
            },
        ];
        spec.initContainers = spec.initContainers || [];

        const parameters = JSON.parse((goalEvent as any).parameters || "{}");
        const secrets = await prepareSecrets(
            _.merge({}, registration.containers[0], (parameters["@atomist/sdm/secrets"] || {})), repoContext);
        delete spec.containers[0].secrets;
        [...spec.containers, ...spec.initContainers].forEach(c => {
            c.env = [
                ...(secrets.env || []),
                ...containerEnvs,
                ...(c.env || []),
            ];
        });
        if (!!secrets?.files) {
            for (const file of secrets.files) {
                const fileName = path.basename(file.mountPath);
                const dirname = path.dirname(file.mountPath);
                let secretName = `secret-${guid().split("-")[0]}`;

                const vm = (copyContainer.volumeMounts || [])
                    .find(m => m.mountPath === dirname);
                if (!!vm) {
                    secretName = vm.name;
                } else {
                    copyContainer.volumeMounts = [
                        ...(copyContainer.volumeMounts || []),
                        {
                            mountPath: dirname,
                            name: secretName,
                        },
                    ];
                    spec.volumes = [
                        ...(spec.volumes || []),
                        {
                            name: secretName,
                            emptyDir: {},
                        } as any,
                    ];
                }
                [...spec.containers, ...spec.initContainers].forEach((c: k8s.V1Container) => {
                    c.volumeMounts = [
                        ...(c.volumeMounts || []),
                        {
                            mountPath: file.mountPath,
                            name: secretName,
                            subPath: fileName,
                        },
                    ];
                });
            }
        }
        spec.initContainers = [
            copyContainer,
            ...spec.initContainers,
        ];

        const serviceSpec: { type: string, spec: K8sServiceSpec } = {
            type: K8sServiceRegistrationType.K8sService,
            spec: {
                container: spec.containers,
                initContainer: spec.initContainers,
                volume: [
                    ...ioVolumes,
                    ...(spec.volumes || []),
                ],
                volumeMount: ioVolumeMounts,
            },
        };

        // Store k8s service registration in goal data
        data = JSON.parse(goalEvent.data || "{}");
        newData = {};
        _.set<any>(newData, `${ServiceRegistrationGoalDataKey}.${registration.name}`, serviceSpec);
        goalEvent.data = JSON.stringify(_.merge(data, newData));
        return goalEvent;
    };
}
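
// For orientation, a summary of what the callback above stores; this sketch shows
// shape only, is not from the original source, and abbreviates property values:
//
// goalEvent.data ~= JSON.stringify({
//     [ContainerRegistrationGoalDataKey]: { /* K8sContainerRegistration minus callback */ },
//     [ServiceRegistrationGoalDataKey]: {
//         [registration.name]: { type: K8sServiceRegistrationType.K8sService, spec: { /* K8sServiceSpec */ } },
//     },
// });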

/**
 * Get container registration from goal event data, use
 * [[k8sFulfillmentCallback]] to get a goal event schedulable by a
 * [[KubernetesGoalScheduler]], then schedule the goal using that
 * scheduler.
 */
export const scheduleK8sJob: ExecuteGoal = async gi => {
    const { goalEvent } = gi;
    const { uniqueName } = goalEvent;
    const data = parseGoalEventData(goalEvent);
    const containerReg: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];
    if (!containerReg) {
        throw new Error(`Goal ${uniqueName} event data has no container spec: ${goalEvent.data}`);
    }

    const goalSchedulers: GoalScheduler[] = toArray(gi.configuration.sdm.goalScheduler) || [];
    const k8sScheduler = goalSchedulers.find(gs => gs instanceof KubernetesGoalScheduler) as KubernetesGoalScheduler;
    if (!k8sScheduler) {
        throw new Error(`Failed to find KubernetesGoalScheduler in goal schedulers: ${stringify(goalSchedulers)}`);
    }

    // The k8sFulfillmentCallback may already have been called, so wipe out any existing service registration
    delete data[ServiceRegistrationGoalDataKey];
    goalEvent.data = JSON.stringify(data);

    try {
        const schedulableGoalEvent = await k8sFulfillmentCallback(gi.goal as Container, containerReg)(goalEvent, gi);
        const scheduleResult = await k8sScheduler.schedule({ ...gi, goalEvent: schedulableGoalEvent });
        if (scheduleResult.code) {
            return { ...scheduleResult, message: `Failed to schedule container goal ${uniqueName}: ${scheduleResult.message}` };
        }
        schedulableGoalEvent.state = SdmGoalState.in_process;
        return schedulableGoalEvent;
    } catch (e) {
        const message = `Failed to schedule container goal ${uniqueName} as Kubernetes job: ${e.message}`;
        gi.progressLog.write(message);
        return { code: 1, message };
    }
};

/** Container information useful to the various functions. */
interface K8sContainer {
    /** Kubernetes configuration to use when creating API clients */
    config: k8s.KubeConfig;
    /** Name of container in pod */
    name: string;
    /** Pod name */
    pod: string;
    /** Pod namespace */
    ns: string;
    /** Progress log */
    log: ProgressLog;
}

/**
 * Wait for first container to exit and stream its logs to the
 * progress log.
 */
export function executeK8sJob(): ExecuteGoal {
    // tslint:disable-next-line:cyclomatic-complexity
    return async gi => {
        const { goalEvent, progressLog, configuration, id, credentials } = gi;

        const projectDir = process.env.ATOMIST_PROJECT_DIR || ContainerProjectHome;
        const inputDir = process.env.ATOMIST_INPUT_DIR || ContainerInput;
        const outputDir = process.env.ATOMIST_OUTPUT_DIR || ContainerOutput;

        const data = parseGoalEventData(goalEvent);
        if (!data[ContainerRegistrationGoalDataKey]) {
            throw new Error(`Goal ${gi.goal.uniqueName} has no Kubernetes container registration: ${gi.goalEvent.data}`);
        }
        const registration: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];

        if (process.env.ATOMIST_ISOLATED_GOAL_INIT === "true") {
            return configuration.sdm.projectLoader.doWithProject({
                ...gi,
                readOnly: false,
                cloneDir: projectDir,
                cloneOptions: minimalClone(goalEvent.push, { detachHead: true }),
            }, async project => {
                try {
                    await prepareInputAndOutput(inputDir, outputDir, gi);
                } catch (e) {
                    const message = `Failed to prepare input and output for goal ${goalEvent.name}: ${e.message}`;
                    progressLog.write(message);
                    return { code: 1, message };
                }
                const secrets = await prepareSecrets(
                    _.merge({}, registration.containers[0], ((gi.parameters || {})["@atomist/sdm/secrets"] || {})), gi);
                if (!!secrets?.files) {
                    for (const file of secrets.files) {
                        await fs.writeFile(file.mountPath, file.value);
                    }
                }

                goalEvent.state = SdmGoalState.in_process;
                return goalEvent;
            });
        }

        let containerName: string = _.get(registration, "containers[0].name");
        if (!containerName) {
            const msg = `Failed to get main container name from goal registration: ${stringify(registration)}`;
            progressLog.write(msg);
            let svcSpec: K8sServiceSpec;
            try {
                svcSpec = _.get(data, `${ServiceRegistrationGoalDataKey}.${registration.name}.spec`);
            } catch (e) {
                const message = `Failed to parse Kubernetes spec from goal data '${goalEvent.data}': ${e.message}`;
                progressLog.write(message);
                return { code: 1, message };
            }
            containerName = _.get(svcSpec, "container[1].name");
            if (!containerName) {
                const message = `Failed to get main container name from either goal registration or data: '${goalEvent.data}'`;
                progressLog.write(message);
                return { code: 1, message };
            }
        }
        const ns = await readNamespace();
        const podName = os.hostname();

        let kc: k8s.KubeConfig;
        try {
            kc = loadKubeConfig();
        } catch (e) {
            const message = `Failed to load Kubernetes configuration: ${e.message}`;
            progressLog.write(message);
            return { code: 1, message };
        }

        const container: K8sContainer = {
            config: kc,
            name: containerName,
            pod: podName,
            ns,
            log: progressLog,
        };

        try {
            await containerStarted(container);
        } catch (e) {
            progressLog.write(e.message);
            return { code: 1, message: e.message };
        }

        const log = followK8sLog(container);

        const status = { code: 0, message: `Container '${containerName}' completed successfully` };
        try {
            const podStatus = await containerWatch(container);
            progressLog.write(`Container '${containerName}' exited: ${stringify(podStatus)}`);
        } catch (e) {
            const message = `Container '${containerName}' failed: ${e.message}`;
            progressLog.write(message);
            status.code++;
            status.message = message;
        } finally {
            // Give the logs some time to be delivered
            await sleep(1000);
            log.abort();
        }

        const outputFile = path.join(outputDir, "result.json");
        let outputResult;
        if ((await fs.pathExists(outputFile)) && status.code === 0) {
            try {
                outputResult = await processResult(await fs.readJson(outputFile), gi);
            } catch (e) {
                const message = `Failed to read output from container: ${e.message}`;
                progressLog.write(message);
                status.code++;
                // Append the error, folding its leading "Failed" into "failed"
                status.message += ` but f${message.slice(1)}`;
            }
        }

        const cacheEntriesToPut = [
            ...(registration.output || []),
            ...((gi.parameters || {})[CacheOutputGoalDataKey] || []),
        ];
        if (cacheEntriesToPut.length > 0) {
            try {
                const project = GitCommandGitProject.fromBaseDir(id, projectDir, credentials, async () => {
                });
                const cp = cachePut({ entries: cacheEntriesToPut });
                await cp.listener(project, gi, GoalProjectListenerEvent.after);
            } catch (e) {
                const message = `Failed to put cache output from container: ${e.message}`;
                progressLog.write(message);
                status.code++;
                // Append the error, folding its leading "Failed" into "failed"
                status.message += ` but f${message.slice(1)}`;
            }
        }

        return outputResult || status;
    };
}

/**
 * Read and parse container goal registration from goal event data.
 */
function parseGoalEventData(goalEvent: SdmGoalEvent): any {
    if (!goalEvent || !goalEvent.data) {
        return {};
    }
    const goalName = goalEvent.uniqueName;
    let data: any;
    try {
        data = JSON.parse(goalEvent.data);
    } catch (e) {
        e.message = `Failed to parse goal event data for ${goalName} as JSON '${goalEvent.data}': ${e.message}`;
        throw e;
    }
    return data;
}
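
// Usage within this file, shown for orientation; this mirrors the calls above and
// below rather than adding new behavior:
//
// const data = parseGoalEventData(goalEvent);
// const registration: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];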

/**
 * If running as isolated goal, use [[executeK8sJob]] to execute the
 * goal. Otherwise, schedule the goal execution as a Kubernetes job
 * using [[scheduleK8sJob]].
 */
const containerExecutor: ExecuteGoal = gi => (process.env.ATOMIST_ISOLATED_GOAL) ? executeK8sJob()(gi) : scheduleK8sJob(gi);

/**
 * Restore cache input entries before fulfilling goal.
 */
const containerFulfillerCacheRestore: GoalProjectListenerRegistration = {
    name: "ContainerFulfillerCacheRestore",
    events: [GoalProjectListenerEvent.before],
    listener: async (project, gi, event) => {
        const data = parseGoalEventData(gi.goalEvent);
        if (!data[ContainerRegistrationGoalDataKey]) {
            throw new Error(`Goal ${gi.goal.uniqueName} has no Kubernetes container registration: ${gi.goalEvent.data}`);
        }
        const registration: K8sContainerRegistration = data[ContainerRegistrationGoalDataKey];
        if (registration.input && registration.input.length > 0) {
            try {
                const cp = cacheRestore({ entries: registration.input });
                return cp.listener(project, gi, GoalProjectListenerEvent.before);
            } catch (e) {
                const message = `Failed to restore cache input to container for goal ${gi.goal.uniqueName}: ${e.message}`;
                gi.progressLog.write(message);
                return { code: 1, message };
            }
        } else {
            return { code: 0, message: "No container input cache entries to restore" };
        }
    },
};

/**
 * Goal that fulfills requested container goals by scheduling them as
 * Kubernetes jobs.
 */
export const K8sContainerFulfiller = new GoalWithFulfillment({
    displayName: "Kubernetes Container Goal Fulfiller",
    uniqueName: DefaultKubernetesFulfillmentOptions.name,
})
    .with({
        goalExecutor: containerExecutor,
        name: `${DefaultKubernetesFulfillmentOptions.name}-executor`,
    })
    .withProjectListener(containerFulfillerCacheRestore);

/**
 * Wait for container in pod to start, return when it does.
 *
 * @param container Information about container to check
 * @param attempts Maximum number of attempts, waiting 500 ms between attempts
 */
async function containerStarted(container: K8sContainer, attempts: number = 240): Promise<void> {
    let core: k8s.CoreV1Api;
    try {
        core = container.config.makeApiClient(k8s.CoreV1Api);
    } catch (e) {
        e.message = `Failed to create Kubernetes core API client: ${e.message}`;
        container.log.write(e.message);
        throw e;
    }

    const sleepTime = 500; // ms
    for (let i = 0; i < attempts; i++) {
        await sleep(sleepTime);
        const pod = (await core.readNamespacedPod(container.pod, container.ns)).body;
        const containerStatus = pod.status.containerStatuses.find(c => c.name === container.name);
        if (containerStatus && (!!_.get(containerStatus, "state.running.startedAt") || !!_.get(containerStatus, "state.terminated"))) {
            const message = `Container '${container.name}' started`;
            container.log.write(message);
            return;
        }
    }

    const errMsg = `Container '${container.name}' failed to start within ${attempts * sleepTime} ms`;
    container.log.write(errMsg);
    throw new Error(errMsg);
}

/**
 * Watch pod until container `container.name` exits. Resolve promise
 * with status if container `container.name` exits with status 0.
 * Reject promise otherwise, including pod status in the `podStatus`
 * property of the error.
 *
 * @param container Information about container to watch
 * @return Status of pod after container terminates
 */
function containerWatch(container: K8sContainer): Promise<k8s.V1PodStatus> {
    return new Promise((resolve, reject) => {
        let watch: k8s.Watch;
        try {
            watch = new k8s.Watch(container.config);
        } catch (e) {
            e.message = `Failed to create Kubernetes watch client: ${e.message}`;
            container.log.write(e.message);
            reject(e);
            return;
        }
        const watchPath = `/api/v1/watch/namespaces/${container.ns}/pods/${container.pod}`;
        let watcher: any;
        watcher = watch.watch(watchPath, {}, (phase, obj) => {
            const pod = obj as k8s.V1Pod;
            if (pod && pod.status && pod.status.containerStatuses) {
                const containerStatus = pod.status.containerStatuses.find(c => c.name === container.name);
                if (containerStatus && containerStatus.state && containerStatus.state.terminated) {
                    const exitCode: number = _.get(containerStatus, "state.terminated.exitCode");
                    if (exitCode === 0) {
                        const msg = `Container '${container.name}' exited with status 0`;
                        container.log.write(msg);
                        resolve(pod.status);
                    } else {
                        const msg = `Container '${container.name}' exited with status ${exitCode}`;
                        container.log.write(msg);
                        const err = new Error(msg);
                        (err as any).podStatus = pod.status;
                        reject(err);
                    }
                    if (watcher) {
                        watcher.abort();
                    }
                    return;
                }
            }
            container.log.write(`Container '${container.name}' still running`);
        }, err => {
            err.message = `Container watcher failed: ${err.message}`;
            container.log.write(err.message);
            reject(err);
        });
    });
}

/**
 * Set up log follower for container.
 */
function followK8sLog(container: K8sContainer): request.Request {
    const k8sLog = new k8s.Log(container.config);
    const logStream = new Writable({
        write: (chunk, encoding, callback) => {
            container.log.write(chunk.toString());
            callback();
        },
    });
    const doneCallback = e => {
        if (e) {
            if (e.message) {
                container.log.write(e.message);
            } else {
                container.log.write(stringify(e));
            }
        }
    };
    const logOptions: k8s.LogOptions = { follow: true };
    return k8sLog.log(container.ns, container.pod, container.name, logStream, doneCallback, logOptions);
}