
Commit 90dfbef

use plaintext experiment-tracking for nextflow [ci skip]
1 parent 3411af5 · commit 90dfbef

File tree

10 files changed, +1394 -0 lines changed

Lines changed: 381 additions & 0 deletions
@@ -0,0 +1,381 @@
#!/usr/bin/env nextflow

/*
 * Data Processing Pipeline with Nextflow
 *
 * This pipeline demonstrates a typical data science workflow:
 * 1. Data validation
 * 2. Data cleaning
 * 3. Feature engineering
 * 4. Data quality assessment
 * 5. Export processed data
 */

// Pipeline parameters
params.input = '../data/01_raw/*.csv'
params.output_dir = '../data/02_processed'
params.config = '../pipelines/configs/pipeline-config.yaml'
params.help = false

// Show help message
if (params.help) {
    log.info """
    Data Processing Pipeline
    ========================

    Usage:
        nextflow run data-pipeline.nf [options]

    Options:
        --input <path>        Input data files (default: ${params.input})
        --output_dir <path>   Output directory (default: ${params.output_dir})
        --config <path>       Pipeline configuration file (default: ${params.config})
        --help                Show this help message

    Example:
        nextflow run data-pipeline.nf --input 'data/*.csv' --output_dir 'processed_data'
    """
    exit 0
}

// Log pipeline parameters
log.info """
Data Processing Pipeline
========================
Input files: ${params.input}
Output directory: ${params.output_dir}
Configuration: ${params.config}
"""

// Input channels
input_files = Channel.fromPath(params.input)

/*
 * Process 1: Validate input data
 */
process validate_data {
    tag "Validating ${file.name}"

    input:
    path file from input_files

    output:
    path file into validated_files
    path "${file.baseName}_validation_report.json" into validation_reports

    script:
    """
    #!/usr/bin/env python3
    import pandas as pd
    import json
    from pathlib import Path

    # Load data
    df = pd.read_csv('${file}')

    # Basic validation
    validation_results = {
        'file': '${file.name}',
        'shape': df.shape,
        'columns': df.columns.tolist(),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicates': df.duplicated().sum(),
        'data_types': df.dtypes.astype(str).to_dict()
    }

    # Save validation report (default=str keeps numpy integer counts JSON-serializable,
    # matching the convention used by the quality report below)
    with open('${file.baseName}_validation_report.json', 'w') as f:
        json.dump(validation_results, f, indent=2, default=str)

    print(f"Validation completed for {df.shape[0]} rows, {df.shape[1]} columns")
    """
}

/*
 * Process 2: Clean data
 */
process clean_data {
    tag "Cleaning ${file.name}"

    input:
    path file from validated_files

    output:
    path "${file.baseName}_cleaned.csv" into cleaned_files
    path "${file.baseName}_cleaning_report.json" into cleaning_reports

    script:
    """
    #!/usr/bin/env python3
    import pandas as pd
    import json

    # Load data
    df = pd.read_csv('${file}')
    original_shape = df.shape

    # Data cleaning steps
    # 1. Remove duplicates
    df_clean = df.drop_duplicates()

    # 2. Handle missing values (drop rows with any missing values)
    df_clean = df_clean.dropna()

    # 3. Reset index
    df_clean = df_clean.reset_index(drop=True)

    final_shape = df_clean.shape

    # Save cleaned data
    df_clean.to_csv('${file.baseName}_cleaned.csv', index=False)

    # Generate cleaning report
    cleaning_report = {
        'file': '${file.name}',
        'original_shape': original_shape,
        'final_shape': final_shape,
        'rows_removed': original_shape[0] - final_shape[0],
        'cleaning_steps': [
            'remove_duplicates',
            'drop_missing_values',
            'reset_index'
        ]
    }

    with open('${file.baseName}_cleaning_report.json', 'w') as f:
        json.dump(cleaning_report, f, indent=2)

    print(f"Cleaning completed: {original_shape} -> {final_shape}")
    """
}

/*
 * Process 3: Feature engineering
 */
process feature_engineering {
    tag "Feature engineering ${file.name}"

    input:
    path file from cleaned_files

    output:
    path "${file.baseName}_features.csv" into feature_files
    path "${file.baseName}_features_report.json" into feature_reports

    script:
    """
    #!/usr/bin/env python3
    import pandas as pd
    import numpy as np
    import json

    # Load cleaned data
    df = pd.read_csv('${file}')
    original_columns = df.columns.tolist()

    # Feature engineering examples
    # Note: These are generic examples - customize based on your data

    # 1. Create interaction features for numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) >= 2:
        for i, col1 in enumerate(numeric_cols):
            for col2 in numeric_cols[i+1:]:
                df[f'{col1}_x_{col2}'] = df[col1] * df[col2]

    # 2. Create polynomial features for numeric columns
    for col in numeric_cols:
        if df[col].std() > 0:  # Avoid constant columns
            df[f'{col}_squared'] = df[col] ** 2

    # 3. Create statistical features
    if len(numeric_cols) > 0:
        df['numeric_mean'] = df[numeric_cols].mean(axis=1)
        df['numeric_std'] = df[numeric_cols].std(axis=1)

    final_columns = df.columns.tolist()
    new_features = [col for col in final_columns if col not in original_columns]

    # Save feature-engineered data
    df.to_csv('${file.baseName}_features.csv', index=False)

    # Generate feature engineering report
    feature_report = {
        'file': '${file.name}',
        'original_features': len(original_columns),
        'final_features': len(final_columns),
        'new_features': new_features,
        'feature_types': {
            'interaction': [f for f in new_features if '_x_' in f],
            'polynomial': [f for f in new_features if '_squared' in f],
            'statistical': [f for f in new_features if f in ['numeric_mean', 'numeric_std']]
        }
    }

    with open('${file.baseName}_features_report.json', 'w') as f:
        json.dump(feature_report, f, indent=2)

    print(f"Feature engineering completed: {len(original_columns)} -> {len(final_columns)} features")
    """
}

/*
 * Process 4: Data quality assessment
 */
process quality_assessment {
    tag "Quality assessment ${file.name}"
    publishDir params.output_dir, mode: 'copy'

    input:
    path file from feature_files

    output:
    path "${file.baseName}_final.csv" into final_files
    path "${file.baseName}_quality_report.json" into quality_reports

    script:
    """
    #!/usr/bin/env python3
    import pandas as pd
    import numpy as np
    import json
    from scipy import stats

    # Load feature-engineered data
    df = pd.read_csv('${file}')

    # Quality assessment
    quality_metrics = {
        'file': '${file.name}',
        'final_shape': df.shape,
        'data_quality': {
            'missing_values': df.isnull().sum().sum(),
            'duplicate_rows': df.duplicated().sum(),
            'constant_columns': (df.nunique() == 1).sum(),
            'high_cardinality_columns': (df.nunique() > df.shape[0] * 0.9).sum()
        },
        'numeric_summary': {},
        'categorical_summary': {}
    }

    # Numeric column analysis
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        quality_metrics['numeric_summary'] = {
            'count': len(numeric_cols),
            'columns': numeric_cols.tolist(),
            'distributions': {}
        }

        for col in numeric_cols:
            if df[col].std() > 0:
                skewness = stats.skew(df[col].dropna())
                kurtosis = stats.kurtosis(df[col].dropna())
                quality_metrics['numeric_summary']['distributions'][col] = {
                    'skewness': float(skewness),
                    'kurtosis': float(kurtosis),
                    'outliers_iqr': int(((df[col] < (df[col].quantile(0.25) - 1.5 * (df[col].quantile(0.75) - df[col].quantile(0.25)))) |
                                         (df[col] > (df[col].quantile(0.75) + 1.5 * (df[col].quantile(0.75) - df[col].quantile(0.25))))).sum())
                }

    # Categorical column analysis
    categorical_cols = df.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        quality_metrics['categorical_summary'] = {
            'count': len(categorical_cols),
            'columns': categorical_cols.tolist(),
            'cardinality': {col: df[col].nunique() for col in categorical_cols}
        }

    # Save final processed data
    df.to_csv('${file.baseName}_final.csv', index=False)

    # Save quality report
    with open('${file.baseName}_quality_report.json', 'w') as f:
        json.dump(quality_metrics, f, indent=2, default=str)

    print(f"Quality assessment completed for {df.shape[0]} rows, {df.shape[1]} features")
    """
}

/*
 * Process 5: Generate summary report
 */
process generate_summary {
    tag "Generating summary report"
    publishDir params.output_dir, mode: 'copy'

    input:
    path validation_reports from validation_reports.collect()
    path cleaning_reports from cleaning_reports.collect()
    path feature_reports from feature_reports.collect()
    path quality_reports from quality_reports.collect()

    output:
    path "pipeline_summary_report.json"

    script:
    """
    #!/usr/bin/env python3
    import json
    import glob
    from datetime import datetime

    # Collect all reports
    validation_files = glob.glob('*_validation_report.json')
    cleaning_files = glob.glob('*_cleaning_report.json')
    feature_files = glob.glob('*_features_report.json')
    quality_files = glob.glob('*_quality_report.json')

    # Load and aggregate reports
    summary = {
        'pipeline_execution': {
            'timestamp': datetime.now().isoformat(),
            'files_processed': len(validation_files)
        },
        'validation_summary': [],
        'cleaning_summary': [],
        'feature_summary': [],
        'quality_summary': []
    }

    # Aggregate validation reports
    for file in validation_files:
        with open(file, 'r') as f:
            summary['validation_summary'].append(json.load(f))

    # Aggregate cleaning reports
    for file in cleaning_files:
        with open(file, 'r') as f:
            summary['cleaning_summary'].append(json.load(f))

    # Aggregate feature reports
    for file in feature_files:
        with open(file, 'r') as f:
            summary['feature_summary'].append(json.load(f))

    # Aggregate quality reports
    for file in quality_files:
        with open(file, 'r') as f:
            summary['quality_summary'].append(json.load(f))

    # Save summary report
    with open('pipeline_summary_report.json', 'w') as f:
        json.dump(summary, f, indent=2, default=str)

    print("Pipeline summary report generated successfully")
    """
}

/*
 * Workflow completion
 */
workflow.onComplete {
    log.info """
    Pipeline execution completed!
    =================================
    Success: ${workflow.success}
    Duration: ${workflow.duration}
    Output directory: ${params.output_dir}
    """
}
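
A minimal usage sketch for running this pipeline locally, assuming Nextflow plus the Python packages the embedded scripts import (pandas, numpy, scipy) are available in the execution environment; the second command is taken from the pipeline's own help text:

    # run with the default parameters declared at the top of the script
    nextflow run data-pipeline.nf

    # override the input glob and the publish directory
    nextflow run data-pipeline.nf --input 'data/*.csv' --output_dir 'processed_data'

The JSON reports published to ${params.output_dir} (per-file validation, cleaning, feature, and quality reports plus pipeline_summary_report.json) are the plaintext experiment-tracking artifacts referenced in the commit message.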

0 commit comments
