11import datetime
22import gzip
3+ import json
34import os
45import tempfile
56from typing import List
67
78import pandas as pd
9+ from pandas .api .types import is_numeric_dtype
810from psycopg2 import sql
911
1012from mapswipe_workers import auth
@@ -28,6 +30,31 @@ def add_metadata_to_csv(filename: str):
2830 logger .info (f"added metadata to { filename } ." )
2931
3032
def normalize_project_type_specifics(path):
    """Explode the nested JSON column project_type_specifics and drop empty columns.

    The CSV file at ``path`` is read, transformed and written back in place:

    * If a non-numeric ``project_type_specifics`` column exists, every cell is
      parsed as JSON and flattened into one column per (nested) key. The
      ``properties.`` prefix produced by the flattening is stripped from the
      resulting column names and the original JSON column is removed.
    * Columns that contain no values at all are dropped. Rows are never
      dropped, even if some of their cells are empty.

    Args:
        path: Path of the CSV file to normalize in place.
    """
    df = pd.read_csv(path)

    column = "project_type_specifics"
    # A column that is NULL for every row is parsed by pandas as a numeric
    # (all-NaN) column; there is nothing to explode in that case.
    if column in df.columns and not is_numeric_dtype(df[column]):
        # Rows without specifics are read as NaN (a float), which json.loads
        # cannot handle; treat them as an empty mapping instead.
        records = [
            json.loads(value) if isinstance(value, str) else {}
            for value in df[column]
        ]

        normalized = pd.json_normalize(records)
        # Align with df so that concat matches rows one-to-one.
        normalized.index = df.index
        normalized = normalized.rename(
            columns=lambda name: name.replace("properties.", "")
        )
        df = pd.concat([df.drop(columns=[column]), normalized], axis=1)

    # Drop columns without any value; how="all" keeps partially filled ones.
    df = df.dropna(axis=1, how="all")
    # index=False: do not add the DataFrame index as an extra unnamed column.
    df.to_csv(path, index=False)
56+
57+
3158def write_sql_to_gzipped_csv (filename : str , sql_query : sql .SQL ):
3259 """
3360 Use the copy statement to write data from postgres to a csv file.
@@ -39,6 +66,8 @@ def write_sql_to_gzipped_csv(filename: str, sql_query: sql.SQL):
3966 with open (tmp_csv_file , "w" ) as f :
4067 pg_db .copy_expert (sql_query , f )
4168
69+ normalize_project_type_specifics (tmp_csv_file )
70+
4271 with open (tmp_csv_file , "rb" ) as f_in , gzip .open (filename , "wb" ) as f_out :
4372 f_out .writelines (f_in )
4473
@@ -119,7 +148,8 @@ def get_tasks(filename: str, project_id: str) -> pd.DataFrame:
119148 sql_query = sql .SQL (
120149 """
121150 COPY (
122- SELECT project_id, group_id, task_id, ST_AsText(geom) as geom
151+ SELECT project_id, group_id, task_id, ST_AsText(geom) as geom,
152+ project_type_specifics
123153 FROM tasks
124154 WHERE project_id = {}
125155 ) TO STDOUT WITH CSV HEADER
@@ -303,8 +333,9 @@ def get_agg_results_by_task_id(
303333 )
304334
305335 # add task geometry using left join
336+ tasks_df .drop (columns = ["project_id" , "group_id" ], inplace = True )
306337 agg_results_df = results_by_task_id_df .merge (
307- tasks_df [[ "geom" , "task_id" ]], left_on = "task_id" , right_on = "task_id"
338+ tasks_df , left_on = "task_id" , right_on = "task_id" ,
308339 )
309340 logger .info ("added geometry to aggregated results" )
310341
0 commit comments