Commit 1b24e6f

fix: delete k8s jobs after failure (important in setups where the system attaches sidecar pods, which can otherwise remain after failure) (#49)
## Summary by CodeRabbit

- **Bug Fixes**
  - Improved error handling and logging for job status checks and failures.
  - Enhanced robustness when dealing with missing or failed Kubernetes jobs.
  - Refined pod log retrieval and job/pod cleanup processes for better reliability.
- **Refactor**
  - Updated job status checking to use asynchronous processing for better performance and reliability.
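In essence, the fix makes the executor clean up a failed job's pods explicitly instead of relying on cascading deletion alone. A minimal, standalone sketch of that cleanup with the official Kubernetes Python client, assuming a local kubeconfig and placeholder `namespace`/`jobid` values (the plugin itself uses its pre-configured API clients inside `safe_delete_job`, shown in the diff below):

```python
import kubernetes

# Assumptions for this sketch: a local kubeconfig and placeholder values for
# namespace and jobid; the plugin uses its own pre-built API client objects.
kubernetes.config.load_kube_config()
core = kubernetes.client.CoreV1Api()
batch = kubernetes.client.BatchV1Api()

namespace, jobid = "default", "snakejob-example"
body = kubernetes.client.V1DeleteOptions()

# Delete the job's pods explicitly (sidecar-attached pods may otherwise linger),
# then delete the job object itself with cascading ("Foreground") deletion.
pods = core.list_namespaced_pod(namespace=namespace, label_selector=f"job-name={jobid}")
for pod in pods.items:
    core.delete_namespaced_pod(pod.metadata.name, namespace, body=body)
batch.delete_namespaced_job(jobid, namespace, propagation_policy="Foreground", body=body)
```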
1 parent bb5ccca commit 1b24e6f

File tree

1 file changed: +71 -39 lines changed

snakemake_executor_plugin_kubernetes/__init__.py

Lines changed: 71 additions & 39 deletions
@@ -4,7 +4,7 @@
 import shlex
 import subprocess
 import time
-from typing import List, Generator, Optional, Self
+from typing import Any, AsyncGenerator, List, Optional, Self
 import uuid
 
 import kubernetes
@@ -388,15 +388,15 @@ def run_job(self, job: JobExecutorInterface):
             self.logger.error(f"Failed to create pod: {e}")
             raise WorkflowError(f"Failed to create pod: {e}")
 
-        self.logger.info("Get status with:\n" "kubectl describe job {jobid}\n")
+        self.logger.info(f"Get status with: kubectl describe job {jobid}")
 
         self.report_job_submission(
            SubmittedJobInfo(job=job, external_jobid=jobid, aux={"pod": pod})
         )
 
     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
-    ) -> Generator[SubmittedJobInfo, None, None]:
+    ) -> AsyncGenerator[SubmittedJobInfo, None]:
         # Check the status of active jobs.
 
         # You have to iterate over the given list active_jobs.
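The return-annotation change above matches the fact that `check_active_jobs` is an `async def` that yields, i.e. an async generator; `typing.AsyncGenerator` takes only the yield and send types. A standalone example of the annotation, independent of the plugin:

```python
import asyncio
from typing import AsyncGenerator


async def countdown(n: int) -> AsyncGenerator[int, None]:
    # An async generator is annotated AsyncGenerator[yield_type, send_type];
    # the third "return type" slot of typing.Generator does not apply here.
    while n > 0:
        yield n
        n -= 1


async def main() -> None:
    async for value in countdown(3):
        print(value)  # 3, 2, 1


asyncio.run(main())
```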
@@ -416,24 +416,23 @@ async def check_active_jobs(
             async with self.status_rate_limiter:
                 try:
                     res = self._kubernetes_retry(
-                        lambda: self.batchapi.read_namespaced_job_status(
+                        lambda j=j: self.batchapi.read_namespaced_job_status(
                             j.external_jobid, self.namespace
                         )
                     )
                 except kubernetes.client.rest.ApiException as e:
-                    if e.status == 404:
-                        # Jobid not found
-                        # The job is likely already done and was deleted on
-                        # the server.
-                        j.callback(j.job)
-                        continue
-                    else:
-                        self.logger.error(f"ApiException when checking pod status: {e}")
-                        self.report_job_error(j, msg=str(e))
-                        continue
+                    self.logger.error(f"ApiException when checking pod status: {e}")
+                    continue
                 except WorkflowError as e:
                     self.logger.error(f"WorkflowError when checking pod status: {e}")
-                    self.report_job_error(j, msg=str(e))
+                    continue
+
+                if res is None:
+                    msg = (
+                        "Unknown job {jobid}. Has the job been deleted manually?"
+                    ).format(jobid=j.external_jobid)
+                    self.logger.error(msg)
+                    self.report_job_error(j, msg=msg)
                     continue
 
                 # Sometimes, just checking the status of a job is not enough, because
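Note the `lambda j=j:` default-argument binding here and in the later hunks: `_kubernetes_retry` may invoke the callable after the loop variable `j` has moved on, and Python closures capture variables rather than values. A self-contained illustration:

```python
# Closures capture the variable, not its value at definition time:
callbacks = [lambda: i for i in range(3)]
print([f() for f in callbacks])  # [2, 2, 2]

# Binding the current value as a default argument freezes it per iteration:
callbacks = [lambda i=i: i for i in range(3)]
print([f() for f in callbacks])  # [0, 1, 2]
```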
@@ -442,9 +441,11 @@ async def check_active_jobs(
                 # that a pod is already terminated.
                 # We therefore check the status of the snakemake container in addition
                 # to the job status.
-                pods = self.kubeapi.list_namespaced_pod(
-                    namespace=self.namespace,
-                    label_selector=f"job-name={j.external_jobid}",
+                pods = self._kubernetes_retry(
+                    lambda j=j: self.kubeapi.list_namespaced_pod(
+                        namespace=self.namespace,
+                        label_selector=f"job-name={j.external_jobid}",
+                    )
                 )
                 assert len(pods.items) <= 1
                 if pods.items:
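The pod listing is now routed through `self._kubernetes_retry`, which re-runs the callable when the API call fails transiently (the helper imports `kubernetes` and `urllib3`, and the class also has a `_reauthenticate_and_retry` path, as the last hunk shows). A simplified sketch of the general retry shape, not the plugin's exact implementation:

```python
import time

import kubernetes
import urllib3


def retry_call(func, attempts: int = 3, wait_seconds: int = 5):
    # Simplified sketch: re-run func on transient Kubernetes/connection errors,
    # re-raising after the final attempt. The plugin's helper additionally has
    # a re-authentication path for expired credentials.
    for attempt in range(attempts):
        try:
            return func()
        except (kubernetes.client.rest.ApiException, urllib3.exceptions.HTTPError):
            if attempt == attempts - 1:
                raise
            time.sleep(wait_seconds)
```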
@@ -459,42 +460,59 @@ async def check_active_jobs(
                         if snakemake_container.state.terminated is not None
                         else None
                     )
+                    pod_name = pod.metadata.name
                 else:
                     snakemake_container = None
                     snakemake_container_exit_code = None
+                    pod_name = None
 
-                if res is None:
-                    msg = (
-                        "Unknown job {jobid}. Has the job been deleted manually?"
-                    ).format(jobid=j.external_jobid)
-                    self.logger.error(msg)
-                    self.report_job_error(j, msg=msg)
-                elif res.status.failed == 1 or (
+                if (res.status.failed and res.status.failed > 0) or (
                     snakemake_container_exit_code is not None
                     and snakemake_container_exit_code != 0
                 ):
                     msg = (
                         "For details, please issue:\n"
                         f"kubectl describe job {j.external_jobid}"
                     )
-                    # failed
-                    kube_log = self.log_path / f"{j.external_jobid}.log"
-                    with open(kube_log, "w") as f:
-                        kube_log_content = self.kubeapi.read_namespaced_pod_log(
-                            name=pod.metadata.name,
-                            namespace=self.namespace,
-                            container=snakemake_container.name,
-                        )
-                        print(kube_log_content, file=f)
+
+                    if pod_name is not None:
+                        assert snakemake_container is not None
+                        kube_log = self.log_path / f"{j.external_jobid}.log"
+                        with open(kube_log, "w") as f:
+
+                            def read_log(
+                                pod_name=pod_name,
+                                container_name=snakemake_container.name,
+                            ):
+                                return self.kubeapi.read_namespaced_pod_log(
+                                    name=pod_name,
+                                    namespace=self.namespace,
+                                    container=container_name,
+                                )
+
+                            kube_log_content = self._kubernetes_retry(read_log)
+                            print(kube_log_content, file=f)
+                        aux_logs = [str(kube_log)]
+                    else:
+                        aux_logs = []
+
                     self.logger.error(f"Job {j.external_jobid} failed. {msg}")
-                    self.report_job_error(j, msg=msg, aux_logs=[str(kube_log)])
-                elif res.status.succeeded == 1 or (snakemake_container_exit_code == 0):
+                    self.report_job_error(j, msg=msg, aux_logs=aux_logs)
+
+                    self._kubernetes_retry(
+                        lambda j=j: self.safe_delete_job(
+                            j.external_jobid, ignore_not_found=True
+                        )
+                    )
+                elif (res.status.succeeded and res.status.succeeded >= 1) or (
+                    snakemake_container_exit_code == 0
+                ):
                     # finished
                     self.logger.info(f"Job {j.external_jobid} succeeded.")
                     self.report_job_success(j)
 
                     self._kubernetes_retry(
-                        lambda: self.safe_delete_job(
+                        lambda j=j: self.safe_delete_job(
                             j.external_jobid, ignore_not_found=True
                         )
                     )
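In the hunk above, the exact-count checks (`res.status.failed == 1`, `res.status.succeeded == 1`) become truthiness-plus-count checks because `V1JobStatus.failed` and `V1JobStatus.succeeded` are `None` until at least one pod reaches that state, and can exceed 1 when pods are retried. For example:

```python
failed = None  # V1JobStatus.failed is None while no pod has failed

# failed == 1  -> False, as intended
# failed > 0   -> would raise TypeError: None is not orderable
print(bool(failed and failed > 0))  # False, without touching the comparison

failed = 2  # e.g. a retried pod failed twice
print(failed == 1)                  # False: the old check would miss this failure
print(bool(failed and failed > 0))  # True
```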
@@ -558,13 +576,27 @@ def safe_delete_job(self, jobid, ignore_not_found=True):
         import kubernetes.client
 
         body = kubernetes.client.V1DeleteOptions()
+        self.logger.debug(f"Deleting job {jobid} in namespace {self.namespace}")
         try:
+            # Usually, kubernetes should delete the pods automatically
+            # when the job is deleted, but in some cases, this does not
+            # happen, so we delete the pods manually.
+            pods = self.kubeapi.list_namespaced_pod(
+                namespace=self.namespace,
+                label_selector=f"job-name={jobid}",
+            )
+            for pod in pods.items:
+                self.logger.debug(f"Deleting pod {pod.metadata.name} for job {jobid}")
+                self.kubeapi.delete_namespaced_pod(
+                    pod.metadata.name, self.namespace, body=body
+                )
+
             self.batchapi.delete_namespaced_job(
                 jobid, self.namespace, propagation_policy="Foreground", body=body
             )
         except kubernetes.client.rest.ApiException as e:
             if e.status == 404 and ignore_not_found:
-                self.logger.warning(
+                self.logger.debug(
                     "[WARNING] 404 not found when trying to delete the job: {jobid}\n"
                     "[WARNING] Ignore this error\n".format(jobid=jobid)
                 )
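`safe_delete_job` tolerates a 404 when `ignore_not_found=True`, since a job that is already gone is the desired end state. The pattern, reduced to a hypothetical standalone helper:

```python
import kubernetes


def delete_job_if_present(batch: kubernetes.client.BatchV1Api, jobid: str, namespace: str) -> None:
    # A 404 from the API server means the job is already deleted, which is the
    # desired outcome, so it is swallowed rather than re-raised.
    try:
        batch.delete_namespaced_job(jobid, namespace, propagation_policy="Foreground")
    except kubernetes.client.rest.ApiException as e:
        if e.status != 404:
            raise
```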
@@ -602,7 +634,7 @@ def _reauthenticate_and_retry(self, func=None):
         if func:
             return func()
 
-    def _kubernetes_retry(self, func):
+    def _kubernetes_retry(self, func) -> Any:
         import kubernetes
         import urllib3
 