
Commit a4cbe36

cademirch and cmeesters authored
fix: efficiency report jobsteps (#338)
Fixes #337. This PR ensures that for each job in the sacct output only the job step is reported, and that each record carries a rule name and requested memory. It also refactors the efficiency report slightly to allow unit testing of the sacct-output parsing.

## Summary by CodeRabbit

* **Refactor**
  * Split the efficiency report into separate data-fetching and parsing stages for more reliable SLURM data handling.
  * Improved handling of job steps with inheritance of job metadata (rule names, requested memory) and robust CPU/memory efficiency calculations with low-efficiency warnings.
* **Tests**
  * Added a test verifying SLURM output parsing, metadata inheritance for job steps, and correct memory/efficiency reporting.

---------

Co-authored-by: Christian Meesters <[email protected]>
1 parent d8d605c commit a4cbe36
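
For orientation, here is a minimal sketch of how the two stages introduced by this commit compose. The run UUID and the 80% threshold are placeholder values, and the snippet assumes the plugin is importable; it is not part of the commit itself.

```python
import logging

from snakemake_executor_plugin_slurm.efficiency_report import (
    get_sacct_data,
    parse_sacct_data,
)

logger = logging.getLogger(__name__)
run_uuid = "b10191d0-6985-4c3a-8ccb-aa7d23ebffc7"  # placeholder workflow UUID

# Stage 1: fetch raw `sacct --parsable2` lines for the workflow.
lines = get_sacct_data(run_uuid, logger)

# Stage 2: parse into a DataFrame; only job steps remain, each carrying the
# rule name and requested memory inherited from its parent job.
if lines:
    df = parse_sacct_data(lines, e_threshold=80.0, run_uuid=run_uuid, logger=logger)
    print(df[["JobID", "RuleName", "CPU Efficiency (%)", "Memory Usage (%)"]])
```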

File tree

2 files changed: +83 -19 lines changed

snakemake_executor_plugin_slurm/efficiency_report.py

Lines changed: 51 additions & 19 deletions
@@ -55,15 +55,10 @@ def parse_reqmem(reqmem, number_of_nodes=1):
     return 0


-def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
-    """
-    Fetch sacct job data for a Snakemake workflow
-    and compute efficiency metrics.
-    """
+def get_sacct_data(run_uuid, logger):
+    """Fetch raw sacct data for a workflow."""
     cmd = f"sacct --name={run_uuid} --parsable2 --noheader"
-    cmd += (
-        " --format=JobID,JobName,Comment,Elapsed,TotalCPU," "NNodes,NCPUS,MaxRSS,ReqMem"
-    )
+    cmd += " --format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem"

     try:
         result = subprocess.run(
@@ -74,12 +69,14 @@ def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
             logger.warning(f"No job data found for workflow {run_uuid}.")
             return None
         lines = raw.split("\n")
+        return lines

     except subprocess.CalledProcessError:
         logger.error(f"Failed to retrieve job data for workflow {run_uuid}.")
         return None

-    # Convert to DataFrame
+
+def parse_sacct_data(lines, e_threshold, run_uuid, logger):
     df = pd.DataFrame(
         (line.split("|") for line in lines),
         columns=[
@@ -120,20 +117,44 @@ def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
     df["Elapsed_sec"] = df["Elapsed"].apply(time_to_seconds)
     df["TotalCPU_sec"] = df["TotalCPU"].apply(time_to_seconds)

-    # Compute CPU efficiency
-    df["CPU Efficiency (%)"] = (
-        df["TotalCPU_sec"]
-        / (df["Elapsed_sec"].clip(lower=1) * df["NCPUS"].clip(lower=1))
-    ) * 100
-    df.replace([np.inf, -np.inf], 0, inplace=True)
-
     # Convert MaxRSS
     df["MaxRSS_MB"] = df["MaxRSS"].apply(parse_maxrss)

     # Convert ReqMem and calculate memory efficiency
     df["RequestedMem_MB"] = df.apply(
         lambda row: parse_reqmem(row["ReqMem"], row["NNodes"]), axis=1
     )
+
+    # Drop all rows containing "batch" or "extern" as job names
+    df = df[~df["JobName"].str.contains("batch|extern", na=False)]
+
+    # Extract main job ID for grouping
+    df["MainJobID"] = df["JobID"].str.extract(r"^(\d+)", expand=False)
+
+    # Separate main jobs and job steps
+    main_jobs = df[~df["JobID"].str.contains(r"\.\d+", regex=True)].copy()
+    job_steps = df[df["JobID"].str.contains(r"\.\d+", regex=True)].copy()
+
+    # Create maps from main jobs for inheritance
+    if not nocomment:
+        rule_name_map = main_jobs.set_index("MainJobID")["RuleName"].to_dict()
+    mem_map = main_jobs.set_index("MainJobID")["RequestedMem_MB"].to_dict()
+
+    # Inherit data from main jobs to job steps
+    if not nocomment:
+        job_steps["RuleName"] = job_steps["MainJobID"].map(rule_name_map).fillna("")
+    job_steps["RequestedMem_MB"] = job_steps["MainJobID"].map(mem_map).fillna(0)
+
+    # Use job steps as the final dataset (they have the actual resource usage)
+    df = job_steps.copy()
+
+    # Compute CPU efficiency
+    df["CPU Efficiency (%)"] = (
+        df["TotalCPU_sec"]
+        / (df["Elapsed_sec"].clip(lower=1) * df["NCPUS"].clip(lower=1))
+    ) * 100
+    df.replace([np.inf, -np.inf], 0, inplace=True)
+
     df["Memory Usage (%)"] = df.apply(
         lambda row: (
             (row["MaxRSS_MB"] / row["RequestedMem_MB"] * 100)
@@ -145,9 +166,6 @@ def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):

     df["Memory Usage (%)"] = df["Memory Usage (%)"].fillna(0).round(2)

-    # Drop all rows containing "batch" or "extern" as job names
-    df = df[~df["JobName"].str.contains("batch|extern", na=False)]
-
     # Log warnings for low efficiency
     for _, row in df.iterrows():
         if row["CPU Efficiency (%)"] < e_threshold:
@@ -164,6 +182,20 @@ def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
                 f"({row['JobName']}) has low CPU efficiency: "
                 f"{row['CPU Efficiency (%)']}%."
             )
+    return df
+
+
+def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
+    """
+    Fetch sacct job data for a Snakemake workflow
+    and compute efficiency metrics.
+    """
+    lines = get_sacct_data(run_uuid, logger)
+
+    if lines is None or not lines:
+        return None
+
+    df = parse_sacct_data(lines, e_threshold, run_uuid, logger)

     # we construct a path object to allow for a customi
     # logdir, if specified
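
The heart of this change is the main-job/job-step split. Below is a self-contained sketch of that mechanism on toy records (invented values, not the plugin's real data): main jobs carry the metadata, steps carry the measured usage, so the steps inherit `RuleName` and `RequestedMem_MB` via per-job lookup maps before being used as the final dataset.

```python
import pandas as pd

# Toy rows mimicking parsed sacct records (values invented for illustration).
df = pd.DataFrame(
    {
        "JobID": ["10294159", "10294159.0", "10294160", "10294160.0"],
        "RuleName": ["rule_a", "", "rule_b", ""],
        "RequestedMem_MB": [32000.0, 0.0, 16000.0, 0.0],
    }
)

# A step ID looks like "<jobid>.<n>"; the bare "<jobid>" row is the main job.
df["MainJobID"] = df["JobID"].str.extract(r"^(\d+)", expand=False)
main_jobs = df[~df["JobID"].str.contains(r"\.\d+", regex=True)]
job_steps = df[df["JobID"].str.contains(r"\.\d+", regex=True)].copy()

# Inherit rule name and requested memory from the parent job.
rule_map = main_jobs.set_index("MainJobID")["RuleName"].to_dict()
mem_map = main_jobs.set_index("MainJobID")["RequestedMem_MB"].to_dict()
job_steps["RuleName"] = job_steps["MainJobID"].map(rule_map).fillna("")
job_steps["RequestedMem_MB"] = job_steps["MainJobID"].map(mem_map).fillna(0)

print(job_steps)  # one row per step, each with rule name and requested memory
```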

tests/tests.py

Lines changed: 32 additions & 0 deletions
@@ -9,6 +9,7 @@
 import pytest

 from snakemake_executor_plugin_slurm import ExecutorSettings
+from snakemake_executor_plugin_slurm.efficiency_report import parse_sacct_data
 from snakemake_executor_plugin_slurm.utils import set_gres_string
 from snakemake_executor_plugin_slurm.submit_string import get_submit_command
 from snakemake_interface_common.exceptions import WorkflowError
@@ -27,6 +28,37 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
     )


+def test_parse_sacct_data():
+    from io import StringIO
+
+    test_data = [
+        "10294159|b10191d0-6985-4c3a-8ccb-"
+        "aa7d23ebffc7|rule_bam_bwa_mem_mosdepth_"
+        "simulate_reads|00:01:31|00:24.041|1|1||32000M",
+        "10294159.batch|batch||00:01:31|00:03.292|1|1|71180K|",
+        "10294159.0|python3.12||00:01:10|00:20.749|1|1|183612K|",
+        "10294160|b10191d0-6985-4c3a-8ccb-"
+        "aa7d23ebffc7|rule_bam_bwa_mem_mosdepth_"
+        "simulate_reads|00:01:30|00:24.055|1|1||32000M",
+        "10294160.batch|batch||00:01:30|00:03.186|1|1|71192K|",
+        "10294160.0|python3.12||00:01:10|00:20.868|1|1|184352K|",
+    ]
+    df = parse_sacct_data(
+        lines=test_data, e_threshold=0.0, run_uuid="test", logger=None
+    )
+    output = StringIO()
+    df.to_csv(output, index=False)
+    print(output.getvalue())
+    # this should only be two rows once collapsed
+    assert len(df) == 2
+    # check that RuleName is properly inherited from main jobs
+    assert all(df["RuleName"] == "rule_bam_bwa_mem_mosdepth_simulate_reads")
+    # check that RequestedMem_MB is properly inherited
+    assert all(df["RequestedMem_MB"] == 32000.0)
+    # check that MaxRSS_MB is properly calculated from job steps
+    assert df.iloc[0]["MaxRSS_MB"] > 0  # Should have actual memory usage from job step
+
+
 class TestEfficiencyReport(snakemake.common.tests.TestWorkflowsLocalStorageBase):
     __test__ = True