#!/usr/bin/env python3
"""
Academic Data Processing Pipeline with Snakemake
=================================================

This pipeline demonstrates a comprehensive academic data processing workflow
with built-in resume capabilities and experiment tracking.

Features:
- Automatic experiment tracking
- Resume-friendly design with checkpoints
- Comprehensive logging and reporting
- Modular rule design
- Resource-aware execution
- Quality control at each step

Author: Research Team
Version: 1.0.0
"""

import pandas as pd
from pathlib import Path

# Configuration loading
configfile: "config.yaml"
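
# A minimal config.yaml compatible with this Snakefile might look like the
# sketch below (keys taken from the config lookups in this file; the values
# are illustrative assumptions, not part of the pipeline):
#
#     samples: "samples.tsv"
#     experiment_id: "EXP-2024-001"
#     output_dir: "results"
#     log_dir: "logs"
#     quality_threshold: 30
#     min_length: 50
#     analysis_type: "standard"
#     researcher: "Jane Doe"
#     project: "Research Project"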

# Load sample information
samples_df = pd.read_csv(config["samples"], sep="\t")
SAMPLES = samples_df["sample_id"].tolist()
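
# The sample sheet is expected to be a TSV with at least the two columns read
# here, e.g. (paths are illustrative):
#
#     sample_id    fastq_1
#     sampleA      data/sampleA_R1.fastq.gz
#
# Shared helper for the rules below: resolve a sample's fastq_1 path.
def get_fastq(wildcards):
    """Return the fastq_1 path recorded for wildcards.sample in the sample sheet."""
    return samples_df.loc[samples_df["sample_id"] == wildcards.sample, "fastq_1"].iloc[0]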

# Global variables from config
EXPERIMENT_ID = config.get("experiment_id", "unknown")
OUTPUT_DIR = config.get("output_dir", "results")
LOG_DIR = config.get("log_dir", "logs")

# Ensure output directories exist
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
Path(LOG_DIR).mkdir(parents=True, exist_ok=True)

# Define final outputs
rule all:
    input:
        # Quality control reports
        expand(f"{OUTPUT_DIR}/qc/{{sample}}_fastqc.html", sample=SAMPLES),

        # Processed data
        expand(f"{OUTPUT_DIR}/processed/{{sample}}_processed.txt", sample=SAMPLES),

        # Summary reports
        f"{OUTPUT_DIR}/reports/experiment_summary.html",
        f"{OUTPUT_DIR}/reports/quality_summary.txt",

        # Final consolidated output
        f"{OUTPUT_DIR}/final/consolidated_results.csv"

# Rule: Quality control analysis
rule quality_control:
    """Perform quality control analysis on input data"""
    input:
        sample = get_fastq
    output:
        html = f"{OUTPUT_DIR}/qc/{{sample}}_fastqc.html",
        zip = f"{OUTPUT_DIR}/qc/{{sample}}_fastqc.zip"
    params:
        outdir = f"{OUTPUT_DIR}/qc"
    log:
        f"{LOG_DIR}/qc/{{sample}}_fastqc.log"
    threads: 2
    resources:
        mem_mb = 4000,
        time_min = 30
    conda:
        "envs/qc.yaml"
    shell:
        """
        echo "Starting quality control for {wildcards.sample}" > {log}

        # Simulate FastQC analysis
        mkdir -p {params.outdir}

        # Mock quality analysis (replace with actual FastQC)
        echo "Quality control analysis for {wildcards.sample}" > {output.html}
        echo "Sample: {wildcards.sample}" >> {output.html}
        echo "Input: {input.sample}" >> {output.html}
        echo "Analysis completed: $(date)" >> {output.html}

        # Create zip output
        echo "FastQC results" > {output.zip}

        echo "Quality control completed for {wildcards.sample}" >> {log}
        """

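# For a real run, the mock body above could be replaced with an actual FastQC
# invocation, e.g. (a sketch; FastQC derives its output names from the input
# file's basename, which must therefore match {sample}):
#
#     fastqc --threads {threads} --outdir {params.outdir} {input.sample} &> {log}
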
# Rule: Data preprocessing
rule preprocess_data:
    """Preprocess raw data with quality filtering"""
    input:
        fastq = get_fastq,
        qc_html = f"{OUTPUT_DIR}/qc/{{sample}}_fastqc.html"
    output:
        processed = f"{OUTPUT_DIR}/processed/{{sample}}_processed.txt",
        stats = f"{OUTPUT_DIR}/processed/{{sample}}_stats.json"
    params:
        quality_threshold = config.get("quality_threshold", 30),
        min_length = config.get("min_length", 50)
    log:
        f"{LOG_DIR}/preprocess/{{sample}}_preprocess.log"
    threads: 4
    resources:
        mem_mb = 8000,
        time_min = 60
    conda:
        "envs/preprocessing.yaml"
    shell:
        """
        echo "Starting preprocessing for {wildcards.sample}" > {log}
        echo "Quality threshold: {params.quality_threshold}" >> {log}
        echo "Minimum length: {params.min_length}" >> {log}

        # Mock preprocessing (replace with actual tools)
        echo "Processed data for {wildcards.sample}" > {output.processed}
        echo "Quality filtering applied with threshold {params.quality_threshold}" >> {output.processed}
        echo "Minimum length filter: {params.min_length}" >> {output.processed}
        echo "Processing completed: $(date)" >> {output.processed}

        # Generate processing statistics (literal JSON braces are doubled so
        # Snakemake does not treat them as format placeholders)
        cat > {output.stats} << EOF
{{
    "sample_id": "{wildcards.sample}",
    "input_reads": 1000000,
    "processed_reads": 950000,
    "quality_threshold": {params.quality_threshold},
    "min_length": {params.min_length},
    "processing_date": "$(date -I)"
}}
EOF

        echo "Preprocessing completed for {wildcards.sample}" >> {log}
        """

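# A real implementation of this step might use a read trimmer such as fastp
# (a sketch, not the pipeline's actual tooling; fastp's -q/-l options map
# onto the quality/length parameters above):
#
#     fastp -i {input.fastq} -o {output.processed} \
#           -q {params.quality_threshold} -l {params.min_length} \
#           --json {output.stats} &> {log}
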
# Checkpoint: Intermediate results validation
checkpoint validate_processing:
    """Validate that all processing steps completed successfully"""
    input:
        processed = expand(f"{OUTPUT_DIR}/processed/{{sample}}_processed.txt", sample=SAMPLES),
        stats = expand(f"{OUTPUT_DIR}/processed/{{sample}}_stats.json", sample=SAMPLES)
    output:
        validation = f"{OUTPUT_DIR}/checkpoints/processing_validation.txt"
    log:
        f"{LOG_DIR}/validation/processing_validation.log"
    resources:
        mem_mb = 2000,
        time_min = 10
    shell:
        """
        echo "Validating processing results" > {log}

        # Create checkpoint directory
        mkdir -p $(dirname {output.validation})

        # Validate all files exist and are non-empty
        all_valid=true
        for file in {input.processed} {input.stats}; do
            if [ ! -s "$file" ]; then
                echo "ERROR: $file is missing or empty" >> {log}
                all_valid=false
            else
                echo "VALID: $file" >> {log}
            fi
        done

        if [ "$all_valid" = true ]; then
            echo "All processing validation checks passed" > {output.validation}
            echo "Validation completed: $(date)" >> {output.validation}
        else
            echo "Validation failed - see log for details" >&2
            exit 1
        fi

        echo "Processing validation completed" >> {log}
        """

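# Downstream rules consume this checkpoint's output file directly, so it acts
# as a simple barrier. The idiomatic way to exploit checkpoint-driven DAG
# re-evaluation is an input function calling checkpoints.<name>.get(), e.g.
# (a sketch, not used by the rules below):
#
#     def validated_stats(wildcards):
#         # Blocks until the checkpoint has run, then re-evaluates the DAG
#         checkpoints.validate_processing.get()
#         return expand(f"{OUTPUT_DIR}/processed/{{sample}}_stats.json", sample=SAMPLES)
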
# Rule: Generate quality summary
rule quality_summary:
    """Generate comprehensive quality summary across all samples"""
    input:
        validation = f"{OUTPUT_DIR}/checkpoints/processing_validation.txt",
        stats = expand(f"{OUTPUT_DIR}/processed/{{sample}}_stats.json", sample=SAMPLES)
    output:
        summary = f"{OUTPUT_DIR}/reports/quality_summary.txt"
    log:
        f"{LOG_DIR}/reports/quality_summary.log"
    resources:
        mem_mb = 4000,
        time_min = 15
    conda:
        "envs/analysis.yaml"
    script:
        "scripts/generate_quality_summary.py"

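# Scripts run via the script: directive receive an injected `snakemake` object
# exposing the rule's input/output/params/log. generate_quality_summary.py is
# external to this file; a minimal sketch of what it might contain:
#
#     import json
#     records = [json.load(open(p)) for p in snakemake.input.stats]
#     with open(snakemake.output.summary, "w") as fh:
#         for rec in records:
#             fh.write(f"{rec['sample_id']}: {rec['processed_reads']}/{rec['input_reads']} reads retained\n")
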
# Rule: Advanced analysis
rule advanced_analysis:
    """Perform advanced analysis on processed data"""
    input:
        processed = expand(f"{OUTPUT_DIR}/processed/{{sample}}_processed.txt", sample=SAMPLES),
        validation = f"{OUTPUT_DIR}/checkpoints/processing_validation.txt"
    output:
        analysis = f"{OUTPUT_DIR}/analysis/advanced_results.txt",
        plots = f"{OUTPUT_DIR}/analysis/analysis_plots.png"
    params:
        analysis_type = config.get("analysis_type", "standard")
    log:
        f"{LOG_DIR}/analysis/advanced_analysis.log"
    threads: 8
    resources:
        mem_mb = 16000,
        time_min = 120
    conda:
        "envs/analysis.yaml"
    shell:
        """
        echo "Starting advanced analysis" > {log}
        echo "Analysis type: {params.analysis_type}" >> {log}

        # Create analysis directory
        mkdir -p $(dirname {output.analysis})

        # Mock advanced analysis (globals such as EXPERIMENT_ID and SAMPLES
        # are resolved from the Snakefile namespace when the command is formatted)
        echo "Advanced Analysis Results" > {output.analysis}
        echo "Experiment ID: {EXPERIMENT_ID}" >> {output.analysis}
        echo "Analysis type: {params.analysis_type}" >> {output.analysis}
        echo "Samples analyzed: {SAMPLES}" >> {output.analysis}
        echo "Analysis completed: $(date)" >> {output.analysis}

        # Generate mock plots
        echo "Analysis plots generated" > {output.plots}

        echo "Advanced analysis completed" >> {log}
        """

# Rule: Consolidate results
rule consolidate_results:
    """Consolidate all analysis results into final output"""
    input:
        analysis = f"{OUTPUT_DIR}/analysis/advanced_results.txt",
        quality_summary = f"{OUTPUT_DIR}/reports/quality_summary.txt",
        stats = expand(f"{OUTPUT_DIR}/processed/{{sample}}_stats.json", sample=SAMPLES)
    output:
        consolidated = f"{OUTPUT_DIR}/final/consolidated_results.csv"
    log:
        f"{LOG_DIR}/final/consolidation.log"
    resources:
        mem_mb = 4000,
        time_min = 30
    conda:
        "envs/analysis.yaml"
    script:
        "scripts/consolidate_results.py"

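# scripts/consolidate_results.py is likewise external; a plausible sketch
# (assumed, not the actual script) that merges the per-sample stats into a CSV:
#
#     import json
#     import pandas as pd
#     rows = [json.load(open(p)) for p in snakemake.input.stats]
#     pd.DataFrame(rows).to_csv(snakemake.output.consolidated, index=False)
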
# Rule: Generate experiment report
rule experiment_report:
    """Generate comprehensive experiment report"""
    input:
        consolidated = f"{OUTPUT_DIR}/final/consolidated_results.csv",
        qc_files = expand(f"{OUTPUT_DIR}/qc/{{sample}}_fastqc.html", sample=SAMPLES),
        analysis = f"{OUTPUT_DIR}/analysis/advanced_results.txt"
    output:
        report = f"{OUTPUT_DIR}/reports/experiment_summary.html"
    params:
        experiment_id = EXPERIMENT_ID,
        researcher = config.get("researcher", "Unknown"),
        project = config.get("project", "Research Project")
    log:
        f"{LOG_DIR}/reports/experiment_report.log"
    resources:
        mem_mb = 2000,
        time_min = 20
    conda:
        "envs/reporting.yaml"
    script:
        "scripts/generate_experiment_report.py"

# Rule: Cleanup temporary files
# Not part of `rule all`; run it explicitly with `snakemake cleanup`.
rule cleanup:
    """Clean up temporary files while preserving important results"""
    input:
        report = f"{OUTPUT_DIR}/reports/experiment_summary.html",
        consolidated = f"{OUTPUT_DIR}/final/consolidated_results.csv"
    output:
        cleanup_log = f"{OUTPUT_DIR}/cleanup.log"
    shell:
        """
        echo "Cleaning up temporary files" > {output.cleanup_log}
        echo "Cleanup completed: $(date)" >> {output.cleanup_log}

        # Add actual cleanup commands here if needed
        # find {OUTPUT_DIR}/temp -name "*.tmp" -delete
        """

# Error handling and resume support
onerror:
    print(f"Pipeline failed for experiment {EXPERIMENT_ID}")
    print(f"Check logs in {LOG_DIR} for details")
    print("Resume with: snakemake --rerun-incomplete")

onsuccess:
    print(f"Pipeline completed successfully for experiment {EXPERIMENT_ID}")
    print(f"Results available in: {OUTPUT_DIR}")
    print(f"Report: {OUTPUT_DIR}/reports/experiment_summary.html")

# Resource allocation functions (not referenced by any rule yet; see the
# usage sketch below)
def get_mem_mb(wildcards, attempt):
    """Get memory allocation, doubling on each retry attempt"""
    base_mem = 4000
    return base_mem * (2 ** (attempt - 1))

def get_time_min(wildcards, attempt):
    """Get time allocation, doubling on each retry attempt"""
    base_time = 60
    return min(base_time * (2 ** (attempt - 1)), 1440)  # Max 24 hours

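# A sketch of how these callables could be attached to a rule: Snakemake
# passes `attempt` when retries are enabled (via a retries: directive or
# --restart-times), and the functions must be defined before any rule that
# references them.
#
#     rule example:
#         ...
#         retries: 3
#         resources:
#             mem_mb = get_mem_mb,
#             time_min = get_time_min
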
# Resume-friendly intermediate file handling: path template for intermediates
# (not referenced by any rule yet; intended for use with temp())
temp_files = [
    f"{OUTPUT_DIR}/temp/{{sample}}_intermediate.txt"
]

# Benchmark tracking: path template for per-rule, per-sample benchmark files
# (not referenced by any rule yet; intended for the benchmark: directive)
benchmark_files = [
    f"{OUTPUT_DIR}/benchmarks/{{rule}}_{{sample}}.txt"
]