diff --git a/docs/further.md b/docs/further.md
index 18545b1..181b403 100644
--- a/docs/further.md
+++ b/docs/further.md
@@ -70,6 +70,8 @@ The SLURM executor plugin supports automatic partition selection based on job re
 *Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.*

+> **Note:** Partition selection supports specifying the target cluster using any of the resource keys `cluster`, `clusters`, or `slurm_cluster` in your workflow profile or the partition configuration file. All three are treated equivalently by the plugin.
+
 ##### Partition Limits Specification

 To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows:

diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 2abb5ba..0ca00f6 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -868,10 +868,44 @@ def get_partition_arg(self, job: JobExecutorInterface):
         else raises an error - implicetly.
         """
         partition = None
+
+        # Check if a specific partition is requested
         if job.resources.get("slurm_partition"):
-            partition = job.resources.slurm_partition
-        elif self._partitions:
+            # But also check if there's a cluster requirement that might override it
+            job_cluster = (
+                job.resources.get("slurm_cluster")
+                or job.resources.get("cluster")
+                or job.resources.get("clusters")
+            )
+
+            if job_cluster and self._partitions:
+                # If a cluster is specified, verify the partition exists and matches;
+                # otherwise, use auto-selection to find a partition for that cluster
+                partition_obj = next(
+                    (
+                        p
+                        for p in self._partitions
+                        if p.name == job.resources.slurm_partition
+                    ),
+                    None,
+                )
+                if (
+                    partition_obj
+                    and partition_obj.partition_cluster
+                    and partition_obj.partition_cluster != job_cluster
+                ):
+                    # Partition exists but is for a different cluster:
+                    # use auto-selection
+                    partition = get_best_partition(self._partitions, job, self.logger)
+                else:
+                    partition = job.resources.slurm_partition
+            else:
+                partition = job.resources.slurm_partition
+
+        # If no partition was selected yet, try auto-selection
+        if not partition and self._partitions:
             partition = get_best_partition(self._partitions, job, self.logger)
+        # If we still didn't get a partition, try the fallback.

         if not partition:
             if self._fallback_partition is None:

diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py
index 668141f..4be329b 100644
--- a/snakemake_executor_plugin_slurm/partitions.py
+++ b/snakemake_executor_plugin_slurm/partitions.py
@@ -39,12 +39,16 @@ def read_partition_file(partition_file: Path) -> List["Partition"]:
             raise KeyError("Partition name cannot be empty")

         # Extract optional cluster name from partition config
-        cluster = partition_config.pop("cluster", None)
+        cluster = None
+        for key in ("slurm_cluster", "cluster", "clusters"):
+            if key in partition_config:
+                cluster = partition_config.pop(key)
+                break

         out.append(
             Partition(
                 name=partition_name,
-                cluster=cluster,
+                partition_cluster=cluster,
                 limits=PartitionLimits(**partition_config),
             )
         )
@@ -60,7 +64,7 @@ def get_best_partition(
     for p in candidate_partitions:
         score = p.score_job_fit(job)
         logger.debug(f"Partition '{p.name}' score for job {job.name}: {score}")
-        if score is not None:
+        if score is not None and score > 0:
             scored_partitions.append((p, score))

     if scored_partitions:
@@ -241,7 +245,7 @@ class Partition:

     name: str
     limits: PartitionLimits
-    cluster: Optional[str] = None
+    partition_cluster: Optional[str] = None

     def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
         """
@@ -269,14 +273,21 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
         # Accept multiple possible resource names for cluster specification
         job_cluster = (
             job.resources.get("slurm_cluster")
-            or job.resources.get("clusters")
             or job.resources.get("cluster")
+            or job.resources.get("clusters")
         )

+        # Enforce strict cluster eligibility:
+        # - If the job specifies a cluster, only partitions with a matching cluster
+        #   are eligible
+        # - If the job does not specify a cluster, only partitions without a cluster
+        #   are eligible
         if job_cluster is not None:
-            # Job specifies a cluster - partition must match
-            if self.cluster is not None and self.cluster != job_cluster:
-                return None  # Partition is for a different cluster
+            if self.partition_cluster != job_cluster:
+                return None  # Not eligible
+        else:
+            if self.partition_cluster is not None:
+                return None  # Not eligible

         for resource_key, limit in numerical_resources.items():
             job_requirement = job.resources.get(resource_key, 0)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 0e1498e..112eb46 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -42,8 +42,16 @@ def get_submit_command(job, params):
     # "- p '{partition_name}'"
     call += f" {params.partition}"

-    if job.resources.get("clusters"):
-        call += f" --clusters {safe_quote(job.resources.clusters)}"
+    # Add cluster specification if provided.
+    # Check for "cluster" (singular) first, then fall back to "clusters" and
+    # "slurm_cluster" for backwards compatibility.
+    cluster_val = (
+        job.resources.get("cluster")
+        or job.resources.get("clusters")
+        or job.resources.get("slurm_cluster")
+    )
+    if cluster_val:
+        call += f" --clusters {safe_quote(cluster_val)}"

     if job.resources.get("runtime"):
         call += f" -t {safe_quote(job.resources.runtime)}"

diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py
index e88c21d..c673bd8 100644
--- a/tests/test_partition_selection.py
+++ b/tests/test_partition_selection.py
@@ -591,8 +591,8 @@ def test_cluster_specification_via_slurm_cluster(
         # Verify cluster field is read correctly
         # Find partitions by name instead of assuming order
         partition_map = {p.name: p for p in partitions}
-        assert partition_map["normal-small"].cluster == "normal"
-        assert partition_map["deviating-small"].cluster == "deviating"
+        assert partition_map["normal-small"].partition_cluster == "normal"
+        assert partition_map["deviating-small"].partition_cluster == "deviating"

         # Job targeting 'normal' cluster
         job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
@@ -664,10 +664,59 @@ def test_cluster_mismatch_excludes_partitions(
         finally:
             temp_path.unlink()

+    def test_job_with_cluster_does_not_select_partition_without_cluster(
+        self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
+    ):
+        """
+        Test that jobs with a cluster requirement do not select partitions
+        without a cluster.
+        """
+        config = dict(multicluster_partition_config)
+        config["partitions"]["no-cluster"] = {
+            "max_runtime": 360,
+            "max_threads": 32,
+            "max_mem_mb": 64000,
+        }
+        temp_path = temp_yaml_file(config)
+        try:
+            partitions = read_partition_file(temp_path)
+            job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
+            selected_partition = get_best_partition(partitions, job, mock_logger)
+            # Should select a partition on cluster 'normal', never 'no-cluster'
+            assert selected_partition in ["normal-small", "normal-large"]
+        finally:
+            temp_path.unlink()
+
+    def test_job_without_cluster_can_select_partition_without_cluster(
+        self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
+    ):
+        """
+        Test that jobs without a cluster requirement can select partitions
+        without a cluster.
+        """
+        config = dict(multicluster_partition_config)
+        config["partitions"]["no-cluster"] = {
+            "max_runtime": 360,
+            "max_threads": 32,
+            "max_mem_mb": 64000,
+        }
+        temp_path = temp_yaml_file(config)
+        try:
+            partitions = read_partition_file(temp_path)
+            job = mock_job(threads=16, mem_mb=32000)
+            selected_partition = get_best_partition(partitions, job, mock_logger)
+            # Only the cluster-less 'no-cluster' partition is eligible
+            assert selected_partition == "no-cluster"
+        finally:
+            temp_path.unlink()
+
     def test_job_without_cluster_uses_any_partition(
         self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
     ):
-        """Test that jobs without cluster specification can use any partition."""
+        """
+        Test that jobs without a cluster specification only match partitions
+        without a cluster assignment.
+        """

         temp_path = temp_yaml_file(multicluster_partition_config)
         try:
@@ -677,9 +726,8 @@ def test_job_without_cluster_uses_any_partition(
         self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
     ):
             job = mock_job(threads=16, mem_mb=32000)
             selected_partition = get_best_partition(partitions, job, mock_logger)
-            # Should select a partition (any cluster is acceptable)
-            assert selected_partition is not None
-            assert mock_logger.info.call_count >= 1
+            # Should return None since all partitions have a cluster assignment
+            assert selected_partition is None
         finally:
             temp_path.unlink()
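For orientation only (not part of the patch): a partition limits file consistent with the test fixtures above might look like the sketch below. The top-level `partitions` mapping and the `max_runtime`, `max_threads`, and `max_mem_mb` limits mirror the values used in the new tests; the partition names and numbers are illustrative, and the exact set of accepted limit keys is defined by `PartitionLimits`. Any one of `cluster`, `clusters`, or `slurm_cluster` can be used to pin a partition to a cluster.

```yaml
# Illustrative partition limits file (values invented for this example).
partitions:
  normal-small:
    cluster: normal        # `clusters` or `slurm_cluster` would work the same way
    max_runtime: 360
    max_threads: 32
    max_mem_mb: 64000
  normal-large:
    cluster: normal
    max_runtime: 1440
    max_threads: 128
    max_mem_mb: 512000
  no-cluster:              # no cluster key: eligible only for jobs without a cluster resource
    max_runtime: 360
    max_threads: 32
    max_mem_mb: 64000
```

Under the strict eligibility rule introduced in `score_job_fit`, a job that sets `slurm_cluster=normal` (or `cluster`/`clusters`) is scored only against `normal-small` and `normal-large`, while a job that requests no cluster can only be auto-assigned to `no-cluster`; if nothing is eligible, selection falls back to the configured fallback partition, if any.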