2 changes: 2 additions & 0 deletions docs/further.md
@@ -70,6 +70,8 @@ The SLURM executor plugin supports automatic partition selection based on job re…

*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.*

> **Note:** Partition selection supports specifying the target cluster using any of the resource keys `cluster`, `clusters`, or `slurm_cluster` in your workflow profile or the partition configuration file. All three are treated equivalently by the plugin.

##### Partition Limits Specification

To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows:
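The full schema example is collapsed in this diff view. Purely as an illustrative sketch (not part of the PR; partition names and limits are invented), a file like the one embedded below would be parsed by the `read_partition_file` helper changed in this changeset. Note that `cluster`, `clusters`, and `slurm_cluster` are accepted interchangeably as a partition's cluster key:

```python
# Illustrative sketch only: invented partition names/limits, parsed with the
# helper from snakemake_executor_plugin_slurm/partitions.py in this PR.
from pathlib import Path
from tempfile import NamedTemporaryFile

from snakemake_executor_plugin_slurm.partitions import read_partition_file

EXAMPLE_YAML = """
partitions:
  normal-small:
    cluster: normal        # "clusters" or "slurm_cluster" would work the same
    max_runtime: 360
    max_threads: 32
    max_mem_mb: 64000
  gpu-large:
    slurm_cluster: gpu
    max_runtime: 2880
    max_threads: 128
    max_mem_mb: 512000
"""

with NamedTemporaryFile("w", suffix=".yaml", delete=False) as fh:
    fh.write(EXAMPLE_YAML)
    partition_file = Path(fh.name)

for p in read_partition_file(partition_file):
    # Each entry becomes a Partition(name=..., partition_cluster=..., limits=...)
    print(p.name, p.partition_cluster, p.limits)
```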
38 changes: 36 additions & 2 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -868,10 +868,44 @@ def get_partition_arg(self, job: JobExecutorInterface):
else raises an error - implicitly.
"""
partition = None

# Check if a specific partition is requested
if job.resources.get("slurm_partition"):
partition = job.resources.slurm_partition
elif self._partitions:
# But also check if there's a cluster requirement that might override it
job_cluster = (
job.resources.get("slurm_cluster")
or job.resources.get("cluster")
or job.resources.get("clusters")
)
Comment on lines +875 to +879
⚠️ Potential issue | 🟡 Minor

Inconsistent cluster key priority order with submit_string.py.

The resolution order here (slurm_cluster → cluster → clusters) differs from submit_string.py (lines 48-51), which uses (cluster → clusters → slurm_cluster). If a job specifies multiple cluster keys with different values, partition validation may use a different cluster than what's passed to sbatch.

             job_cluster = (
-                job.resources.get("slurm_cluster")
-                or job.resources.get("cluster")
+                job.resources.get("cluster")
                 or job.resources.get("clusters")
+                or job.resources.get("slurm_cluster")
             )
🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm/__init__.py around lines 875 to 879, the
cluster key lookup currently prefers slurm_cluster before cluster and clusters,
which is inconsistent with submit_string.py; change the resolution order to
match submit_string.py by checking job.resources for "cluster" then "clusters"
then "slurm_cluster" so partition validation uses the same cluster value passed
to sbatch, and update any related comments or docstrings to reflect the
canonical priority.


if job_cluster and self._partitions:
# If a cluster is specified, verify the partition exists and matches
# Otherwise, use auto-selection to find a partition for that cluster
partition_obj = next(
(
p
for p in self._partitions
if p.name == job.resources.slurm_partition
),
None,
)
if (
partition_obj
and partition_obj.partition_cluster
and partition_obj.partition_cluster != job_cluster
):
# Partition exists but is for a different cluster:
# use auto-selection
partition = get_best_partition(self._partitions, job, self.logger)
else:
partition = job.resources.slurm_partition
else:
partition = job.resources.slurm_partition

# If no partition was selected yet, try auto-selection
if not partition and self._partitions:
partition = get_best_partition(self._partitions, job, self.logger)

# we didn't get a partition yet, so try fallback.
if not partition:
if self._fallback_partition is None:
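To make the reviewer's point concrete, here is a minimal hypothetical sketch (not part of the changeset; the helper name `resolve_job_cluster` is invented) of a single shared lookup that both `get_partition_arg` and `get_submit_command` could call, so partition validation and the generated `sbatch` call always agree on the cluster:

```python
from typing import Optional

# Hypothetical helper, not in the PR: one canonical resolution order
# ("cluster" -> "clusters" -> "slurm_cluster"), matching submit_string.py.
CLUSTER_KEYS = ("cluster", "clusters", "slurm_cluster")


def resolve_job_cluster(resources) -> Optional[str]:
    """Return the first truthy cluster value among the equivalent resource keys."""
    for key in CLUSTER_KEYS:
        value = resources.get(key)
        if value:
            return value
    return None
```

Both call sites would then replace their hand-written `or`-chains with `resolve_job_cluster(job.resources)`, removing the chance of diverging priority orders.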
27 changes: 19 additions & 8 deletions snakemake_executor_plugin_slurm/partitions.py
@@ -39,12 +39,16 @@ def read_partition_file(partition_file: Path) -> List["Partition"]:
raise KeyError("Partition name cannot be empty")

# Extract optional cluster name from partition config
cluster = partition_config.pop("cluster", None)
cluster = None
for key in ("slurm_cluster", "cluster", "clusters"):
if key in partition_config:
cluster = partition_config.pop(key)
break

out.append(
Partition(
name=partition_name,
cluster=cluster,
partition_cluster=cluster,
limits=PartitionLimits(**partition_config),
)
)
@@ -60,7 +64,7 @@ def get_best_partition(
for p in candidate_partitions:
score = p.score_job_fit(job)
logger.debug(f"Partition '{p.name}' score for job {job.name}: {score}")
if score is not None:
if score is not None and score > 0:
scored_partitions.append((p, score))

if scored_partitions:
@@ -241,7 +245,7 @@ class Partition:

name: str
limits: PartitionLimits
cluster: Optional[str] = None
partition_cluster: Optional[str] = None

def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
"""
@@ -269,14 +273,21 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
# Accept multiple possible resource names for cluster specification
job_cluster = (
job.resources.get("slurm_cluster")
or job.resources.get("clusters")
or job.resources.get("cluster")
or job.resources.get("clusters")
)

# Enforce strict cluster eligibility:
# - If the job specifies a cluster, only partitions with a matching cluster
# are eligible
# - If the job does not specify a cluster, only partitions without a cluster
# are eligible
if job_cluster is not None:
# Job specifies a cluster - partition must match
if self.cluster is not None and self.cluster != job_cluster:
return None # Partition is for a different cluster
if self.partition_cluster != job_cluster:
return None # Not eligible
else:
if self.partition_cluster is not None:
return None # Not eligible

for resource_key, limit in numerical_resources.items():
job_requirement = job.resources.get(resource_key, 0)
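As a compact restatement of the strict eligibility rule introduced above (a paraphrase for illustration, not the plugin's API), a helper like the following captures the two cases, with spot checks mirroring the tests added in this PR:

```python
from typing import Optional


def cluster_eligible(job_cluster: Optional[str], partition_cluster: Optional[str]) -> bool:
    """Paraphrase of the rule in score_job_fit:
    - a job that names a cluster only matches partitions bound to that cluster;
    - a job without a cluster only matches partitions without a cluster binding.
    """
    if job_cluster is not None:
        return partition_cluster == job_cluster
    return partition_cluster is None


assert cluster_eligible("normal", "normal") is True
assert cluster_eligible("normal", None) is False   # cluster-bound job, unbound partition
assert cluster_eligible(None, "normal") is False   # unbound job, cluster-bound partition
assert cluster_eligible(None, None) is True
```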
12 changes: 10 additions & 2 deletions snakemake_executor_plugin_slurm/submit_string.py
@@ -42,8 +42,16 @@ def get_submit_command(job, params):
# "- p '{partition_name}'"
call += f" {params.partition}"

if job.resources.get("clusters"):
call += f" --clusters {safe_quote(job.resources.clusters)}"
# Add cluster specification if provided
# Check for "cluster" first (singular), then fall back to "clusters" (plural)
# and "slurm_cluster" for backwards compatibility
cluster_val = (
job.resources.get("cluster")
or job.resources.get("clusters")
or job.resources.get("slurm_cluster")
)
if cluster_val:
call += f" --cluster {safe_quote(cluster_val)}"

if job.resources.get("runtime"):
call += f" -t {safe_quote(job.resources.runtime)}"
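For illustration only (the fragment-building function below is invented, not the plugin's API, and `safe_quote` is assumed to behave roughly like `shlex.quote`), this is the effect of the new lookup on the generated submit call:

```python
import shlex


def cluster_fragment(resources: dict) -> str:
    # Mirrors the hunk above: "cluster", then "clusters", then "slurm_cluster".
    cluster_val = (
        resources.get("cluster")
        or resources.get("clusters")
        or resources.get("slurm_cluster")
    )
    return f" --cluster {shlex.quote(str(cluster_val))}" if cluster_val else ""


print(cluster_fragment({"cluster": "gpu"}))       # " --cluster gpu"
print(cluster_fragment({"clusters": "cl a"}))     # " --cluster 'cl a'"
print(cluster_fragment({}))                       # "" (no flag added)
```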
60 changes: 54 additions & 6 deletions tests/test_partition_selection.py
@@ -591,8 +591,8 @@ def test_cluster_specification_via_slurm_cluster(
# Verify cluster field is read correctly
# Find partitions by name instead of assuming order
partition_map = {p.name: p for p in partitions}
assert partition_map["normal-small"].cluster == "normal"
assert partition_map["deviating-small"].cluster == "deviating"
assert partition_map["normal-small"].partition_cluster == "normal"
assert partition_map["deviating-small"].partition_cluster == "deviating"

# Job targeting 'normal' cluster
job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
@@ -664,10 +664,59 @@ def test_cluster_mismatch_excludes_partitions(
finally:
temp_path.unlink()

def test_job_with_cluster_does_not_select_partition_without_cluster(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""
Test that jobs with a cluster requirement do not select partitions
without a cluster.
"""
config = dict(multicluster_partition_config)
config["partitions"]["no-cluster"] = {
"max_runtime": 360,
"max_threads": 32,
"max_mem_mb": 64000,
}
temp_path = temp_yaml_file(config)
try:
partitions = read_partition_file(temp_path)
job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
selected_partition = get_best_partition(partitions, job, mock_logger)
# Should select a partition with cluster 'normal', not 'no-cluster'
assert selected_partition in ["normal-small", "normal-large"]
finally:
temp_path.unlink()

def test_job_without_cluster_can_select_partition_without_cluster(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""
Test that jobs without cluster requirement can select partitions
without a cluster.
"""
config = dict(multicluster_partition_config)
config["partitions"]["no-cluster"] = {
"max_runtime": 360,
"max_threads": 32,
"max_mem_mb": 64000,
}
temp_path = temp_yaml_file(config)
try:
partitions = read_partition_file(temp_path)
job = mock_job(threads=16, mem_mb=32000)
selected_partition = get_best_partition(partitions, job, mock_logger)
# Should be able to select 'no-cluster' partition
assert selected_partition in ["normal-small", "normal-large", "no-cluster"]
finally:
temp_path.unlink()

def test_job_without_cluster_uses_any_partition(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""Test that jobs without cluster specification can use any partition."""
"""
Test that jobs without cluster specification can use any partition
without a cluster assignment.
"""
temp_path = temp_yaml_file(multicluster_partition_config)

try:
@@ -677,9 +726,8 @@ def test_job_without_cluster_uses_any_partition(
job = mock_job(threads=16, mem_mb=32000)
selected_partition = get_best_partition(partitions, job, mock_logger)

# Should select a partition (any cluster is acceptable)
assert selected_partition is not None
assert mock_logger.info.call_count >= 1
# Should return None since all partitions have a cluster assignment
assert selected_partition is None
finally:
temp_path.unlink()
