22import io
33
44import dateutil .parser
5+ import psycopg2
56
67from mapswipe_workers import auth
78from mapswipe_workers .definitions import logger , sentry
89from mapswipe_workers .firebase_to_postgres import update_data
910
1011
1112def transfer_results (project_id_list = None ):
13+ """Transfer results for one project after the other.
14+ Will only trigger the transfer of results for projects
15+ that are defined in the postgres database.
16+ Will not transfer results for tutorials and
17+ for projects which are not set up in postgres.
1218 """
13- Download results from firebase,
14- saves them to postgres and then deletes the results in firebase.
15- This is implemented as a transactional operation as described in
16- the Firebase docs to avoid missing new generated results in
17- Firebase during execution of this function.
18- """
19-
20- # Firebase transaction function
21- def transfer (current_results ):
22- if current_results is None :
23- logger .info (f"{ project_id } : No results in Firebase" )
24- return dict ()
25- else :
26- results_user_id_list = get_user_ids_from_results (current_results )
27- update_data .update_user_data (results_user_id_list )
28- results_file = results_to_file (current_results , project_id )
29- save_results_to_postgres (results_file )
30- return dict ()
31-
32- fb_db = auth .firebaseDB ()
33-
34- if not project_id_list :
19+ if project_id_list is None :
3520 # get project_ids from existing results if no project ids specified
21+ fb_db = auth .firebaseDB ()
3622 project_id_list = fb_db .reference ("v2/results/" ).get (shallow = True )
37- if not project_id_list :
23+ if project_id_list is None :
3824 project_id_list = []
3925 logger .info ("There are no results to transfer." )
4026
41- # get all project ids from postgres,
42- # we will only transfer results for projects we have there
27+ # Get all project ids from postgres.
28+ # We will only transfer results for projects we have in postgres.
4329 postgres_project_ids = get_projects_from_postgres ()
4430
31+ project_id_list_transfered = []
4532 for project_id in project_id_list :
4633 if project_id not in postgres_project_ids :
4734 logger .info (
@@ -55,24 +42,101 @@ def transfer(current_results):
5542 f"We will not transfer these"
5643 )
5744 continue
45+ else :
46+ logger .info (f"{ project_id } : Start transfer results" )
47+ fb_db = auth .firebaseDB ()
48+ results_ref = fb_db .reference (f"v2/results/{ project_id } " )
49+ results = results_ref .get ()
50+ del fb_db
51+ transfer_results_for_project (project_id , results )
52+ project_id_list_transfered .append (project_id )
53+
54+ return project_id_list_transfered
55+
56+
def transfer_results_for_project(project_id, results):
    """Move the results of a single project from Firebase into postgres.

    The steps run in this order:
    1. Update the user data for all users found in the results.
    2. Dump the results into an in-memory file and copy them to postgres.
    3. Delete the results in Firebase, but only if step 2 succeeded.

    If ``results`` is None nothing is transferred and this is only logged.
    Errors raised while writing to postgres are reported to sentry and the
    logger and are not re-raised; the results then stay in Firebase.

    No Firebase transaction function is used here anymore.
    Transactions caused problems when many mappers uploaded results
    to Firebase at the same time, because of how they behave:
    "If another client writes to this location
    before the new value is successfully saved,
    the update function is called again with the new current value,
    and the write will be retried."
    (source: https://firebase.google.com/docs/reference/admin/python/firebase_admin.db#firebase_admin.db.Reference.transaction)  # noqa
    Running a transaction per group turned out to be too slow with
    "normal" queries, e.g. without threading. Threading should be avoided
    here as well to not run into unforeseen errors.
    For more details see issue #478.
    """
    if results is None:
        logger.info(f"{project_id}: No results in Firebase")
        return

    # Users go first: the user_id is used as a key in the postgres database
    # for the results, so users must be inserted before their results.
    user_ids = get_user_ids_from_results(results)
    update_data.update_user_data(user_ids)

    try:
        # An in-memory file lets us use the COPY statement, which inserts
        # many results at relatively high speed.
        in_memory_file = results_to_file(results, project_id)
        truncate_temp_results()
        save_results_to_postgres(in_memory_file)
    except Exception as error:
        # Foreign key violations get their own message, everything else
        # the generic one. The reporting itself is the same for both.
        if isinstance(error, psycopg2.errors.ForeignKeyViolation):
            failure_message = (
                "could not transfer results to postgres due to ForeignKeyViolation: "
                f"{project_id}"
            )
        else:
            failure_message = f"could not transfer results to postgres: {project_id}"

        sentry.capture_exception(error)
        sentry.capture_message(failure_message)
        logger.exception(error)
        logger.warning(failure_message)
        return

    # Delete from Firebase only after the insert into postgres went through.
    # If the insert failed we returned above and Firebase is left untouched.
    delete_results_from_firebase(project_id, results)
    logger.info(f"{project_id}: Transferred results to postgres")
117+
118+
def delete_results_from_firebase(project_id, results):
    """Delete the given results of a project from Firebase.

    Firebase's update method is used instead of delete: one multi-location
    update that sets every ``<group_id>/<user_id>`` path below
    ``v2/results/<project_id>/`` to None removes all entries at the same
    time and is much faster than deleting them one by one.

    Parameters
    ----------
    project_id: str
        Id of the project whose results should be removed.
    results: dict
        Results as passed in by the caller, iterated here as a mapping of
        group_id -> mapping of user_id -> result. Only the ids are used.
        None or an empty mapping means there is nothing to delete.
    """
    # Build the multi-location update: one "<group_id>/<user_id>" key per
    # entry, each mapped to None. The result values are not needed here.
    data = {
        f"{group_id}/{user_id}": None
        for group_id, users in (results or {}).items()
        for user_id in users
    }

    if not data:
        # Reference.update raises ValueError for an empty dict, so bail out
        # before talking to Firebase when there is nothing to remove.
        logger.info(f"no results to remove for project {project_id}")
        return

    fb_db = auth.firebaseDB()
    results_ref = fb_db.reference(f"v2/results/{project_id}/")
    results_ref.update(data)

    logger.info(f"removed results for project {project_id}")
76140
77141
78142def results_to_file (results , projectId ):
@@ -200,7 +264,6 @@ def save_results_to_postgres(results_file):
200264 Saves results to a temporary table in postgres
201265 using the COPY Statement of Postgres
202266 for a more efficient import into the database.
203-
204267 Parameters
205268 ----------
206269 results_file: io.StringIO
@@ -228,6 +291,7 @@ def save_results_to_postgres(results_file):
228291 """
229292 p_con .query (query_insert_results )
230293 del p_con
294+ logger .info ("copied results into postgres." )
231295
232296
233297def truncate_temp_results ():
0 commit comments