From d988053f41ce4bc24be6c8b5996e53c0b0cfa930 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Wed, 5 Nov 2025 21:35:08 +0100 Subject: [PATCH 01/25] add structures to get the obspgm and station fromtimes --- ingestion/src/cron.rs | 19 ++++- ingestion/src/util/stinfosys.rs | 99 +++++++++++++++++++++++-- ingestion/src/util/tsupdate.rs | 21 +++++- integration_tests/tests/common/mocks.rs | 16 +++- integration_tests/tests/end_to_end.rs | 16 +++- 5 files changed, 154 insertions(+), 17 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index f3319886..8a810f9d 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -39,11 +39,24 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { let mut open_conn = pools.open.get().await.unwrap(); let mut restricted_conn = pools.restricted.get().await.unwrap(); - let (station_totime, obs_pgm_totime) = stinfosys.cache_deactivated_stinfosys().await.unwrap(); + let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = + stinfosys.cache_deactivated_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( - tsupdate::set_deactivated(&mut open_conn, &obs_pgm_totime, &station_totime), - tsupdate::set_deactivated(&mut restricted_conn, &obs_pgm_totime, &station_totime), + tsupdate::set_deactivated( + &mut open_conn, + &obs_pgm_totime, + &obs_pgm_fromtime, + &station_totime, + &station_fromtime + ), + tsupdate::set_deactivated( + &mut restricted_conn, + &obs_pgm_totime, + &obs_pgm_fromtime, + &station_totime, + &station_fromtime + ), ); if let Err(err) = open_res { diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index ba4bc044..0f667e39 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -21,7 +21,9 @@ pub struct Stinfosys { } type StationTotimeMap = HashMap>; +type StationFromtimeMap = HashMap>; type ObsPgmTotimeMap = HashMap>; +type ObsPgmFromtimeMap = HashMap>; impl Stinfosys { pub fn new(conn_string: String, levels: LevelTable) -> Self { @@ -36,6 +38,8 @@ impl Stinfosys { ) -> Result< ( HashMap>, + HashMap>, + HashMap>, HashMap>, ), Error, @@ -49,37 +53,61 @@ impl Stinfosys { }); // Fetch all deactivated timeseries in Stinfosys - let (station_totime, obs_pgm_totime) = tokio::try_join!( + let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = tokio::try_join!( fetch_station_totime(&client), + fetch_station_fromtime(&client), fetch_obs_pgm_totime(self.levels.clone(), &client), + fetch_obs_pgm_fromtime(self.levels.clone(), &client), )?; - Ok((station_totime, obs_pgm_totime)) + Ok(( + station_totime, + station_fromtime, + obs_pgm_totime, + obs_pgm_fromtime, + )) } } pub async fn fetch_deactivated( obs_pgm_totime: &HashMap>, + obs_pgm_fromtime: &HashMap>, station_totime: &HashMap>, + station_fromtime: &HashMap>, labels: Vec, ) -> Result, Error> { let mut futures = labels .iter() .map(async |label| -> Result<_, Error> { - // Prefer obs_pgm if available + // TODO: Figure out how best to set these when obs_pgm makes no sense ... 
+ // particularly when it does not overlap at all with the time range + // |------obs_pgm------| + // |--station--| + // but also if the from or to from obs_pgm actually open beyond when the + // station had data aka: + // |------obs_pgm------| + // |--station--| let totime = obs_pgm_totime .get(&label.key) .or(station_totime.get(&label.key.station_id)) .copied(); + let fromtime = obs_pgm_fromtime + .get(&label.key) + .or(station_fromtime.get(&label.key.station_id)) + .copied(); - Ok((label.id, totime)) + Ok((label.id, fromtime, totime)) }) .collect::>(); let mut deactivated = vec![]; while let Some(res) = futures.next().await { let ts = match res? { - (tsid, Some(totime)) => DeactivatedTimeseries { tsid, totime }, + (tsid, Some(fromtime), Some(totime)) => DeactivatedTimeseries { + tsid, + fromtime, + totime, + }, // Skip if a valid totime was not found in stinfosys _ => continue, }; @@ -133,6 +161,46 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result Result { + const OBS_PGM_QUERY: &str = "\ + SELECT \ + stationid, \ + paramid, \ + hlevel, \ + nsensor, \ + priority_messageid, \ + (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \ + FROM obs_pgm \ + GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \ + HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL"; + + let rows = conn.query(OBS_PGM_QUERY, &[]).await?; + + let mut map = ObsPgmFromtimeMap::new(); + for row in rows { + let param_id: i32 = row.get(1); + + let level = row.get(2); + let level = param_get_level(levels.clone(), param_id, level)?; + + let key = MetTimeseriesKey { + station_id: row.get(0), + param_id, + level, + sensor: row.get(3), + type_id: row.get(4), + }; + + let totime: NaiveDateTime = row.get(5); + map.insert(key, totime.and_utc()); + } + + Ok(map) +} + async fn fetch_station_totime(conn: &Client) -> Result { // The funny looking ARRAY_AGG is needed because each station can have multiple from/to times. 
// For example, the timeseries might have been "reset" after a change of the station position, @@ -159,3 +227,24 @@ async fn fetch_station_totime(conn: &Client) -> Result }) .collect()) } + +async fn fetch_station_fromtime(conn: &Client) -> Result { + const STATION_QUERY: &str = "\ + SELECT \ + stationid, \ + (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \ + FROM station \ + GROUP BY stationid \ + HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL"; + + let rows = conn.query(STATION_QUERY, &[]).await?; + + Ok(rows + .iter() + .map(|row| { + let totime: NaiveDateTime = row.get(1); + + (row.get(0), totime.and_utc()) + }) + .collect()) +} diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index faa4f6e3..abae4128 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -25,12 +25,15 @@ const OPEN_TIMESERIES_QUERY: &str = "\ const UPDATE_QUERY: &str = "\ UPDATE public.timeseries SET \ totime = $1, \ + fromtime = $2, \ deactivated = true \ - WHERE id = $2"; + WHERE id = $3"; pub struct DeactivatedTimeseries { /// Timeseries to be updated pub tsid: i64, + /// Fromtime value found in the metadata source + pub fromtime: DateTime, /// Totime value found in the metadata source pub totime: DateTime, } @@ -38,7 +41,9 @@ pub struct DeactivatedTimeseries { pub async fn set_deactivated( conn: &mut PooledPgConn<'_>, obs_pgm_totime: &HashMap>, + obs_pgm_fromtime: &HashMap>, station_totime: &HashMap>, + station_fromtime: &HashMap>, ) -> Result<(), Error> { let tx = conn.transaction().await?; @@ -65,10 +70,20 @@ pub async fn set_deactivated( }) .collect(); - let deactivated = fetch_deactivated(obs_pgm_totime, station_totime, labels).await?; + let deactivated = fetch_deactivated( + obs_pgm_totime, + obs_pgm_fromtime, + station_totime, + station_fromtime, + labels, + ) + .await?; future::join_all(deactivated.into_iter().map(async |ts| { - match tx.execute(UPDATE_QUERY, &[&ts.totime, &ts.tsid]).await { + match tx + .execute(UPDATE_QUERY, &[&ts.totime, &ts.fromtime, &ts.tsid]) + .await + { Ok(_) => (), //info!("Tsid {} updated", ts.tsid), Err(err) => error!("Could not update tsid {}: {}", ts.tsid, err), } diff --git a/integration_tests/tests/common/mocks.rs b/integration_tests/tests/common/mocks.rs index cf2eda06..6aae28e8 100644 --- a/integration_tests/tests/common/mocks.rs +++ b/integration_tests/tests/common/mocks.rs @@ -16,6 +16,7 @@ use lard_ingestion::util::{ pub struct MetadataMock { pub station: i32, + pub fromtime: DateTime, pub totime: DateTime, } @@ -25,6 +26,8 @@ impl MetadataMock { ) -> Result< ( HashMap>, + HashMap>, + HashMap>, HashMap>, ), lard_ingestion::Error, @@ -32,9 +35,18 @@ impl MetadataMock { let mut station_totime = HashMap::new(); station_totime.insert(self.station, self.totime); - let obs_pgm_totime: HashMap> = HashMap::new(); + let mut station_fromtime = HashMap::new(); + station_fromtime.insert(self.station, self.fromtime); - Ok((station_totime, obs_pgm_totime)) + let obs_pgm_totime: HashMap> = HashMap::new(); + let obs_pgm_fromtime: HashMap> = HashMap::new(); + + Ok(( + station_totime, + station_fromtime, + obs_pgm_totime, + obs_pgm_fromtime, + )) } } diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 0bdb9b9e..a65e5281 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -162,10 +162,12 @@ async fn test_totime_update() { }, ]; + let fromtime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); let totime = 
Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(); let metadata_mock = MetadataMock { station: 10001, + fromtime, totime, }; @@ -190,12 +192,18 @@ async fn test_totime_update() { assert_eq!(totime, None); } - let (station_totime, obs_pgm_totime) = + let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = metadata_mock.cache_deactivated_stinfosys().await.unwrap(); - set_deactivated(&mut conn, &obs_pgm_totime, &station_totime) - .await - .unwrap(); + set_deactivated( + &mut conn, + &obs_pgm_totime, + &obs_pgm_fromtime, + &station_totime, + &station_fromtime, + ) + .await + .unwrap(); let after = get_totime(&conn).await; From b2134b7a099966f8271815b840e87df1aaaa100b Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 6 Nov 2025 13:04:04 +0100 Subject: [PATCH 02/25] first pass at logic for setting appropriate from/to for deactivated --- ingestion/src/util/stinfosys.rs | 79 +++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 18 deletions(-) diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 0f667e39..487e77f1 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -79,24 +79,67 @@ pub async fn fetch_deactivated( let mut futures = labels .iter() .map(async |label| -> Result<_, Error> { - // TODO: Figure out how best to set these when obs_pgm makes no sense ... - // particularly when it does not overlap at all with the time range - // |------obs_pgm------| - // |--station--| - // but also if the from or to from obs_pgm actually open beyond when the - // station had data aka: - // |------obs_pgm------| - // |--station--| - let totime = obs_pgm_totime - .get(&label.key) - .or(station_totime.get(&label.key.station_id)) - .copied(); - let fromtime = obs_pgm_fromtime - .get(&label.key) - .or(station_fromtime.get(&label.key.station_id)) - .copied(); - - Ok((label.id, fromtime, totime)) + if station_fromtime.get(&label.key.station_id).is_some() + && obs_pgm_totime.get(&label.key).is_some() + && obs_pgm_totime.get(&label.key) < station_fromtime.get(&label.key.station_id) + { + // |------obs_pgm------| + // |--station--| + // use the station from/to so as not to cause "twisting" + // aka a to time before from time + // TODO: Could do something smarter here, this data is maybe mislabeled? + let fromtime = station_fromtime.get(&label.key.station_id).copied(); + let totime = station_totime.get(&label.key.station_id).copied(); + Ok((label.id, fromtime, totime)) + } else { + // station had data and it overlaps in some way with obs_pgm + // so we assume we should use obs_pgm deactivation times... 
+ // |------obs_pgm------| + // |--station--| + let mut fromtime = obs_pgm_fromtime + .get(&label.key) + .or(station_fromtime.get(&label.key.station_id)) + .copied(); + // check which is less "permissive" for fromtime + if obs_pgm_fromtime.get(&label.key).is_some() + && station_fromtime.get(&label.key.station_id).is_some() + { + // choose the later time + if let (Some(pgm_time), Some(station_time)) = ( + obs_pgm_fromtime.get(&label.key), + station_fromtime.get(&label.key.station_id), + ) { + // |----obs_pgm----| + // |--station--| + if station_time > pgm_time { + // use station time + fromtime = Some(*station_time); + } + } + } + let mut totime = obs_pgm_totime + .get(&label.key) + .or(station_totime.get(&label.key.station_id)) + .copied(); + // check which is less "permissive" for totime + if obs_pgm_totime.get(&label.key).is_some() + && station_totime.get(&label.key.station_id).is_some() + { + // choose the earlier time + if let (Some(pgm_time), Some(station_time)) = ( + obs_pgm_totime.get(&label.key), + station_totime.get(&label.key.station_id), + ) { + // |----obs_pgm----| + // |--station--| + if station_time < pgm_time { + // use station time + totime = Some(*station_time); + } + } + } + Ok((label.id, fromtime, totime)) + } }) .collect::>(); From faedff7535baa81682fb33ce76a7a382890ba866 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 6 Nov 2025 13:09:01 +0100 Subject: [PATCH 03/25] add sql and function for fixing twisted from/to times --- ingestion/src/util/tsupdate.rs | 74 ++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index abae4128..5f0243a6 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -22,6 +22,21 @@ const OPEN_TIMESERIES_QUERY: &str = "\ WHERE met.param_id IS NOT NULL \ AND timeseries.totime IS NULL"; +// find timeseries where totime is before fromtime +const TWISTED_TIMESERIES_QUERY: &str = "\ + SELECT \ + timeseries.id, \ + met.station_id, \ + met.param_id, \ + met.type_id, \ + met.lvl, \ + met.sensor \ + FROM labels.met \ + JOIN timeseries \ + ON met.timeseries = timeseries.id \ + WHERE met.param_id IS NOT NULL \ + AND timeseries.totime < timeseries.fromtime"; + const UPDATE_QUERY: &str = "\ UPDATE public.timeseries SET \ totime = $1, \ @@ -94,3 +109,62 @@ pub async fn set_deactivated( Ok(()) } + +// these were created by the original fetch_deactivated function, since it only set totime +// need to be found and fixed separately +pub async fn untwist_timeseries( + conn: &mut PooledPgConn<'_>, + obs_pgm_totime: &HashMap>, + obs_pgm_fromtime: &HashMap>, + station_totime: &HashMap>, + station_fromtime: &HashMap>, +) -> Result<(), Error> { + let tx = conn.transaction().await?; + + // Explicitly take the lock so we can prevent concurrent access to the rows we are going to update + tx.execute( + "LOCK TABLE public.timeseries IN SHARE ROW EXCLUSIVE MODE", + &[], + ) + .await?; + + let rows = tx.query(TWISTED_TIMESERIES_QUERY, &[]).await?; + + let labels = rows + .iter() + .map(|row| { + MetLabel::new( + row.get(0), + row.get(1), + row.get(2), + row.get(3), + row.get(4), + row.get(5), + ) + }) + .collect(); + + let deactivated = fetch_deactivated( + obs_pgm_totime, + obs_pgm_fromtime, + station_totime, + station_fromtime, + labels, + ) + .await?; + + future::join_all(deactivated.into_iter().map(async |ts| { + match tx + .execute(UPDATE_QUERY, &[&ts.totime, &ts.fromtime, &ts.tsid]) + .await + { + Ok(_) => (), //info!("Tsid {} updated", ts.tsid), 
+ Err(err) => error!("Could not update tsid {}: {}", ts.tsid, err), + } + })) + .await; + + tx.commit().await?; + + Ok(()) +} From 6bbf03fce9ee2d6ab0f67db06717159fcbc52ff0 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 6 Nov 2025 13:27:33 +0100 Subject: [PATCH 04/25] refactor to put from / to in better order --- ingestion/src/cron.rs | 14 +++++++------- ingestion/src/util/stinfosys.rs | 22 +++++++++++----------- ingestion/src/util/tsupdate.rs | 16 ++++++++-------- integration_tests/tests/common/mocks.rs | 4 ++-- integration_tests/tests/end_to_end.rs | 4 ++-- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index 8a810f9d..76f627b5 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -39,23 +39,23 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { let mut open_conn = pools.open.get().await.unwrap(); let mut restricted_conn = pools.restricted.get().await.unwrap(); - let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = + let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) = stinfosys.cache_deactivated_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( tsupdate::set_deactivated( &mut open_conn, - &obs_pgm_totime, &obs_pgm_fromtime, - &station_totime, - &station_fromtime + &obs_pgm_totime, + &station_fromtime, + &station_totime ), tsupdate::set_deactivated( &mut restricted_conn, - &obs_pgm_totime, &obs_pgm_fromtime, - &station_totime, - &station_fromtime + &obs_pgm_totime, + &station_fromtime, + &station_totime ), ); diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 487e77f1..aeff21ce 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -37,10 +37,10 @@ impl Stinfosys { &self, ) -> Result< ( - HashMap>, - HashMap>, HashMap>, HashMap>, + HashMap>, + HashMap>, ), Error, > { @@ -53,27 +53,27 @@ impl Stinfosys { }); // Fetch all deactivated timeseries in Stinfosys - let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = tokio::try_join!( - fetch_station_totime(&client), - fetch_station_fromtime(&client), - fetch_obs_pgm_totime(self.levels.clone(), &client), + let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) = tokio::try_join!( fetch_obs_pgm_fromtime(self.levels.clone(), &client), + fetch_obs_pgm_totime(self.levels.clone(), &client), + fetch_station_fromtime(&client), + fetch_station_totime(&client), )?; Ok(( - station_totime, - station_fromtime, - obs_pgm_totime, obs_pgm_fromtime, + obs_pgm_totime, + station_fromtime, + station_totime, )) } } pub async fn fetch_deactivated( - obs_pgm_totime: &HashMap>, obs_pgm_fromtime: &HashMap>, - station_totime: &HashMap>, + obs_pgm_totime: &HashMap>, station_fromtime: &HashMap>, + station_totime: &HashMap>, labels: Vec, ) -> Result, Error> { let mut futures = labels diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 5f0243a6..7a2db490 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -55,10 +55,10 @@ pub struct DeactivatedTimeseries { pub async fn set_deactivated( conn: &mut PooledPgConn<'_>, - obs_pgm_totime: &HashMap>, obs_pgm_fromtime: &HashMap>, - station_totime: &HashMap>, + obs_pgm_totime: &HashMap>, station_fromtime: &HashMap>, + station_totime: &HashMap>, ) -> Result<(), Error> { let tx = conn.transaction().await?; @@ -86,10 +86,10 @@ pub async fn set_deactivated( .collect(); let deactivated = 
fetch_deactivated( - obs_pgm_totime, obs_pgm_fromtime, - station_totime, + obs_pgm_totime, station_fromtime, + station_totime, labels, ) .await?; @@ -114,10 +114,10 @@ pub async fn set_deactivated( // need to be found and fixed separately pub async fn untwist_timeseries( conn: &mut PooledPgConn<'_>, - obs_pgm_totime: &HashMap>, obs_pgm_fromtime: &HashMap>, - station_totime: &HashMap>, + obs_pgm_totime: &HashMap>, station_fromtime: &HashMap>, + station_totime: &HashMap>, ) -> Result<(), Error> { let tx = conn.transaction().await?; @@ -145,10 +145,10 @@ pub async fn untwist_timeseries( .collect(); let deactivated = fetch_deactivated( - obs_pgm_totime, obs_pgm_fromtime, - station_totime, + obs_pgm_totime, station_fromtime, + station_totime, labels, ) .await?; diff --git a/integration_tests/tests/common/mocks.rs b/integration_tests/tests/common/mocks.rs index 6aae28e8..fbbfb0aa 100644 --- a/integration_tests/tests/common/mocks.rs +++ b/integration_tests/tests/common/mocks.rs @@ -42,10 +42,10 @@ impl MetadataMock { let obs_pgm_fromtime: HashMap> = HashMap::new(); Ok(( - station_totime, station_fromtime, - obs_pgm_totime, + station_totime, obs_pgm_fromtime, + obs_pgm_totime, )) } } diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index a65e5281..d1df3e86 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -197,10 +197,10 @@ async fn test_totime_update() { set_deactivated( &mut conn, - &obs_pgm_totime, &obs_pgm_fromtime, - &station_totime, + &obs_pgm_totime, &station_fromtime, + &station_totime, ) .await .unwrap(); From 94eed714bfe19bcd89fd7885d6185e46ba6fcbe4 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 6 Nov 2025 13:36:08 +0100 Subject: [PATCH 05/25] code to call the untwisting function --- ingestion/src/cron.rs | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index 76f627b5..51247b64 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -33,7 +33,7 @@ pub async fn refresh_levels((stinfo_conn_string, level_table): &(String, LevelTa } pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { - info!("Updating timeseries totime"); + info!("Updating timeseries fromtime & totime"); // TODO: add retries instead of panicking? 
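// A minimal sketch of the retry hinted at in the TODO above, assuming three
// attempts and a fixed five-second delay (both arbitrary choices here) and that
// tokio's `time` feature is enabled; with a loop like this the `unwrap` below
// would go away (the restricted pool would want the same treatment):
let mut attempts = 0;
let mut open_conn = loop {
    match pools.open.get().await {
        Ok(conn) => break conn,
        Err(err) if attempts < 3 => {
            attempts += 1;
            error!("could not get an open db connection ({err}), retrying");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
        Err(err) => panic!("could not get an open db connection: {err}"),
    }
};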
let mut open_conn = pools.open.get().await.unwrap(); @@ -66,4 +66,30 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { if let Err(err) = restricted_res { error!("Error while updating restricted db timeseries: {err}"); } + + info!("Updating / fixing twisted timeseries"); + let (open_res, restricted_res) = tokio::join!( + tsupdate::untwist_timeseries( + &mut open_conn, + &obs_pgm_fromtime, + &obs_pgm_totime, + &station_fromtime, + &station_totime + ), + tsupdate::untwist_timeseries( + &mut restricted_conn, + &obs_pgm_fromtime, + &obs_pgm_totime, + &station_fromtime, + &station_totime + ), + ); + + if let Err(err) = open_res { + error!("Error while untwisting open db timeseries: {err}"); + } + + if let Err(err) = restricted_res { + error!("Error while untwisting restricted db timeseries: {err}"); + } } From 0ea8a8d15801cdcd77409575e75b812cc2aa239f Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 6 Nov 2025 13:55:43 +0100 Subject: [PATCH 06/25] fix test --- integration_tests/tests/end_to_end.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index d1df3e86..140c65d9 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -192,7 +192,7 @@ async fn test_totime_update() { assert_eq!(totime, None); } - let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = + let (station_fromtime, station_totime, obs_pgm_fromtime, obs_pgm_totime) = metadata_mock.cache_deactivated_stinfosys().await.unwrap(); set_deactivated( From 3c64c91f3f653797095ef2a45069c9658919e5ac Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Sat, 8 Nov 2025 21:39:33 +0100 Subject: [PATCH 07/25] set deactivated to false when updating --- ingestion/src/util/tsupdate.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 7a2db490..0f7084a7 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -37,11 +37,13 @@ const TWISTED_TIMESERIES_QUERY: &str = "\ WHERE met.param_id IS NOT NULL \ AND timeseries.totime < timeseries.fromtime"; +// Deactivated is information for the database +// for a timeseries it is enough that the fromtime is closed const UPDATE_QUERY: &str = "\ UPDATE public.timeseries SET \ totime = $1, \ fromtime = $2, \ - deactivated = true \ + deactivated = false \ WHERE id = $3"; pub struct DeactivatedTimeseries { From 86069a5f70d5efabecee642fe56e508d19323647 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Mon, 10 Nov 2025 11:36:45 +0100 Subject: [PATCH 08/25] remove untwisting code, but add OR case to open timeseries query --- ingestion/src/cron.rs | 26 ------------ ingestion/src/util/tsupdate.rs | 77 +--------------------------------- 2 files changed, 2 insertions(+), 101 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index 51247b64..9a2482ac 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -66,30 +66,4 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { if let Err(err) = restricted_res { error!("Error while updating restricted db timeseries: {err}"); } - - info!("Updating / fixing twisted timeseries"); - let (open_res, restricted_res) = tokio::join!( - tsupdate::untwist_timeseries( - &mut open_conn, - &obs_pgm_fromtime, - &obs_pgm_totime, - &station_fromtime, - &station_totime - ), - tsupdate::untwist_timeseries( - &mut restricted_conn, - 
&obs_pgm_fromtime, - &obs_pgm_totime, - &station_fromtime, - &station_totime - ), - ); - - if let Err(err) = open_res { - error!("Error while untwisting open db timeseries: {err}"); - } - - if let Err(err) = restricted_res { - error!("Error while untwisting restricted db timeseries: {err}"); - } } diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 0f7084a7..3e0e77eb 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -20,22 +20,8 @@ const OPEN_TIMESERIES_QUERY: &str = "\ JOIN timeseries \ ON met.timeseries = timeseries.id \ WHERE met.param_id IS NOT NULL \ - AND timeseries.totime IS NULL"; - -// find timeseries where totime is before fromtime -const TWISTED_TIMESERIES_QUERY: &str = "\ - SELECT \ - timeseries.id, \ - met.station_id, \ - met.param_id, \ - met.type_id, \ - met.lvl, \ - met.sensor \ - FROM labels.met \ - JOIN timeseries \ - ON met.timeseries = timeseries.id \ - WHERE met.param_id IS NOT NULL \ - AND timeseries.totime < timeseries.fromtime"; + AND (timeseries.totime IS NULL \ + OR timeseries.totime < timeseries.fromtime)"; // Deactivated is information for the database // for a timeseries it is enough that the fromtime is closed @@ -111,62 +97,3 @@ pub async fn set_deactivated( Ok(()) } - -// these were created by the original fetch_deactivated function, since it only set totime -// need to be found and fixed separately -pub async fn untwist_timeseries( - conn: &mut PooledPgConn<'_>, - obs_pgm_fromtime: &HashMap>, - obs_pgm_totime: &HashMap>, - station_fromtime: &HashMap>, - station_totime: &HashMap>, -) -> Result<(), Error> { - let tx = conn.transaction().await?; - - // Explicitly take the lock so we can prevent concurrent access to the rows we are going to update - tx.execute( - "LOCK TABLE public.timeseries IN SHARE ROW EXCLUSIVE MODE", - &[], - ) - .await?; - - let rows = tx.query(TWISTED_TIMESERIES_QUERY, &[]).await?; - - let labels = rows - .iter() - .map(|row| { - MetLabel::new( - row.get(0), - row.get(1), - row.get(2), - row.get(3), - row.get(4), - row.get(5), - ) - }) - .collect(); - - let deactivated = fetch_deactivated( - obs_pgm_fromtime, - obs_pgm_totime, - station_fromtime, - station_totime, - labels, - ) - .await?; - - future::join_all(deactivated.into_iter().map(async |ts| { - match tx - .execute(UPDATE_QUERY, &[&ts.totime, &ts.fromtime, &ts.tsid]) - .await - { - Ok(_) => (), //info!("Tsid {} updated", ts.tsid), - Err(err) => error!("Could not update tsid {}: {}", ts.tsid, err), - } - })) - .await; - - tx.commit().await?; - - Ok(()) -} From a9dfed918ebd137b9af95db7ce50946a3850ba66 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Mon, 10 Nov 2025 11:50:45 +0100 Subject: [PATCH 09/25] invert from/to in the test --- integration_tests/tests/end_to_end.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 140c65d9..a006a0a9 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -162,8 +162,9 @@ async fn test_totime_update() { }, ]; - let fromtime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); - let totime = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(); + // test "untwisting" from / to + let fromtime = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(); + let totime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); let metadata_mock = MetadataMock { station: 10001, From 7caca7ec153f962327457d42ab9c9aea2165f443 Mon Sep 17 00:00:00 2001 From: 
Louise Oram Date: Wed, 12 Nov 2025 22:14:18 +0100 Subject: [PATCH 10/25] close off timeseries that are outside obs_pgm range --- ingestion/src/util/stinfosys.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index aeff21ce..0a3ce307 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -86,11 +86,12 @@ pub async fn fetch_deactivated( // |------obs_pgm------| // |--station--| // use the station from/to so as not to cause "twisting" - // aka a to time before from time - // TODO: Could do something smarter here, this data is maybe mislabeled? + // (twisting = a to time before from time) let fromtime = station_fromtime.get(&label.key.station_id).copied(); - let totime = station_totime.get(&label.key.station_id).copied(); - Ok((label.id, fromtime, totime)) + let _totime = station_totime.get(&label.key.station_id).copied(); + // NOTE: we are choosing to essentially close off this timeseries, since we believe + // it is mislabelled. Obs_pgm is essentially saying it should not exist. + Ok((label.id, fromtime, fromtime)) } else { // station had data and it overlaps in some way with obs_pgm // so we assume we should use obs_pgm deactivation times... From a60a9613241a5e42e81fd9388cb2f2cec81b885b Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 13 Nov 2025 21:48:00 +0100 Subject: [PATCH 11/25] get the from/to for the timeseries from lard, and take that into account along with obspgm and station times --- ingestion/src/cron.rs | 4 +- ingestion/src/util/stinfosys.rs | 89 ++++++++++++++++----------- ingestion/src/util/tsupdate.rs | 43 +++++++++++-- integration_tests/tests/end_to_end.rs | 4 +- 4 files changed, 93 insertions(+), 47 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index 9a2482ac..f8423076 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -43,14 +43,14 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { stinfosys.cache_deactivated_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( - tsupdate::set_deactivated( + tsupdate::set_from_to_obs_pgm( &mut open_conn, &obs_pgm_fromtime, &obs_pgm_totime, &station_fromtime, &station_totime ), - tsupdate::set_deactivated( + tsupdate::set_from_to_obs_pgm( &mut restricted_conn, &obs_pgm_fromtime, &obs_pgm_totime, diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 0a3ce307..f6bcdc2e 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -10,7 +10,7 @@ use util::{MetLabel, MetTimeseriesKey}; use crate::{ util::{ levels::{param_get_level, LevelTable}, - tsupdate::DeactivatedTimeseries, + tsupdate::{TSFromTo, TSupdateTimeseries}, }, Error, }; @@ -69,52 +69,69 @@ impl Stinfosys { } } -pub async fn fetch_deactivated( +pub async fn fetch_from_to_for_update( obs_pgm_fromtime: &HashMap>, obs_pgm_totime: &HashMap>, station_fromtime: &HashMap>, station_totime: &HashMap>, + ts_from_to: HashMap, labels: Vec, -) -> Result, Error> { +) -> Result, Error> { let mut futures = labels .iter() .map(async |label| -> Result<_, Error> { - if station_fromtime.get(&label.key.station_id).is_some() - && obs_pgm_totime.get(&label.key).is_some() - && obs_pgm_totime.get(&label.key) < station_fromtime.get(&label.key.station_id) + if obs_pgm_totime.get(&label.key).is_some() + && ts_from_to.contains_key(&label.id) + && obs_pgm_totime.get(&label.key) + < ts_from_to.get(&label.id).unwrap().fromtime.as_ref() { 
+ // check if the fromtime of the timeseries is before the totime from obspgm // |------obs_pgm------| - // |--station--| - // use the station from/to so as not to cause "twisting" + // |--timeseries--| + // use the timeseries from/to so as not to cause "twisting" // (twisting = a to time before from time) - let fromtime = station_fromtime.get(&label.key.station_id).copied(); - let _totime = station_totime.get(&label.key.station_id).copied(); + let fromtime = ts_from_to.get(&label.id).unwrap().fromtime; + let _totime = ts_from_to.get(&label.id).unwrap().totime; + // NOTE: we are choosing to essentially close off this timeseries, since we believe + // it is mislabelled. Obs_pgm is essentially saying it should not exist. + Ok((label.id, fromtime, fromtime)) + } else if station_totime.get(&label.key.station_id).is_some() + && ts_from_to.contains_key(&label.id) + && station_totime.get(&label.key.station_id) + < ts_from_to.get(&label.id).unwrap().fromtime.as_ref() + { + // check if the fromtime of the timeseries is before the totime from the station table + // |------station------| + // |--timeseries--| + // use the timeseries from/to so as not to cause "twisting" + // (twisting = a to time before from time) + let fromtime = ts_from_to.get(&label.id).unwrap().fromtime; + let _totime = ts_from_to.get(&label.id).unwrap().totime; // NOTE: we are choosing to essentially close off this timeseries, since we believe // it is mislabelled. Obs_pgm is essentially saying it should not exist. Ok((label.id, fromtime, fromtime)) } else { - // station had data and it overlaps in some way with obs_pgm - // so we assume we should use obs_pgm deactivation times... - // |------obs_pgm------| - // |--station--| + // station had data and it overlaps in some way with obs_pgm or the station table + // so we assume we should use obs_pgm or station deactivation times... 
+ // |------obs_pgm / station------| + // |--timeseries--| let mut fromtime = obs_pgm_fromtime .get(&label.key) .or(station_fromtime.get(&label.key.station_id)) .copied(); // check which is less "permissive" for fromtime - if obs_pgm_fromtime.get(&label.key).is_some() - && station_fromtime.get(&label.key.station_id).is_some() + if obs_pgm_fromtime.get(&label.key).is_some() && ts_from_to.contains_key(&label.id) { // choose the later time - if let (Some(pgm_time), Some(station_time)) = ( + if let (Some(pgm_time), Some(ts_time)) = ( obs_pgm_fromtime.get(&label.key), - station_fromtime.get(&label.key.station_id), + ts_from_to.get(&label.id).unwrap().fromtime, ) { - // |----obs_pgm----| - // |--station--| - if station_time > pgm_time { - // use station time - fromtime = Some(*station_time); + // |-----obs_pgm-----| + // |---timeseries---| + if ts_time > *pgm_time { + // use timeseries time + fromtime = Some(ts_time); } } } @@ -123,19 +140,17 @@ pub async fn fetch_deactivated( .or(station_totime.get(&label.key.station_id)) .copied(); // check which is less "permissive" for totime - if obs_pgm_totime.get(&label.key).is_some() - && station_totime.get(&label.key.station_id).is_some() - { + if obs_pgm_totime.get(&label.key).is_some() && ts_from_to.contains_key(&label.id) { // choose the earlier time - if let (Some(pgm_time), Some(station_time)) = ( + if let (Some(pgm_time), Some(ts_time)) = ( obs_pgm_totime.get(&label.key), - station_totime.get(&label.key.station_id), + ts_from_to.get(&label.id).unwrap().totime, ) { - // |----obs_pgm----| - // |--station--| - if station_time < pgm_time { - // use station time - totime = Some(*station_time); + // |------obs_pgm------| + // |--timeseries--| + if ts_time < *pgm_time { + // use timeseries time + totime = Some(ts_time); } } } @@ -144,10 +159,10 @@ pub async fn fetch_deactivated( }) .collect::>(); - let mut deactivated = vec![]; + let mut ts_update = vec![]; while let Some(res) = futures.next().await { let ts = match res? { - (tsid, Some(fromtime), Some(totime)) => DeactivatedTimeseries { + (tsid, Some(fromtime), Some(totime)) => TSupdateTimeseries { tsid, fromtime, totime, @@ -156,10 +171,10 @@ pub async fn fetch_deactivated( _ => continue, }; - deactivated.push(ts); + ts_update.push(ts); } - Ok(deactivated) + Ok(ts_update) } async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result { diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 3e0e77eb..c2f8c6d9 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -5,9 +5,11 @@ use tracing::error; use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; -use crate::{util::stinfosys::fetch_deactivated, Error}; +use crate::{util::stinfosys::fetch_from_to_for_update, Error}; // TODO: remove the WHERE when we remove/prevent NULL param IDs in the table +// TODO: actually get the from/to of the underlying data with a different call? +// unsure if these from/to times are correct / up to date... 
const OPEN_TIMESERIES_QUERY: &str = "\ SELECT \ timeseries.id, \ @@ -15,7 +17,9 @@ const OPEN_TIMESERIES_QUERY: &str = "\ met.param_id, \ met.type_id, \ met.lvl, \ - met.sensor \ + met.sensor, \ + timeseries.fromtime, \ + timeseries.totime \ FROM labels.met \ JOIN timeseries \ ON met.timeseries = timeseries.id \ @@ -32,7 +36,7 @@ const UPDATE_QUERY: &str = "\ deactivated = false \ WHERE id = $3"; -pub struct DeactivatedTimeseries { +pub struct TSupdateTimeseries { /// Timeseries to be updated pub tsid: i64, /// Fromtime value found in the metadata source @@ -41,7 +45,28 @@ pub struct DeactivatedTimeseries { pub totime: DateTime, } -pub async fn set_deactivated( +impl TSupdateTimeseries { + pub fn new(tsid: i64, fromtime: DateTime, totime: DateTime) -> TSupdateTimeseries { + TSupdateTimeseries { + tsid, + fromtime, + totime, + } + } +} + +pub struct TSFromTo { + pub fromtime: Option>, + pub totime: Option>, +} + +impl TSFromTo { + pub fn new(fromtime: Option>, totime: Option>) -> TSFromTo { + TSFromTo { fromtime, totime } + } +} + +pub async fn set_from_to_obs_pgm( conn: &mut PooledPgConn<'_>, obs_pgm_fromtime: &HashMap>, obs_pgm_totime: &HashMap>, @@ -59,7 +84,7 @@ pub async fn set_deactivated( let rows = tx.query(OPEN_TIMESERIES_QUERY, &[]).await?; - let labels = rows + let labels: Vec = rows .iter() .map(|row| { MetLabel::new( @@ -73,11 +98,17 @@ pub async fn set_deactivated( }) .collect(); - let deactivated = fetch_deactivated( + let mut ts_from_to: HashMap = HashMap::new(); + rows.iter().for_each(|row| { + ts_from_to.insert(row.get(0), TSFromTo::new(row.get(6), row.get(7))); + }); + + let deactivated = fetch_from_to_for_update( obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime, + ts_from_to, labels, ) .await?; diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index a006a0a9..682688ca 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -5,7 +5,7 @@ use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp}; use tokio_postgres::NoTls; use lard_egress::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp}; -use lard_ingestion::{util::tsupdate::set_deactivated, KldataResp}; +use lard_ingestion::{util::tsupdate::set_from_to_obs_pgm, KldataResp}; pub mod common; use common::{e2e_test_wrapper, mocks::MetadataMock, Param, TestData}; @@ -196,7 +196,7 @@ async fn test_totime_update() { let (station_fromtime, station_totime, obs_pgm_fromtime, obs_pgm_totime) = metadata_mock.cache_deactivated_stinfosys().await.unwrap(); - set_deactivated( + set_from_to_obs_pgm( &mut conn, &obs_pgm_fromtime, &obs_pgm_totime, From 601f642ac522c926d4592cfcd1253df557e805d9 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Thu, 13 Nov 2025 22:24:07 +0100 Subject: [PATCH 12/25] refactor the if/else logic for deciding which from/to to use --- ingestion/src/util/stinfosys.rs | 88 ++++++++++++++++----------------- 1 file changed, 42 insertions(+), 46 deletions(-) diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index f6bcdc2e..b4f78d3a 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -80,29 +80,32 @@ pub async fn fetch_from_to_for_update( let mut futures = labels .iter() .map(async |label| -> Result<_, Error> { - if obs_pgm_totime.get(&label.key).is_some() + // use obs_pgm if exists, or else station if exists, or else will be none + let mut fromtime = obs_pgm_fromtime + .get(&label.key) + 
.or(station_fromtime.get(&label.key.station_id)) + .copied(); + + let mut totime = obs_pgm_totime + .get(&label.key) + .or(station_totime.get(&label.key.station_id)) + .copied(); + + // check if the fromtime of the timeseries is before the totime from obspgm + // |------obs_pgm------| + // |--timeseries--| + // or if the totime of the timeseries is before the fromtime from obspgm + // |------obs_pgm------| + // |--timeseries--| + if (totime.is_some() && ts_from_to.contains_key(&label.id) - && obs_pgm_totime.get(&label.key) - < ts_from_to.get(&label.id).unwrap().fromtime.as_ref() + && ts_from_to.get(&label.id).unwrap().fromtime.is_some() + && totime.unwrap() < ts_from_to.get(&label.id).unwrap().fromtime.unwrap()) + || (fromtime.is_some() + && ts_from_to.contains_key(&label.id) + && ts_from_to.get(&label.id).unwrap().totime.is_some() + && fromtime.unwrap() > ts_from_to.get(&label.id).unwrap().totime.unwrap()) { - // check if the fromtime of the timeseries is before the totime from obspgm - // |------obs_pgm------| - // |--timeseries--| - // use the timeseries from/to so as not to cause "twisting" - // (twisting = a to time before from time) - let fromtime = ts_from_to.get(&label.id).unwrap().fromtime; - let _totime = ts_from_to.get(&label.id).unwrap().totime; - // NOTE: we are choosing to essentially close off this timeseries, since we believe - // it is mislabelled. Obs_pgm is essentially saying it should not exist. - Ok((label.id, fromtime, fromtime)) - } else if station_totime.get(&label.key.station_id).is_some() - && ts_from_to.contains_key(&label.id) - && station_totime.get(&label.key.station_id) - < ts_from_to.get(&label.id).unwrap().fromtime.as_ref() - { - // check if the fromtime of the timeseries is before the totime from the station table - // |------station------| - // |--timeseries--| // use the timeseries from/to so as not to cause "twisting" // (twisting = a to time before from time) let fromtime = ts_from_to.get(&label.id).unwrap().fromtime; @@ -114,46 +117,39 @@ pub async fn fetch_from_to_for_update( // station had data and it overlaps in some way with obs_pgm or the station table // so we assume we should use obs_pgm or station deactivation times... 
// |------obs_pgm / station------| - // |--timeseries--| - let mut fromtime = obs_pgm_fromtime - .get(&label.key) - .or(station_fromtime.get(&label.key.station_id)) - .copied(); + // |---timeseries---| // check which is less "permissive" for fromtime - if obs_pgm_fromtime.get(&label.key).is_some() && ts_from_to.contains_key(&label.id) - { + if fromtime.is_some() && ts_from_to.contains_key(&label.id) { // choose the later time - if let (Some(pgm_time), Some(ts_time)) = ( - obs_pgm_fromtime.get(&label.key), - ts_from_to.get(&label.id).unwrap().fromtime, - ) { - // |-----obs_pgm-----| - // |---timeseries---| - if ts_time > *pgm_time { + if let (Some(meta_time), Some(ts_time)) = + (fromtime, ts_from_to.get(&label.id).unwrap().fromtime) + { + // |------obs_pgm------| + // |---timeseries---| + if ts_time > meta_time { // use timeseries time fromtime = Some(ts_time); } } } - let mut totime = obs_pgm_totime - .get(&label.key) - .or(station_totime.get(&label.key.station_id)) - .copied(); // check which is less "permissive" for totime - if obs_pgm_totime.get(&label.key).is_some() && ts_from_to.contains_key(&label.id) { + if totime.is_some() && ts_from_to.contains_key(&label.id) { // choose the earlier time - if let (Some(pgm_time), Some(ts_time)) = ( - obs_pgm_totime.get(&label.key), - ts_from_to.get(&label.id).unwrap().totime, - ) { + if let (Some(meta_time), Some(ts_time)) = + (totime, ts_from_to.get(&label.id).unwrap().totime) + { // |------obs_pgm------| - // |--timeseries--| - if ts_time < *pgm_time { + // |---timeseries---| + if ts_time < meta_time { // use timeseries time totime = Some(ts_time); } } } + // lastly: override the fromtime to be the station's if nothing existed in the metadata + if fromtime.is_none() && ts_from_to.contains_key(&label.id) { + fromtime = ts_from_to.get(&label.id).unwrap().totime + } Ok((label.id, fromtime, totime)) } }) From 4495abc5b6541838509fbcb73e29c95ccec98f8a Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Fri, 14 Nov 2025 09:22:22 +0100 Subject: [PATCH 13/25] add a note about sql for getting ts from lard --- ingestion/src/util/tsupdate.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index c2f8c6d9..327319d5 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -8,8 +8,9 @@ use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; use crate::{util::stinfosys::fetch_from_to_for_update, Error}; // TODO: remove the WHERE when we remove/prevent NULL param IDs in the table -// TODO: actually get the from/to of the underlying data with a different call? -// unsure if these from/to times are correct / up to date... +// NOTE: In addition to finding open timeseries, we also find the timeseries +// where somehow the fromtime is before the to time. This is because of an +// earlier bug, but could happen for other reasons. const OPEN_TIMESERIES_QUERY: &str = "\ SELECT \ timeseries.id, \ @@ -26,6 +27,8 @@ const OPEN_TIMESERIES_QUERY: &str = "\ WHERE met.param_id IS NOT NULL \ AND (timeseries.totime IS NULL \ OR timeseries.totime < timeseries.fromtime)"; +// TODO: should we also get the from/to from the underlying data? +// this would be an intensive call and maybe should not be done often? 
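// A rough sketch of the data-range lookup floated in the TODO above, assuming the
// observations live in a `public.data` table with `timeseries` and `obstime`
// columns (both names are assumptions here); the whole-table aggregation is what
// would make this an expensive call to run often:
const DATA_TIMERANGE_QUERY: &str = "\
    SELECT timeseries, MIN(obstime), MAX(obstime) \
    FROM public.data \
    GROUP BY timeseries";

async fn fetch_data_timeranges(
    conn: &mut PooledPgConn<'_>,
) -> Result<HashMap<i64, TSFromTo>, Error> {
    let rows = conn.query(DATA_TIMERANGE_QUERY, &[]).await?;
    Ok(rows
        .iter()
        .map(|row| (row.get(0), TSFromTo::new(row.get(1), row.get(2))))
        .collect())
}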
// Deactivated is information for the database // for a timeseries it is enough that the fromtime is closed From a80b1a7976eeeb31b3ed798a9650d0000dc9b877 Mon Sep 17 00:00:00 2001 From: Louise Oram Date: Sun, 16 Nov 2025 22:13:29 +0100 Subject: [PATCH 14/25] use opentimerange from patchwork --- Cargo.lock | 1 + egress/src/patchwork.rs | 2 +- ingestion/Cargo.toml | 1 + ingestion/src/util/stinfosys.rs | 43 ++++++++++++++------------- ingestion/src/util/tsupdate.rs | 19 +++--------- integration_tests/tests/end_to_end.rs | 3 +- 6 files changed, 31 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f787313..6c06f27c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1868,6 +1868,7 @@ dependencies = [ "chronoutil", "csv", "futures", + "lard_egress", "metrics", "metrics-exporter-prometheus", "quick-xml", diff --git a/egress/src/patchwork.rs b/egress/src/patchwork.rs index 6fb58d66..f429413f 100644 --- a/egress/src/patchwork.rs +++ b/egress/src/patchwork.rs @@ -189,7 +189,7 @@ impl OpenTimerange { /// Used to cut the priorities to cover ranges that actually matter to a particular timeseries /// Takes the from and to times of the timeseries as well as the from and to of the priority range /// Returns an option, since it could be they do not overlapp at all (and thus it returns empty) - fn overlap(&self, other: Self) -> Option { + pub fn overlap(&self, other: Self) -> Option { let fromtime = match (self.from, other.from) { (Some(lhs), Some(rhs)) => Some(lhs.max(rhs)), // return the later one (Some(lhs), None) => Some(lhs), diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 2bc77a30..8b384960 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -17,6 +17,7 @@ bytes.workspace = true chrono.workspace = true chronoutil.workspace = true util = { path = "../util" } +lard_egress = { path = "../egress" } csv.workspace = true futures.workspace = true metrics.workspace = true diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index b4f78d3a..3836271b 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -5,15 +5,15 @@ use futures::{stream::FuturesUnordered, StreamExt}; use tokio_postgres::{Client, NoTls}; use tracing::error; -use util::{MetLabel, MetTimeseriesKey}; - use crate::{ util::{ levels::{param_get_level, LevelTable}, - tsupdate::{TSFromTo, TSupdateTimeseries}, + tsupdate::TSupdateTimeseries, }, Error, }; +use lard_egress::patchwork::OpenTimerange; +use util::{MetLabel, MetTimeseriesKey}; pub struct Stinfosys { conn_string: String, @@ -74,7 +74,7 @@ pub async fn fetch_from_to_for_update( obs_pgm_totime: &HashMap>, station_fromtime: &HashMap>, station_totime: &HashMap>, - ts_from_to: HashMap, + ts_from_to: HashMap, labels: Vec, ) -> Result, Error> { let mut futures = labels @@ -91,25 +91,32 @@ pub async fn fetch_from_to_for_update( .or(station_totime.get(&label.key.station_id)) .copied(); + // no metadata, keep the ts from/to + if fromtime.is_none() && totime.is_none() && ts_from_to.contains_key(&label.id) { + let fromtime = ts_from_to.get(&label.id).unwrap().from; + let totime = ts_from_to.get(&label.id).unwrap().to; + Ok((label.id, fromtime, totime)) + } // check if the fromtime of the timeseries is before the totime from obspgm // |------obs_pgm------| // |--timeseries--| // or if the totime of the timeseries is before the fromtime from obspgm // |------obs_pgm------| // |--timeseries--| - if (totime.is_some() - && ts_from_to.contains_key(&label.id) - && 
ts_from_to.get(&label.id).unwrap().fromtime.is_some() - && totime.unwrap() < ts_from_to.get(&label.id).unwrap().fromtime.unwrap()) - || (fromtime.is_some() - && ts_from_to.contains_key(&label.id) - && ts_from_to.get(&label.id).unwrap().totime.is_some() - && fromtime.unwrap() > ts_from_to.get(&label.id).unwrap().totime.unwrap()) + else if ts_from_to.contains_key(&label.id) + && ts_from_to + .get(&label.id) + .unwrap() + .overlap(OpenTimerange { + from: fromtime, + to: totime, + }) + .is_none() { // use the timeseries from/to so as not to cause "twisting" // (twisting = a to time before from time) - let fromtime = ts_from_to.get(&label.id).unwrap().fromtime; - let _totime = ts_from_to.get(&label.id).unwrap().totime; + let fromtime = ts_from_to.get(&label.id).unwrap().from; + let _totime = ts_from_to.get(&label.id).unwrap().to; // NOTE: we are choosing to essentially close off this timeseries, since we believe // it is mislabelled. Obs_pgm is essentially saying it should not exist. Ok((label.id, fromtime, fromtime)) @@ -122,7 +129,7 @@ pub async fn fetch_from_to_for_update( if fromtime.is_some() && ts_from_to.contains_key(&label.id) { // choose the later time if let (Some(meta_time), Some(ts_time)) = - (fromtime, ts_from_to.get(&label.id).unwrap().fromtime) + (fromtime, ts_from_to.get(&label.id).unwrap().from) { // |------obs_pgm------| // |---timeseries---| @@ -136,7 +143,7 @@ pub async fn fetch_from_to_for_update( if totime.is_some() && ts_from_to.contains_key(&label.id) { // choose the earlier time if let (Some(meta_time), Some(ts_time)) = - (totime, ts_from_to.get(&label.id).unwrap().totime) + (totime, ts_from_to.get(&label.id).unwrap().to) { // |------obs_pgm------| // |---timeseries---| @@ -146,10 +153,6 @@ pub async fn fetch_from_to_for_update( } } } - // lastly: override the fromtime to be the station's if nothing existed in the metadata - if fromtime.is_none() && ts_from_to.contains_key(&label.id) { - fromtime = ts_from_to.get(&label.id).unwrap().totime - } Ok((label.id, fromtime, totime)) } }) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 327319d5..e706590b 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -3,9 +3,9 @@ use futures::future; use std::collections::HashMap; use tracing::error; -use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; - use crate::{util::stinfosys::fetch_from_to_for_update, Error}; +use lard_egress::patchwork::OpenTimerange; +use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; // TODO: remove the WHERE when we remove/prevent NULL param IDs in the table // NOTE: In addition to finding open timeseries, we also find the timeseries @@ -58,17 +58,6 @@ impl TSupdateTimeseries { } } -pub struct TSFromTo { - pub fromtime: Option>, - pub totime: Option>, -} - -impl TSFromTo { - pub fn new(fromtime: Option>, totime: Option>) -> TSFromTo { - TSFromTo { fromtime, totime } - } -} - pub async fn set_from_to_obs_pgm( conn: &mut PooledPgConn<'_>, obs_pgm_fromtime: &HashMap>, @@ -101,9 +90,9 @@ pub async fn set_from_to_obs_pgm( }) .collect(); - let mut ts_from_to: HashMap = HashMap::new(); + let mut ts_from_to: HashMap = HashMap::new(); rows.iter().for_each(|row| { - ts_from_to.insert(row.get(0), TSFromTo::new(row.get(6), row.get(7))); + ts_from_to.insert(row.get(0), OpenTimerange::new(row.get(6), row.get(7))); }); let deactivated = fetch_from_to_for_update( diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 682688ca..3ba11a0e 100644 --- 
a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -162,8 +162,7 @@ async fn test_totime_update() { }, ]; - // test "untwisting" from / to - let fromtime = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(); + let fromtime = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap(); let totime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); let metadata_mock = MetadataMock { From 56b679baa61de726d40d79c6f7faf8ba6df6dcd1 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Wed, 19 Nov 2025 21:36:04 +0100 Subject: [PATCH 15/25] simplify code for choosing from/to --- ingestion/src/util/stinfosys.rs | 116 ++++++++++++++------------------ 1 file changed, 50 insertions(+), 66 deletions(-) diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 3836271b..3b24b620 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -80,31 +80,25 @@ pub async fn fetch_from_to_for_update( let mut futures = labels .iter() .map(async |label| -> Result<_, Error> { - // use obs_pgm if exists, or else station if exists, or else will be none - let mut fromtime = obs_pgm_fromtime - .get(&label.key) - .or(station_fromtime.get(&label.key.station_id)) - .copied(); - - let mut totime = obs_pgm_totime - .get(&label.key) - .or(station_totime.get(&label.key.station_id)) - .copied(); - - // no metadata, keep the ts from/to - if fromtime.is_none() && totime.is_none() && ts_from_to.contains_key(&label.id) { - let fromtime = ts_from_to.get(&label.id).unwrap().from; - let totime = ts_from_to.get(&label.id).unwrap().to; - Ok((label.id, fromtime, totime)) - } - // check if the fromtime of the timeseries is before the totime from obspgm - // |------obs_pgm------| - // |--timeseries--| - // or if the totime of the timeseries is before the fromtime from obspgm - // |------obs_pgm------| - // |--timeseries--| - else if ts_from_to.contains_key(&label.id) - && ts_from_to + // check we have this key for the TS + if ts_from_to.contains_key(&label.id) { + // use obs_pgm if exists, or else station if exists, or else will be none + let fromtime = obs_pgm_fromtime + .get(&label.key) + .or(station_fromtime.get(&label.key.station_id)) + .copied(); + + let totime = obs_pgm_totime + .get(&label.key) + .or(station_totime.get(&label.key.station_id)) + .copied(); + + if fromtime.is_none() && totime.is_none() { + // no metadata, keep the ts from/to + let fromtime = ts_from_to.get(&label.id).unwrap().from; + let totime = ts_from_to.get(&label.id).unwrap().to; + Ok((label.id, fromtime, totime)) + } else if ts_from_to .get(&label.id) .unwrap() .overlap(OpenTimerange { @@ -112,48 +106,38 @@ pub async fn fetch_from_to_for_update( to: totime, }) .is_none() - { - // use the timeseries from/to so as not to cause "twisting" - // (twisting = a to time before from time) - let fromtime = ts_from_to.get(&label.id).unwrap().from; - let _totime = ts_from_to.get(&label.id).unwrap().to; - // NOTE: we are choosing to essentially close off this timeseries, since we believe - // it is mislabelled. Obs_pgm is essentially saying it should not exist. - Ok((label.id, fromtime, fromtime)) - } else { - // station had data and it overlaps in some way with obs_pgm or the station table - // so we assume we should use obs_pgm or station deactivation times... 
- // |------obs_pgm / station------| - // |---timeseries---| - // check which is less "permissive" for fromtime - if fromtime.is_some() && ts_from_to.contains_key(&label.id) { - // choose the later time - if let (Some(meta_time), Some(ts_time)) = - (fromtime, ts_from_to.get(&label.id).unwrap().from) - { - // |------obs_pgm------| - // |---timeseries---| - if ts_time > meta_time { - // use timeseries time - fromtime = Some(ts_time); - } - } + { + // check if the fromtime of the timeseries is before the totime from obspgm + // |------obs_pgm------| + // |--timeseries--| + // or if the totime of the timeseries is before the fromtime from obspgm + // |------obs_pgm------| + // |--timeseries--| + // use the timeseries from/to so as not to cause "twisting" + // (twisting = a to time before from time) + let fromtime = ts_from_to.get(&label.id).unwrap().from; + let _totime = ts_from_to.get(&label.id).unwrap().to; + // NOTE: we are choosing to essentially close off this timeseries, since we believe + // it is mislabelled. Obs_pgm is essentially saying it should not exist. + Ok((label.id, fromtime, fromtime)) + } else { + // station had data and it overlaps in some way with obs_pgm or the station table + // so we assume we should use the overlapp between the TS from/to and the + // obs_pgm or station deactivation times... + // |------obs_pgm / station------| + // |---timeseries---| + let overlapp = ts_from_to + .get(&label.id) + .unwrap() + .overlap(OpenTimerange { + from: fromtime, + to: totime, + }) + .unwrap(); + Ok((label.id, overlapp.from, overlapp.to)) } - // check which is less "permissive" for totime - if totime.is_some() && ts_from_to.contains_key(&label.id) { - // choose the earlier time - if let (Some(meta_time), Some(ts_time)) = - (totime, ts_from_to.get(&label.id).unwrap().to) - { - // |------obs_pgm------| - // |---timeseries---| - if ts_time < meta_time { - // use timeseries time - totime = Some(ts_time); - } - } - } - Ok((label.id, fromtime, totime)) + } else { + Ok((label.id, None, None)) // would this ever occur? TODO: log? 
} }) .collect::>(); From 6a39f4382862f7e2bae9c7140d19c03e52fe94d8 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Thu, 20 Nov 2025 09:38:14 +0100 Subject: [PATCH 16/25] get from/to for obs_pgm and station in one call each --- ingestion/src/cron.rs | 19 +-- ingestion/src/util/stinfosys.rs | 155 ++++++++---------------- ingestion/src/util/tsupdate.rs | 17 +-- integration_tests/tests/common/mocks.rs | 33 +++-- integration_tests/tests/end_to_end.rs | 14 +-- 5 files changed, 78 insertions(+), 160 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index f8423076..d16004c2 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -39,24 +39,11 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { let mut open_conn = pools.open.get().await.unwrap(); let mut restricted_conn = pools.restricted.get().await.unwrap(); - let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) = - stinfosys.cache_deactivated_stinfosys().await.unwrap(); + let (obs_pgm_times, station_times) = stinfosys.cache_deactivated_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( - tsupdate::set_from_to_obs_pgm( - &mut open_conn, - &obs_pgm_fromtime, - &obs_pgm_totime, - &station_fromtime, - &station_totime - ), - tsupdate::set_from_to_obs_pgm( - &mut restricted_conn, - &obs_pgm_fromtime, - &obs_pgm_totime, - &station_fromtime, - &station_totime - ), + tsupdate::set_from_to_obs_pgm(&mut open_conn, &obs_pgm_times, &station_times), + tsupdate::set_from_to_obs_pgm(&mut restricted_conn, &obs_pgm_times, &station_times), ); if let Err(err) = open_res { diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 3b24b620..9d2221d0 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::NaiveDateTime; use futures::{stream::FuturesUnordered, StreamExt}; use tokio_postgres::{Client, NoTls}; use tracing::error; @@ -20,10 +20,8 @@ pub struct Stinfosys { levels: LevelTable, } -type StationTotimeMap = HashMap>; -type StationFromtimeMap = HashMap>; -type ObsPgmTotimeMap = HashMap>; -type ObsPgmFromtimeMap = HashMap>; +type StationFromTotimeMap = HashMap; +type ObsPgmFromTotimeMap = HashMap; impl Stinfosys { pub fn new(conn_string: String, levels: LevelTable) -> Self { @@ -37,10 +35,8 @@ impl Stinfosys { &self, ) -> Result< ( - HashMap>, - HashMap>, - HashMap>, - HashMap>, + HashMap, + HashMap, ), Error, > { @@ -53,27 +49,18 @@ impl Stinfosys { }); // Fetch all deactivated timeseries in Stinfosys - let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) = tokio::try_join!( - fetch_obs_pgm_fromtime(self.levels.clone(), &client), - fetch_obs_pgm_totime(self.levels.clone(), &client), - fetch_station_fromtime(&client), - fetch_station_totime(&client), + let (obs_pgm_times, station_times) = tokio::try_join!( + fetch_obs_pgm_times(self.levels.clone(), &client), + fetch_station_times(&client), )?; - Ok(( - obs_pgm_fromtime, - obs_pgm_totime, - station_fromtime, - station_totime, - )) + Ok((obs_pgm_times, station_times)) } } pub async fn fetch_from_to_for_update( - obs_pgm_fromtime: &HashMap>, - obs_pgm_totime: &HashMap>, - station_fromtime: &HashMap>, - station_totime: &HashMap>, + obs_pgm_times: &HashMap, + station_times: &HashMap, ts_from_to: HashMap, labels: Vec, ) -> Result, Error> { @@ -83,15 +70,21 @@ pub async fn fetch_from_to_for_update( // check we have this key for the 
TS if ts_from_to.contains_key(&label.id) { // use obs_pgm if exists, or else station if exists, or else will be none - let fromtime = obs_pgm_fromtime - .get(&label.key) - .or(station_fromtime.get(&label.key.station_id)) - .copied(); - - let totime = obs_pgm_totime - .get(&label.key) - .or(station_totime.get(&label.key.station_id)) - .copied(); + let fromtime = match obs_pgm_times.get(&label.key) { + Some(pgm_fromto) => pgm_fromto.from, + None => match station_times.get(&label.key.station_id) { + Some(station_fromto) => station_fromto.from, + None => None, + }, + }; + + let totime = match obs_pgm_times.get(&label.key) { + Some(pgm_fromto) => pgm_fromto.to, + None => match station_times.get(&label.key.station_id) { + Some(station_fromto) => station_fromto.to, + None => None, + }, + }; if fromtime.is_none() && totime.is_none() { // no metadata, keep the ts from/to @@ -160,7 +153,10 @@ pub async fn fetch_from_to_for_update( Ok(ts_update) } -async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result { +async fn fetch_obs_pgm_times( + levels: LevelTable, + conn: &Client, +) -> Result { // The funny looking ARRAY_AGG is needed because each timeseries can have multiple from/to times. // Most likely related to the fact that stations in the `station` tables can also have // multiple entries, see [fetch_station_totime] @@ -174,6 +170,7 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result Result Result { - const OBS_PGM_QUERY: &str = "\ - SELECT \ - stationid, \ - paramid, \ - hlevel, \ - nsensor, \ - priority_messageid, \ - (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \ - FROM obs_pgm \ - GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \ - HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL"; - - let rows = conn.query(OBS_PGM_QUERY, &[]).await?; - - let mut map = ObsPgmFromtimeMap::new(); + let mut map = ObsPgmFromTotimeMap::new(); for row in rows { let param_id: i32 = row.get(1); @@ -236,14 +193,21 @@ async fn fetch_obs_pgm_fromtime( type_id: row.get(4), }; - let totime: NaiveDateTime = row.get(5); - map.insert(key, totime.and_utc()); + let fromtime: NaiveDateTime = row.get(5); + let totime: NaiveDateTime = row.get(6); + map.insert( + key, + OpenTimerange { + from: Some(fromtime.and_utc()), + to: Some(totime.and_utc()), + }, + ); } Ok(map) } -async fn fetch_station_totime(conn: &Client) -> Result { +async fn fetch_station_times(conn: &Client) -> Result { // The funny looking ARRAY_AGG is needed because each station can have multiple from/to times. // For example, the timeseries might have been "reset" after a change of the station position, // even though the station ID did not change. 
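As a quick illustration of the overlap check used above: the sketch below exercises OpenTimerange::overlap with made-up dates, assuming the type and method from lard_egress::patchwork as they stand at this point in the series (it is a minimal, illustrative sketch, not part of the patch itself).

use chrono::{TimeZone, Utc};
use lard_egress::patchwork::OpenTimerange;

fn main() {
    // Metadata range (obs_pgm or station) and the timeseries' own range
    let obs_pgm = OpenTimerange {
        from: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
        to: Some(Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap()),
    };
    let timeseries = OpenTimerange {
        from: Some(Utc.with_ymd_and_hms(1980, 6, 1, 0, 0, 0).unwrap()),
        to: None, // still open in the database
    };

    // Overlapping case: the clipped range keeps the later fromtime and the
    // earlier totime, which is what gets written back to the timeseries
    let clipped = timeseries.overlap(obs_pgm).unwrap();
    assert_eq!(clipped.from, timeseries.from);
    assert_eq!(clipped.to, obs_pgm.to);

    // Disjoint case: overlap() returns None, so the caller falls back to
    // (from, from), closing the timeseries rather than "twisting" it
    // (ending up with a totime earlier than its fromtime)
    let later_pgm = OpenTimerange {
        from: Some(Utc.with_ymd_and_hms(1990, 1, 1, 0, 0, 0).unwrap()),
        to: Some(Utc.with_ymd_and_hms(1991, 1, 1, 0, 0, 0).unwrap()),
    };
    let closed_ts = OpenTimerange {
        from: Some(Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap()),
        to: Some(Utc.with_ymd_and_hms(1985, 1, 1, 0, 0, 0).unwrap()),
    };
    assert!(closed_ts.overlap(later_pgm).is_none());
}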
@@ -253,6 +217,7 @@ async fn fetch_station_totime(conn: &Client) -> Result const STATION_QUERY: &str = "\ SELECT \ stationid, \ + MIN(fromtime), \ (ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \ FROM station \ GROUP BY stationid \ @@ -263,30 +228,16 @@ async fn fetch_station_totime(conn: &Client) -> Result Ok(rows .iter() .map(|row| { - let totime: NaiveDateTime = row.get(1); - - (row.get(0), totime.and_utc()) - }) - .collect()) -} - -async fn fetch_station_fromtime(conn: &Client) -> Result { - const STATION_QUERY: &str = "\ - SELECT \ - stationid, \ - (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \ - FROM station \ - GROUP BY stationid \ - HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL"; - - let rows = conn.query(STATION_QUERY, &[]).await?; - - Ok(rows - .iter() - .map(|row| { - let totime: NaiveDateTime = row.get(1); - - (row.get(0), totime.and_utc()) + let fromtime: NaiveDateTime = row.get(1); + let totime: NaiveDateTime = row.get(2); + + ( + row.get(0), + OpenTimerange { + from: Some(fromtime.and_utc()), + to: Some(totime.and_utc()), + }, + ) }) .collect()) } diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index e706590b..e206969f 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -60,10 +60,8 @@ impl TSupdateTimeseries { pub async fn set_from_to_obs_pgm( conn: &mut PooledPgConn<'_>, - obs_pgm_fromtime: &HashMap>, - obs_pgm_totime: &HashMap>, - station_fromtime: &HashMap>, - station_totime: &HashMap>, + obs_pgm_times: &HashMap, + station_times: &HashMap, ) -> Result<(), Error> { let tx = conn.transaction().await?; @@ -95,15 +93,8 @@ pub async fn set_from_to_obs_pgm( ts_from_to.insert(row.get(0), OpenTimerange::new(row.get(6), row.get(7))); }); - let deactivated = fetch_from_to_for_update( - obs_pgm_fromtime, - obs_pgm_totime, - station_fromtime, - station_totime, - ts_from_to, - labels, - ) - .await?; + let deactivated = + fetch_from_to_for_update(obs_pgm_times, station_times, ts_from_to, labels).await?; future::join_all(deactivated.into_iter().map(async |ts| { match tx diff --git a/integration_tests/tests/common/mocks.rs b/integration_tests/tests/common/mocks.rs index fbbfb0aa..b38b5d52 100644 --- a/integration_tests/tests/common/mocks.rs +++ b/integration_tests/tests/common/mocks.rs @@ -25,28 +25,23 @@ impl MetadataMock { &self, ) -> Result< ( - HashMap>, - HashMap>, - HashMap>, - HashMap>, + HashMap, + HashMap, ), lard_ingestion::Error, > { - let mut station_totime = HashMap::new(); - station_totime.insert(self.station, self.totime); - - let mut station_fromtime = HashMap::new(); - station_fromtime.insert(self.station, self.fromtime); - - let obs_pgm_totime: HashMap> = HashMap::new(); - let obs_pgm_fromtime: HashMap> = HashMap::new(); - - Ok(( - station_fromtime, - station_totime, - obs_pgm_fromtime, - obs_pgm_totime, - )) + let mut station_times = HashMap::new(); + station_times.insert( + self.station, + OpenTimerange { + from: Some(self.fromtime), + to: Some(self.totime), + }, + ); + + let obs_pgm_times: HashMap = HashMap::new(); + + Ok((obs_pgm_times, station_times)) } } diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 3ba11a0e..b966ae7e 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -192,18 +192,12 @@ async fn test_totime_update() { assert_eq!(totime, None); } - let (station_fromtime, station_totime, obs_pgm_fromtime, obs_pgm_totime) = + let (obs_pgm_times, station_times) = 
metadata_mock.cache_deactivated_stinfosys().await.unwrap(); - set_from_to_obs_pgm( - &mut conn, - &obs_pgm_fromtime, - &obs_pgm_totime, - &station_fromtime, - &station_totime, - ) - .await - .unwrap(); + set_from_to_obs_pgm(&mut conn, &obs_pgm_times, &station_times) + .await + .unwrap(); let after = get_totime(&conn).await; From a16bf60bd5e358e5f2cd162241712a0f5d20c6b2 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Fri, 21 Nov 2025 12:56:15 +0100 Subject: [PATCH 17/25] better naming --- ingestion/src/cron.rs | 8 ++++---- ingestion/src/main.rs | 2 +- ingestion/src/util/stinfosys.rs | 16 ++++++++-------- integration_tests/tests/end_to_end.rs | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index d16004c2..a9309977 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -32,18 +32,18 @@ pub async fn refresh_levels((stinfo_conn_string, level_table): &(String, LevelTa *tables = new_level_table; } -pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) { +pub async fn refresh_from_to((stinfosys, pools): &(Stinfosys, DbPools)) { info!("Updating timeseries fromtime & totime"); // TODO: add retries instead of panicking? let mut open_conn = pools.open.get().await.unwrap(); let mut restricted_conn = pools.restricted.get().await.unwrap(); - let (obs_pgm_times, station_times) = stinfosys.cache_deactivated_stinfosys().await.unwrap(); + let (obs_pgm_times_map, station_times_map) = stinfosys.cache_closed_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( - tsupdate::set_from_to_obs_pgm(&mut open_conn, &obs_pgm_times, &station_times), - tsupdate::set_from_to_obs_pgm(&mut restricted_conn, &obs_pgm_times, &station_times), + tsupdate::set_from_to_obs_pgm(&mut open_conn, &obs_pgm_times_map, &station_times_map), + tsupdate::set_from_to_obs_pgm(&mut restricted_conn, &obs_pgm_times_map, &station_times_map), ); if let Err(err) = open_res { diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index 0a7b2096..b82466f2 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -81,7 +81,7 @@ async fn main() -> Result<(), Error> { Stinfosys::new(stinfo_conn_string, level_table.clone()), db_pools.clone(), ), - action: cron::refresh_deactivated, + action: cron::refresh_from_to, interval: tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600)), } .run_forever(), diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 9d2221d0..2a5aa7ca 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -31,7 +31,7 @@ impl Stinfosys { } } - pub async fn cache_deactivated_stinfosys( + pub async fn cache_closed_stinfosys( &self, ) -> Result< ( @@ -48,7 +48,7 @@ impl Stinfosys { } }); - // Fetch all deactivated timeseries in Stinfosys + // Fetch all closed timeseries in Stinfosys let (obs_pgm_times, station_times) = tokio::try_join!( fetch_obs_pgm_times(self.levels.clone(), &client), fetch_station_times(&client), @@ -59,8 +59,8 @@ impl Stinfosys { } pub async fn fetch_from_to_for_update( - obs_pgm_times: &HashMap, - station_times: &HashMap, + obs_pgm_times_map: &HashMap, + station_times_map: &HashMap, ts_from_to: HashMap, labels: Vec, ) -> Result, Error> { @@ -70,17 +70,17 @@ pub async fn fetch_from_to_for_update( // check we have this key for the TS if ts_from_to.contains_key(&label.id) { // use obs_pgm if exists, or else station if exists, or else will be none - let fromtime = match obs_pgm_times.get(&label.key) { + let 
fromtime = match obs_pgm_times_map.get(&label.key) { Some(pgm_fromto) => pgm_fromto.from, - None => match station_times.get(&label.key.station_id) { + None => match station_times_map.get(&label.key.station_id) { Some(station_fromto) => station_fromto.from, None => None, }, }; - let totime = match obs_pgm_times.get(&label.key) { + let totime = match obs_pgm_times_map.get(&label.key) { Some(pgm_fromto) => pgm_fromto.to, - None => match station_times.get(&label.key.station_id) { + None => match station_times_map.get(&label.key.station_id) { Some(station_fromto) => station_fromto.to, None => None, }, diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index b966ae7e..d346ac75 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -192,10 +192,10 @@ async fn test_totime_update() { assert_eq!(totime, None); } - let (obs_pgm_times, station_times) = + let (obs_pgm_times_map, station_times_map) = metadata_mock.cache_deactivated_stinfosys().await.unwrap(); - set_from_to_obs_pgm(&mut conn, &obs_pgm_times, &station_times) + set_from_to_obs_pgm(&mut conn, &obs_pgm_times_map, &station_times_map) .await .unwrap(); From 8433e2af95b466bb115b66b82a9f260c309e06ea Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Mon, 24 Nov 2025 10:16:39 +0100 Subject: [PATCH 18/25] take into account the max/min obstime of the underlying data for from/to update --- ingestion/src/lib.rs | 1 + ingestion/src/main.rs | 8 ++- ingestion/src/util/stinfosys.rs | 24 +++++--- ingestion/src/util/tsupdate.rs | 81 +++++++++++++++++-------- integration_tests/tests/common/mocks.rs | 2 +- integration_tests/tests/end_to_end.rs | 22 ++++--- 6 files changed, 90 insertions(+), 48 deletions(-) diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 9f84a87f..f9e3aea4 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -66,6 +66,7 @@ pub const KAFKA_CHECKED_MESSAGES_RECEIVED: &str = "kafka_checked_messages_receiv pub const KAFKA_CHECKED_FAILURES: &str = "kafka_checked_failures"; pub const SCALAR_DATAPOINTS: &str = "scalar_datapoints"; pub const NONSCALAR_DATAPOINTS: &str = "nonscalar_datapoints"; +pub const FROM_TO_FUTURES_FAILURES: &str = "from_to_futures_failures"; /// Gets an environment variable, providing more details than calling std::env::var() directly. 
pub fn getenv(key: &str) -> Result { diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index b82466f2..4b61fd5d 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -10,9 +10,10 @@ use lard_ingestion::{ cron::{self}, get_conversions, getenv, legacy, util::{levels, permissions, stinfosys::Stinfosys}, - Error, HTTP_REQUESTS_DURATION_SECONDS, KAFKA_CHECKED_FAILURES, KAFKA_CHECKED_MESSAGES_RECEIVED, - KAFKA_RAW_FAILURES, KAFKA_RAW_MESSAGES_RECEIVED, KLDATA_FAILURES, KLDATA_MESSAGES_RECEIVED, - NONSCALAR_DATAPOINTS, QC_FAILURES, SCALAR_DATAPOINTS, + Error, FROM_TO_FUTURES_FAILURES, HTTP_REQUESTS_DURATION_SECONDS, KAFKA_CHECKED_FAILURES, + KAFKA_CHECKED_MESSAGES_RECEIVED, KAFKA_RAW_FAILURES, KAFKA_RAW_MESSAGES_RECEIVED, + KLDATA_FAILURES, KLDATA_MESSAGES_RECEIVED, NONSCALAR_DATAPOINTS, QC_FAILURES, + SCALAR_DATAPOINTS, }; use util::{Cron, DbPools}; @@ -114,6 +115,7 @@ async fn main() -> Result<(), Error> { let _ = metrics::counter!(KAFKA_CHECKED_FAILURES); let _ = metrics::counter!(SCALAR_DATAPOINTS); let _ = metrics::counter!(NONSCALAR_DATAPOINTS); + let _ = metrics::counter!(FROM_TO_FUTURES_FAILURES); // non kvalobs-dependent ingestion #[cfg(feature = "next")] diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index 2a5aa7ca..e448a347 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; -use chrono::NaiveDateTime; +use chrono::{Duration, NaiveDateTime, Utc}; use futures::{stream::FuturesUnordered, StreamExt}; use tokio_postgres::{Client, NoTls}; -use tracing::error; +use tracing::{error, warn}; use crate::{ util::{ @@ -88,9 +88,16 @@ pub async fn fetch_from_to_for_update( if fromtime.is_none() && totime.is_none() { // no metadata, keep the ts from/to - let fromtime = ts_from_to.get(&label.id).unwrap().from; - let totime = ts_from_to.get(&label.id).unwrap().to; - Ok((label.id, fromtime, totime)) + let from_ts = ts_from_to.get(&label.id).unwrap().from; + let mut to_ts = ts_from_to.get(&label.id).unwrap().to; + // TODO: set to time to closed based on timeresolution of the timeseries + // for now have to leave it open unless very long time ago??? + let ten_years_duration = Duration::days(365 * 10); + let ten_years_ago = Utc::now() - ten_years_duration; + if to_ts > Some(ten_years_ago) { + to_ts = None; + } + Ok((label.id, from_ts, to_ts)) } else if ts_from_to .get(&label.id) .unwrap() @@ -108,11 +115,11 @@ pub async fn fetch_from_to_for_update( // |--timeseries--| // use the timeseries from/to so as not to cause "twisting" // (twisting = a to time before from time) - let fromtime = ts_from_to.get(&label.id).unwrap().from; - let _totime = ts_from_to.get(&label.id).unwrap().to; + let from_ts = ts_from_to.get(&label.id).unwrap().from; + let _to_ts = ts_from_to.get(&label.id).unwrap().to; // NOTE: we are choosing to essentially close off this timeseries, since we believe // it is mislabelled. Obs_pgm is essentially saying it should not exist. - Ok((label.id, fromtime, fromtime)) + Ok((label.id, from_ts, from_ts)) } else { // station had data and it overlaps in some way with obs_pgm or the station table // so we assume we should use the overlapp between the TS from/to and the @@ -130,6 +137,7 @@ pub async fn fetch_from_to_for_update( Ok((label.id, overlapp.from, overlapp.to)) } } else { + warn!("No from/to found for this timeseries {:?}", label.id); Ok((label.id, None, None)) // would this ever occur? TODO: log? 
} }) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index e206969f..ed1a2c16 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -1,9 +1,10 @@ use chrono::{DateTime, Utc}; use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; use std::collections::HashMap; use tracing::error; -use crate::{util::stinfosys::fetch_from_to_for_update, Error}; +use crate::{util::stinfosys::fetch_from_to_for_update, Error, FROM_TO_FUTURES_FAILURES}; use lard_egress::patchwork::OpenTimerange; use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; @@ -18,27 +19,32 @@ const OPEN_TIMESERIES_QUERY: &str = "\ met.param_id, \ met.type_id, \ met.lvl, \ - met.sensor, \ - timeseries.fromtime, \ - timeseries.totime \ + met.sensor \ FROM labels.met \ JOIN timeseries \ ON met.timeseries = timeseries.id \ WHERE met.param_id IS NOT NULL \ AND (timeseries.totime IS NULL \ OR timeseries.totime < timeseries.fromtime)"; -// TODO: should we also get the from/to from the underlying data? -// this would be an intensive call and maybe should not be done often? +// NOTE: the from to in the timeseries table need to be kept updated +// so we also need to check the from/to of the underlying data +const MAX_MIN_TIMESERIES_QUERY: &str = "SELECT timeseries, + MIN(obstime), \ + MAX(obstime) \ + FROM data \ + WHERE timeseries = $1 + GROUP BY timeseries"; // Deactivated is information for the database // for a timeseries it is enough that the fromtime is closed const UPDATE_QUERY: &str = "\ UPDATE public.timeseries SET \ - totime = $1, \ - fromtime = $2, \ + fromtime = $1, \ + totime = $2, \ deactivated = false \ WHERE id = $3"; +#[derive(Debug)] pub struct TSupdateTimeseries { /// Timeseries to be updated pub tsid: i64, @@ -58,21 +64,39 @@ impl TSupdateTimeseries { } } +async fn get_from_to_ts( + conn: &mut PooledPgConn<'_>, + labels: Vec, +) -> Result, Error> { + let mut ts_from_to: HashMap = HashMap::new(); + + let mut futures_ts_from_to = labels + .iter() + .map(async |label| conn.query_one(MAX_MIN_TIMESERIES_QUERY, &[&label.id]).await) + .collect::>() + .enumerate(); + + while let Some((i, res)) = futures_ts_from_to.next().await { + let row = match res { + Ok(val) => val, + Err(err) => { + // log these fails + metrics::counter!(FROM_TO_FUTURES_FAILURES).increment(1); + error!("max min for timeseries future failed: {}, {}", err, i); + continue; + } + }; + ts_from_to.insert(row.get(0), OpenTimerange::new(row.get(1), row.get(2))); + } + Ok(ts_from_to) +} + pub async fn set_from_to_obs_pgm( conn: &mut PooledPgConn<'_>, obs_pgm_times: &HashMap, station_times: &HashMap, ) -> Result<(), Error> { - let tx = conn.transaction().await?; - - // Explicitly take the lock so we can prevent concurrent access to the rows we are going to update - tx.execute( - "LOCK TABLE public.timeseries IN SHARE ROW EXCLUSIVE MODE", - &[], - ) - .await?; - - let rows = tx.query(OPEN_TIMESERIES_QUERY, &[]).await?; + let rows = conn.query(OPEN_TIMESERIES_QUERY, &[]).await?; let labels: Vec = rows .iter() @@ -88,17 +112,22 @@ pub async fn set_from_to_obs_pgm( }) .collect(); - let mut ts_from_to: HashMap = HashMap::new(); - rows.iter().for_each(|row| { - ts_from_to.insert(row.get(0), OpenTimerange::new(row.get(6), row.get(7))); - }); + let ts_from_to = get_from_to_ts(conn, labels.clone()).await?; + + let closed = fetch_from_to_for_update(obs_pgm_times, station_times, ts_from_to, labels).await?; + + let tx = conn.transaction().await?; - let deactivated = - fetch_from_to_for_update(obs_pgm_times, 
station_times, ts_from_to, labels).await?; + // Explicitly take the lock so we can prevent concurrent access to the rows we are going to update + tx.execute( + "LOCK TABLE public.timeseries IN SHARE ROW EXCLUSIVE MODE", + &[], + ) + .await?; - future::join_all(deactivated.into_iter().map(async |ts| { + future::join_all(closed.into_iter().map(async |ts| { match tx - .execute(UPDATE_QUERY, &[&ts.totime, &ts.fromtime, &ts.tsid]) + .execute(UPDATE_QUERY, &[&ts.fromtime, &ts.totime, &ts.tsid]) .await { Ok(_) => (), //info!("Tsid {} updated", ts.tsid), diff --git a/integration_tests/tests/common/mocks.rs b/integration_tests/tests/common/mocks.rs index b38b5d52..3024b734 100644 --- a/integration_tests/tests/common/mocks.rs +++ b/integration_tests/tests/common/mocks.rs @@ -21,7 +21,7 @@ pub struct MetadataMock { } impl MetadataMock { - pub async fn cache_deactivated_stinfosys( + pub async fn cache_closed_stinfosys( &self, ) -> Result< ( diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index d346ac75..5ff63f2c 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -143,14 +143,13 @@ async fn get_totime(conn: &PooledPgConn<'_>) -> Vec>> { async fn test_totime_update() { e2e_test_wrapper(async |db_pools| { let timeseries = vec![ - // Scalar and non-scalar TestData { station_id: 10001, - params: vec![Param::new("KLOBS"), Param::new("TA")], - start_time: Utc.with_ymd_and_hms(1980, 1, 1, 0, 0, 0).unwrap(), + params: vec![Param::new("TA"), Param::new("KLOBS")], + start_time: Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap(), period: Duration::hours(1), type_id: 503, - len: 12, + len: 14, // metadata should cut off the last part of this that goes into 1981 }, TestData { station_id: 20001, @@ -162,8 +161,8 @@ async fn test_totime_update() { }, ]; - let fromtime = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap(); - let totime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); + let fromtime = Utc.with_ymd_and_hms(1980, 12, 1, 0, 0, 0).unwrap(); + let totime: DateTime = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap(); let metadata_mock = MetadataMock { station: 10001, @@ -171,12 +170,15 @@ async fn test_totime_update() { totime, }; + let totime_1950: DateTime = Utc.with_ymd_and_hms(1950, 1, 1, 11, 0, 0).unwrap(); + let expected = vec![ - // Both timeseries on station 10001 should be deactivated - Some(totime), + // timeseries on station 10001 should be deactivated Some(totime), - // timeseries on station 20001 is not + // no max/min obstime found for KLOBS data? 
None, + // timeseries on station 20001 is not, so keeps its own at the end of the data + Some(totime_1950), ]; for ts in timeseries { @@ -193,7 +195,7 @@ async fn test_totime_update() { } let (obs_pgm_times_map, station_times_map) = - metadata_mock.cache_deactivated_stinfosys().await.unwrap(); + metadata_mock.cache_closed_stinfosys().await.unwrap(); set_from_to_obs_pgm(&mut conn, &obs_pgm_times_map, &station_times_map) .await From 3f7011df2999d700daad543c3fc88af4d3503bf3 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Mon, 24 Nov 2025 13:04:28 +0100 Subject: [PATCH 19/25] get the paramids that are scalar, for use to make sure use right sql for finding max/min obstime --- ingestion/src/lib.rs | 18 ++++++++++++++++++ ingestion/src/util/tsupdate.rs | 24 +++++++++++++++++++++--- integration_tests/tests/end_to_end.rs | 5 ++--- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index f9e3aea4..21b96cc2 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -501,6 +501,24 @@ pub fn get_conversions(filename: &str) -> Result { )) } +pub fn get_scalar_paramids(filename: &str) -> Result, Error> { + Ok(csv::Reader::from_path(filename) + .unwrap() + .into_records() + .filter_map(|record_result| match record_result { + Ok(record) => { + if record.get(3).unwrap() == "f" { + let paramid = record.get(0).unwrap().parse::().unwrap(); + Some(paramid) + } else { + None + } + } + Err(_) => None, + }) + .collect()) +} + /// Middleware function that runs around a request, so we can record how long it took async fn track_request_duration(req: Request, next: Next) -> impl IntoResponse { let start = std::time::Instant::now(); diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index ed1a2c16..95d1e25a 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -4,7 +4,9 @@ use futures::stream::{FuturesUnordered, StreamExt}; use std::collections::HashMap; use tracing::error; -use crate::{util::stinfosys::fetch_from_to_for_update, Error, FROM_TO_FUTURES_FAILURES}; +use crate::{ + get_scalar_paramids, util::stinfosys::fetch_from_to_for_update, Error, FROM_TO_FUTURES_FAILURES, +}; use lard_egress::patchwork::OpenTimerange; use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; @@ -28,12 +30,18 @@ const OPEN_TIMESERIES_QUERY: &str = "\ OR timeseries.totime < timeseries.fromtime)"; // NOTE: the from to in the timeseries table need to be kept updated // so we also need to check the from/to of the underlying data -const MAX_MIN_TIMESERIES_QUERY: &str = "SELECT timeseries, +const MAX_MIN_TIMESERIES_DATA_QUERY: &str = "SELECT timeseries, MIN(obstime), \ MAX(obstime) \ FROM data \ WHERE timeseries = $1 GROUP BY timeseries"; +const MAX_MIN_TIMESERIES_NONSCALAR_DATA_QUERY: &str = "SELECT timeseries, + MIN(obstime), \ + MAX(obstime) \ + FROM nonscalar_data \ + WHERE timeseries = $1 + GROUP BY timeseries"; // Deactivated is information for the database // for a timeseries it is enough that the fromtime is closed @@ -69,10 +77,20 @@ async fn get_from_to_ts( labels: Vec, ) -> Result, Error> { let mut ts_from_to: HashMap = HashMap::new(); + let scalar_list = get_scalar_paramids("../resources/paramconversions.csv").unwrap(); let mut futures_ts_from_to = labels .iter() - .map(async |label| conn.query_one(MAX_MIN_TIMESERIES_QUERY, &[&label.id]).await) + .map(async |label| { + if scalar_list.contains(&label.key.param_id) { + // nonscalar + conn.query_one(MAX_MIN_TIMESERIES_NONSCALAR_DATA_QUERY, &[&label.id]) + .await 
+ } else { + conn.query_one(MAX_MIN_TIMESERIES_DATA_QUERY, &[&label.id]) + .await + } + }) .collect::>() .enumerate(); diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 5ff63f2c..cf6eb96d 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -145,7 +145,7 @@ async fn test_totime_update() { let timeseries = vec![ TestData { station_id: 10001, - params: vec![Param::new("TA"), Param::new("KLOBS")], + params: vec![Param::new("KLOBS"), Param::new("TA")], start_time: Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap(), period: Duration::hours(1), type_id: 503, @@ -175,8 +175,7 @@ async fn test_totime_update() { let expected = vec![ // timeseries on station 10001 should be deactivated Some(totime), - // no max/min obstime found for KLOBS data? - None, + Some(totime), // timeseries on station 20001 is not, so keeps its own at the end of the data Some(totime_1950), ]; From 5b7e1a50e8e84dffd7d89955a09e6cfe37319776 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Mon, 1 Dec 2025 14:05:41 +0100 Subject: [PATCH 20/25] ingrids code suggestion, and related cleanup --- ingestion/src/util/stinfosys.rs | 150 ++++++++------------------ ingestion/src/util/tsupdate.rs | 12 +-- integration_tests/tests/end_to_end.rs | 46 +++++--- 3 files changed, 79 insertions(+), 129 deletions(-) diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index e448a347..eff0397e 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -1,15 +1,11 @@ use std::collections::HashMap; -use chrono::{Duration, NaiveDateTime, Utc}; -use futures::{stream::FuturesUnordered, StreamExt}; +use chrono::NaiveDateTime; use tokio_postgres::{Client, NoTls}; -use tracing::{error, warn}; +use tracing::error; use crate::{ - util::{ - levels::{param_get_level, LevelTable}, - tsupdate::TSupdateTimeseries, - }, + util::levels::{param_get_level, LevelTable}, Error, }; use lard_egress::patchwork::OpenTimerange; @@ -58,107 +54,51 @@ impl Stinfosys { } } -pub async fn fetch_from_to_for_update( - obs_pgm_times_map: &HashMap, - station_times_map: &HashMap, - ts_from_to: HashMap, +pub fn calc_from_tos( + obs_pgm_ranges: &HashMap, + station_ranges: &HashMap, + data_ranges: HashMap, labels: Vec, -) -> Result, Error> { - let mut futures = labels +) -> Vec<(i64, OpenTimerange)> { + labels .iter() - .map(async |label| -> Result<_, Error> { - // check we have this key for the TS - if ts_from_to.contains_key(&label.id) { - // use obs_pgm if exists, or else station if exists, or else will be none - let fromtime = match obs_pgm_times_map.get(&label.key) { - Some(pgm_fromto) => pgm_fromto.from, - None => match station_times_map.get(&label.key.station_id) { - Some(station_fromto) => station_fromto.from, - None => None, - }, - }; - - let totime = match obs_pgm_times_map.get(&label.key) { - Some(pgm_fromto) => pgm_fromto.to, - None => match station_times_map.get(&label.key.station_id) { - Some(station_fromto) => station_fromto.to, - None => None, - }, - }; - - if fromtime.is_none() && totime.is_none() { - // no metadata, keep the ts from/to - let from_ts = ts_from_to.get(&label.id).unwrap().from; - let mut to_ts = ts_from_to.get(&label.id).unwrap().to; - // TODO: set to time to closed based on timeresolution of the timeseries - // for now have to leave it open unless very long time ago??? 
- let ten_years_duration = Duration::days(365 * 10); - let ten_years_ago = Utc::now() - ten_years_duration; - if to_ts > Some(ten_years_ago) { - to_ts = None; - } - Ok((label.id, from_ts, to_ts)) - } else if ts_from_to - .get(&label.id) - .unwrap() - .overlap(OpenTimerange { - from: fromtime, - to: totime, - }) - .is_none() - { - // check if the fromtime of the timeseries is before the totime from obspgm - // |------obs_pgm------| - // |--timeseries--| - // or if the totime of the timeseries is before the fromtime from obspgm - // |------obs_pgm------| - // |--timeseries--| - // use the timeseries from/to so as not to cause "twisting" - // (twisting = a to time before from time) - let from_ts = ts_from_to.get(&label.id).unwrap().from; - let _to_ts = ts_from_to.get(&label.id).unwrap().to; - // NOTE: we are choosing to essentially close off this timeseries, since we believe - // it is mislabelled. Obs_pgm is essentially saying it should not exist. - Ok((label.id, from_ts, from_ts)) - } else { - // station had data and it overlaps in some way with obs_pgm or the station table - // so we assume we should use the overlapp between the TS from/to and the - // obs_pgm or station deactivation times... - // |------obs_pgm / station------| - // |---timeseries---| - let overlapp = ts_from_to - .get(&label.id) - .unwrap() - .overlap(OpenTimerange { - from: fromtime, - to: totime, - }) - .unwrap(); - Ok((label.id, overlapp.from, overlapp.to)) - } - } else { - warn!("No from/to found for this timeseries {:?}", label.id); - Ok((label.id, None, None)) // would this ever occur? TODO: log? - } - }) - .collect::>(); - - let mut ts_update = vec![]; - while let Some(res) = futures.next().await { - let ts = match res? { - (tsid, Some(fromtime), Some(totime)) => TSupdateTimeseries { - tsid, - fromtime, - totime, - }, - // Skip if a valid totime was not found in stinfosys - _ => continue, - }; - - ts_update.push(ts); - } + .filter_map(|label| { + // Prefer obs_pgm if available, and only use station if no obs_pgm info exists + let stinfo_range = *obs_pgm_ranges + .get(&label.key) + .or(station_ranges.get(&label.key.station_id)) + .unwrap_or(&OpenTimerange { + from: None, + to: None, + }); + // we `?` this one because if it's None, the ts doesn't exist and we can't update it + let data = *data_ranges.get(&label.id)?; + + let overlap = stinfo_range.overlap(data); + + // if the metadata for the timeseries has a to_time, we shouldn't close the ts because it might still + // receive new data + let should_be_closed = stinfo_range.to.is_some(); + + let out = match (overlap, should_be_closed) { + (Some(overlap), true) => overlap, + (Some(overlap), false) => OpenTimerange { + from: overlap.from, + to: None, + }, + (None, true) => OpenTimerange { + from: data.to, + to: data.to, + }, + (None, false) => OpenTimerange { + from: stinfo_range.from.max(stinfo_range.from).max(data.from), + to: None, + }, + }; - Ok(ts_update) + Some((label.id, out)) + }) + .collect() } async fn fetch_obs_pgm_times( diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 95d1e25a..99a9656e 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -4,9 +4,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use std::collections::HashMap; use tracing::error; -use crate::{ - get_scalar_paramids, util::stinfosys::fetch_from_to_for_update, Error, FROM_TO_FUTURES_FAILURES, -}; +use crate::{get_scalar_paramids, util::stinfosys::calc_from_tos, Error, FROM_TO_FUTURES_FAILURES}; use 
lard_egress::patchwork::OpenTimerange; use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; @@ -132,7 +130,7 @@ pub async fn set_from_to_obs_pgm( let ts_from_to = get_from_to_ts(conn, labels.clone()).await?; - let closed = fetch_from_to_for_update(obs_pgm_times, station_times, ts_from_to, labels).await?; + let closed = calc_from_tos(obs_pgm_times, station_times, ts_from_to, labels); let tx = conn.transaction().await?; @@ -143,13 +141,13 @@ pub async fn set_from_to_obs_pgm( ) .await?; - future::join_all(closed.into_iter().map(async |ts| { + future::join_all(closed.into_iter().map(async |(tsid, timerange)| { match tx - .execute(UPDATE_QUERY, &[&ts.fromtime, &ts.totime, &ts.tsid]) + .execute(UPDATE_QUERY, &[&timerange.from, &timerange.to, &tsid]) .await { Ok(_) => (), //info!("Tsid {} updated", ts.tsid), - Err(err) => error!("Could not update tsid {}: {}", ts.tsid, err), + Err(err) => error!("Could not update tsid {}: {}", tsid, err), } })) .await; diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index cf6eb96d..82f8505b 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -124,9 +124,11 @@ async fn test_stations_endpoint_regular() { } // TODO: we should implement an availability endpoint? -async fn get_totime(conn: &PooledPgConn<'_>) -> Vec>> { +async fn get_fromtotime( + conn: &PooledPgConn<'_>, +) -> Vec<(Option>, Option>)> { conn.query( - "SELECT timeseries.totime FROM timeseries \ + "SELECT timeseries.fromtime, timeseries.totime FROM timeseries \ JOIN labels.met \ ON timeseries.id = met.timeseries \ ORDER BY station_id", @@ -135,12 +137,12 @@ async fn get_totime(conn: &PooledPgConn<'_>) -> Vec>> { .await .unwrap() .iter() - .map(|row| row.get(0)) + .map(|row| (row.get(0), row.get(1))) .collect() } #[tokio::test] -async fn test_totime_update() { +async fn test_fromtotime_update() { e2e_test_wrapper(async |db_pools| { let timeseries = vec![ TestData { @@ -170,14 +172,21 @@ async fn test_totime_update() { totime, }; - let totime_1950: DateTime = Utc.with_ymd_and_hms(1950, 1, 1, 11, 0, 0).unwrap(); - let expected = vec![ - // timeseries on station 10001 should be deactivated - Some(totime), - Some(totime), - // timeseries on station 20001 is not, so keeps its own at the end of the data - Some(totime_1950), + // timeseries on station 10001 should be closed based on metadata + ( + Some(Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap()), + Some(totime), + ), + ( + Some(Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap()), + Some(totime), + ), + // timeseries on station 20001 is not, so it is left open + ( + Some(Utc.with_ymd_and_hms(1950, 1, 1, 0, 0, 0).unwrap()), + None, + ), ]; for ts in timeseries { @@ -189,8 +198,8 @@ async fn test_totime_update() { let mut conn = db_pools.open.get().await.unwrap(); // totimes should be empty - for totime in get_totime(&conn).await { - assert_eq!(totime, None); + for fromtotimes in get_fromtotime(&conn).await { + assert_eq!(fromtotimes.1, None); // to time } let (obs_pgm_times_map, station_times_map) = @@ -200,11 +209,14 @@ async fn test_totime_update() { .await .unwrap(); - let after = get_totime(&conn).await; + let after = get_fromtotime(&conn).await; + println!("After {:?} timeseries from/to", after); - // Now the totime for station 10001 should be set - for (totime, end_time) in after.into_iter().zip(expected) { - assert_eq!(totime, end_time); + // Now the totime for station 10001 should be set (and the to time for station 20001 should be its first observation 
time) + for (db, expect) in after.into_iter().zip(expected) { + println!("db {:?} expect {:?}", db, expect); + assert_eq!(db.0, expect.0); + assert_eq!(db.1, expect.1); } }) .await From e95214080fb115cbc7b317092467f7b77a8ba018 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Mon, 1 Dec 2025 14:38:59 +0100 Subject: [PATCH 21/25] test on legacy.data, not data and nonscalar_data --- ingestion/src/util/tsupdate.rs | 22 +++-- integration_tests/tests/end_to_end.rs | 129 +++++++++++++------------- 2 files changed, 78 insertions(+), 73 deletions(-) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index 99a9656e..c65e0828 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -23,18 +23,23 @@ const OPEN_TIMESERIES_QUERY: &str = "\ FROM labels.met \ JOIN timeseries \ ON met.timeseries = timeseries.id \ - WHERE met.param_id IS NOT NULL \ - AND (timeseries.totime IS NULL \ - OR timeseries.totime < timeseries.fromtime)"; + WHERE met.param_id IS NOT NULL"; // NOTE: the from to in the timeseries table need to be kept updated // so we also need to check the from/to of the underlying data -const MAX_MIN_TIMESERIES_DATA_QUERY: &str = "SELECT timeseries, +const MAX_MIN_TIMESERIES_LEGACY_DATA_QUERY: &str = "SELECT timeseries, + MIN(obstime), \ + MAX(obstime) \ + FROM legacy.data \ + WHERE timeseries = $1 + GROUP BY timeseries"; +// For now data is in legacy data... +const _MAX_MIN_TIMESERIES_DATA_QUERY: &str = "SELECT timeseries, MIN(obstime), \ MAX(obstime) \ FROM data \ WHERE timeseries = $1 GROUP BY timeseries"; -const MAX_MIN_TIMESERIES_NONSCALAR_DATA_QUERY: &str = "SELECT timeseries, +const _MAX_MIN_TIMESERIES_NONSCALAR_DATA_QUERY: &str = "SELECT timeseries, MIN(obstime), \ MAX(obstime) \ FROM nonscalar_data \ @@ -75,11 +80,15 @@ async fn get_from_to_ts( labels: Vec, ) -> Result, Error> { let mut ts_from_to: HashMap = HashMap::new(); - let scalar_list = get_scalar_paramids("../resources/paramconversions.csv").unwrap(); + let _scalar_list = get_scalar_paramids("../resources/paramconversions.csv").unwrap(); let mut futures_ts_from_to = labels .iter() .map(async |label| { + // for now we only have data in legacy.data + conn.query_one(MAX_MIN_TIMESERIES_LEGACY_DATA_QUERY, &[&label.id]) + .await + /* if scalar_list.contains(&label.key.param_id) { // nonscalar conn.query_one(MAX_MIN_TIMESERIES_NONSCALAR_DATA_QUERY, &[&label.id]) @@ -88,6 +97,7 @@ async fn get_from_to_ts( conn.query_one(MAX_MIN_TIMESERIES_DATA_QUERY, &[&label.id]) .await } + */ }) .collect::>() .enumerate(); diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 82f8505b..76d9f68f 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -1,13 +1,17 @@ use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Duration, DurationRound, TimeDelta, TimeZone, Utc}; use chronoutil::RelativeDuration; +use rdkafka::producer::FutureProducer; use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp}; use tokio_postgres::NoTls; +use util::DbPools; +use lard_egress::patchwork::PatchworkTables; use lard_egress::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp}; use lard_ingestion::{util::tsupdate::set_from_to_obs_pgm, KldataResp}; pub mod common; +use crate::common::legacy::{e2e_test_wrapper_legacy, ingest_raw, IngestData}; use common::{e2e_test_wrapper, mocks::MetadataMock, Param, TestData}; use util::PooledPgConn; @@ -143,82 +147,73 @@ async fn get_fromtotime( 
#[tokio::test] async fn test_fromtotime_update() { - e2e_test_wrapper(async |db_pools| { - let timeseries = vec![ - TestData { - station_id: 10001, - params: vec![Param::new("KLOBS"), Param::new("TA")], - start_time: Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap(), - period: Duration::hours(1), - type_id: 503, - len: 14, // metadata should cut off the last part of this that goes into 1981 - }, - TestData { - station_id: 20001, - params: vec![Param::new("TA")], - start_time: Utc.with_ymd_and_hms(1950, 1, 1, 0, 0, 0).unwrap(), - period: Duration::hours(1), - type_id: 501, - len: 12, - }, - ]; - - let fromtime = Utc.with_ymd_and_hms(1980, 12, 1, 0, 0, 0).unwrap(); - let totime: DateTime = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap(); + e2e_test_wrapper_legacy( + async |producer: FutureProducer, db_pools: DbPools, tables: PatchworkTables| { + let timeseries = IngestData::new(vec![ + TestData { + station_id: 10001, + params: vec![Param::new("TA")], // Param::new("KLOBS"), NOTE: this will not work while using legacy.data only + start_time: Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap(), + period: Duration::hours(1), + type_id: 503, + len: 14, // metadata should cut off the last part of this that goes into 1981 + }, + TestData { + station_id: 20001, + params: vec![Param::new("TA")], + start_time: Utc.with_ymd_and_hms(1950, 1, 1, 0, 0, 0).unwrap(), + period: Duration::hours(1), + type_id: 501, + len: 12, + }, + ]); + ingest_raw(×eries, producer, db_pools.clone(), tables).await; - let metadata_mock = MetadataMock { - station: 10001, - fromtime, - totime, - }; + let fromtime = Utc.with_ymd_and_hms(1980, 12, 1, 0, 0, 0).unwrap(); + let totime: DateTime = Utc.with_ymd_and_hms(1981, 1, 1, 0, 0, 0).unwrap(); - let expected = vec![ - // timeseries on station 10001 should be closed based on metadata - ( - Some(Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap()), - Some(totime), - ), - ( - Some(Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap()), - Some(totime), - ), - // timeseries on station 20001 is not, so it is left open - ( - Some(Utc.with_ymd_and_hms(1950, 1, 1, 0, 0, 0).unwrap()), - None, - ), - ]; + let metadata_mock = MetadataMock { + station: 10001, + fromtime, + totime, + }; - for ts in timeseries { - let client = reqwest::Client::new(); - let ingestor_resp = ingest_data(&client, ts.obsinn_zeros()).await; - assert_eq!(ingestor_resp.res, 0); - } + let expected = vec![ + // timeseries on station 10001 should be closed based on metadata + ( + Some(Utc.with_ymd_and_hms(1980, 12, 31, 12, 0, 0).unwrap()), + Some(totime), + ), + // timeseries on station 20001 is not, so it is left open + ( + Some(Utc.with_ymd_and_hms(1950, 1, 1, 0, 0, 0).unwrap()), + None, + ), + ]; - let mut conn = db_pools.open.get().await.unwrap(); + let mut conn = db_pools.open.get().await.unwrap(); - // totimes should be empty - for fromtotimes in get_fromtotime(&conn).await { - assert_eq!(fromtotimes.1, None); // to time - } + // totimes should be empty + for fromtotimes in get_fromtotime(&conn).await { + assert_eq!(fromtotimes.1, None); // to time + } - let (obs_pgm_times_map, station_times_map) = - metadata_mock.cache_closed_stinfosys().await.unwrap(); + let (obs_pgm_times_map, station_times_map) = + metadata_mock.cache_closed_stinfosys().await.unwrap(); - set_from_to_obs_pgm(&mut conn, &obs_pgm_times_map, &station_times_map) - .await - .unwrap(); + set_from_to_obs_pgm(&mut conn, &obs_pgm_times_map, &station_times_map) + .await + .unwrap(); - let after = get_fromtotime(&conn).await; - println!("After 
{:?} timeseries from/to", after); + let after = get_fromtotime(&conn).await; - // Now the totime for station 10001 should be set (and the to time for station 20001 should be its first observation time) - for (db, expect) in after.into_iter().zip(expected) { - println!("db {:?} expect {:?}", db, expect); - assert_eq!(db.0, expect.0); - assert_eq!(db.1, expect.1); - } - }) + // Now the totime for station 10001 should be set (and the to time for station 20001 should be its first observation time) + for (db, expect) in after.into_iter().zip(expected) { + assert_eq!(db.0, expect.0); + assert_eq!(db.1, expect.1); + } + }, + ) .await } From a1ce38b35c78d88a4bcb2bd5ebe0f79a56d80b43 Mon Sep 17 00:00:00 2001 From: Louise Carolyn Oram Date: Tue, 2 Dec 2025 10:42:19 +0100 Subject: [PATCH 22/25] move timerange to util, refactor parsing of times from stinfosys for station and obs_pgm --- Cargo.lock | 1 - egress/src/patchwork.rs | 67 +------------------------ ingestion/Cargo.toml | 1 - ingestion/src/cron.rs | 4 +- ingestion/src/util/stinfosys.rs | 19 ++++--- ingestion/src/util/tsupdate.rs | 26 +--------- integration_tests/tests/common/mocks.rs | 4 +- integration_tests/tests/end_to_end.rs | 4 +- util/src/lib.rs | 66 ++++++++++++++++++++++++ 9 files changed, 84 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c06f27c..9f787313 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1868,7 +1868,6 @@ dependencies = [ "chronoutil", "csv", "futures", - "lard_egress", "metrics", "metrics-exporter-prometheus", "quick-xml", diff --git a/egress/src/patchwork.rs b/egress/src/patchwork.rs index f429413f..c07ca8ee 100644 --- a/egress/src/patchwork.rs +++ b/egress/src/patchwork.rs @@ -21,7 +21,7 @@ use std::{ }; use tokio_postgres::{Client, NoTls}; use tracing::{error, warn}; -use util::{DbPools, MetLabel, PooledPgConn}; +use util::{ClosedTimerange, DbPools, MetLabel, OpenTimerange, PooledPgConn}; pub const PATCHWORK_FUTURES_FAILURES: &str = "patchwork_futures_failures"; @@ -155,71 +155,6 @@ pub struct PatchworkDatum { quality_code: Option, } -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] -pub struct ClosedTimerange { - pub from: DateTime, - pub to: DateTime, -} - -impl ClosedTimerange { - pub fn new(from: DateTime, to: DateTime) -> Self { - ClosedTimerange { from, to } - } - - pub fn overlap(&self, other: Self) -> Option { - let from = self.from.max(other.from); - let to = self.to.min(other.to); - - // If they overlap return the new timerange - (from < to).then_some(ClosedTimerange { from, to }) - } -} - -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] -pub struct OpenTimerange { - pub from: Option>, - pub to: Option>, -} - -impl OpenTimerange { - pub fn new(from: Option>, to: Option>) -> Self { - OpenTimerange { from, to } - } - - /// Used to cut the priorities to cover ranges that actually matter to a particular timeseries - /// Takes the from and to times of the timeseries as well as the from and to of the priority range - /// Returns an option, since it could be they do not overlapp at all (and thus it returns empty) - pub fn overlap(&self, other: Self) -> Option { - let fromtime = match (self.from, other.from) { - (Some(lhs), Some(rhs)) => Some(lhs.max(rhs)), // return the later one - (Some(lhs), None) => Some(lhs), - (None, Some(rhs)) => Some(rhs), - (None, None) => None, - }; - let totime = match (self.to, other.to) { - (Some(lhs), Some(rhs)) => Some(lhs.min(rhs)), // return the earlier one - (Some(lhs), None) => Some(lhs), - (None, Some(rhs)) => Some(rhs), - (None, 
None) => None, - }; - - match (fromtime, totime) { - // If both ends are closed and the ranges overlap return the new timerange - (Some(from), Some(to)) => { - if from >= to { - None - } else { - Some(OpenTimerange { - from: Some(from), - to: Some(to), - }) - } - } - (from, to) => Some(OpenTimerange { from, to }), - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] pub struct Fill { // TODO: I'm pretty sure this should never be NULL? In case we can put an Option diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 8b384960..2bc77a30 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -17,7 +17,6 @@ bytes.workspace = true chrono.workspace = true chronoutil.workspace = true util = { path = "../util" } -lard_egress = { path = "../egress" } csv.workspace = true futures.workspace = true metrics.workspace = true diff --git a/ingestion/src/cron.rs b/ingestion/src/cron.rs index a9309977..aa8b1672 100644 --- a/ingestion/src/cron.rs +++ b/ingestion/src/cron.rs @@ -42,8 +42,8 @@ pub async fn refresh_from_to((stinfosys, pools): &(Stinfosys, DbPools)) { let (obs_pgm_times_map, station_times_map) = stinfosys.cache_closed_stinfosys().await.unwrap(); let (open_res, restricted_res) = tokio::join!( - tsupdate::set_from_to_obs_pgm(&mut open_conn, &obs_pgm_times_map, &station_times_map), - tsupdate::set_from_to_obs_pgm(&mut restricted_conn, &obs_pgm_times_map, &station_times_map), + tsupdate::update_from_to(&mut open_conn, &obs_pgm_times_map, &station_times_map), + tsupdate::update_from_to(&mut restricted_conn, &obs_pgm_times_map, &station_times_map), ); if let Err(err) = open_res { diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs index eff0397e..fff2d2c2 100644 --- a/ingestion/src/util/stinfosys.rs +++ b/ingestion/src/util/stinfosys.rs @@ -8,8 +8,7 @@ use crate::{ util::levels::{param_get_level, LevelTable}, Error, }; -use lard_egress::patchwork::OpenTimerange; -use util::{MetLabel, MetTimeseriesKey}; +use util::{MetLabel, MetTimeseriesKey, OpenTimerange}; pub struct Stinfosys { conn_string: String, @@ -141,13 +140,13 @@ async fn fetch_obs_pgm_times( type_id: row.get(4), }; - let fromtime: NaiveDateTime = row.get(5); - let totime: NaiveDateTime = row.get(6); + let fromtime: Option = row.get(5); + let totime: Option = row.get(6); map.insert( key, OpenTimerange { - from: Some(fromtime.and_utc()), - to: Some(totime.and_utc()), + from: fromtime.map(|t| t.and_utc()), + to: totime.map(|t| t.and_utc()), }, ); } @@ -176,14 +175,14 @@ async fn fetch_station_times(conn: &Client) -> Result = row.get(1); + let totime: Option = row.get(2); ( row.get(0), OpenTimerange { - from: Some(fromtime.and_utc()), - to: Some(totime.and_utc()), + from: fromtime.map(|t| t.and_utc()), + to: totime.map(|t| t.and_utc()), }, ) }) diff --git a/ingestion/src/util/tsupdate.rs b/ingestion/src/util/tsupdate.rs index c65e0828..64eaa61b 100644 --- a/ingestion/src/util/tsupdate.rs +++ b/ingestion/src/util/tsupdate.rs @@ -1,12 +1,10 @@ -use chrono::{DateTime, Utc}; use futures::future; use futures::stream::{FuturesUnordered, StreamExt}; use std::collections::HashMap; use tracing::error; use crate::{get_scalar_paramids, util::stinfosys::calc_from_tos, Error, FROM_TO_FUTURES_FAILURES}; -use lard_egress::patchwork::OpenTimerange; -use util::{MetLabel, MetTimeseriesKey, PooledPgConn}; +use util::{MetLabel, MetTimeseriesKey, OpenTimerange, PooledPgConn}; // TODO: remove the WHERE when we remove/prevent NULL param IDs in the table // NOTE: In addition to finding open timeseries, we also 
find the timeseries @@ -55,26 +53,6 @@ const UPDATE_QUERY: &str = "\ deactivated = false \ WHERE id = $3"; -#[derive(Debug)] -pub struct TSupdateTimeseries { - /// Timeseries to be updated - pub tsid: i64, - /// Fromtime value found in the metadata source - pub fromtime: DateTime, - /// Totime value found in the metadata source - pub totime: DateTime, -} - -impl TSupdateTimeseries { - pub fn new(tsid: i64, fromtime: DateTime, totime: DateTime) -> TSupdateTimeseries { - TSupdateTimeseries { - tsid, - fromtime, - totime, - } - } -} - async fn get_from_to_ts( conn: &mut PooledPgConn<'_>, labels: Vec, @@ -117,7 +95,7 @@ async fn get_from_to_ts( Ok(ts_from_to) } -pub async fn set_from_to_obs_pgm( +pub async fn update_from_to( conn: &mut PooledPgConn<'_>, obs_pgm_times: &HashMap, station_times: &HashMap, diff --git a/integration_tests/tests/common/mocks.rs b/integration_tests/tests/common/mocks.rs index 3024b734..317e8a02 100644 --- a/integration_tests/tests/common/mocks.rs +++ b/integration_tests/tests/common/mocks.rs @@ -4,11 +4,11 @@ use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use util::MetTimeseriesKey; +use util::{MetTimeseriesKey, OpenTimerange}; use chrono::{Duration, TimeZone}; -use lard_egress::patchwork::{MessagePriority, MessagePriorityDefaultTable, OpenTimerange}; +use lard_egress::patchwork::{MessagePriority, MessagePriorityDefaultTable}; use lard_ingestion::util::{ levels::{self, Level, LevelTable}, permissions::{ParamPermit, ParamPermitTable, StationPermitTable}, diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 76d9f68f..3e9956f5 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -8,7 +8,7 @@ use util::DbPools; use lard_egress::patchwork::PatchworkTables; use lard_egress::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp}; -use lard_ingestion::{util::tsupdate::set_from_to_obs_pgm, KldataResp}; +use lard_ingestion::{util::tsupdate::update_from_to, KldataResp}; pub mod common; use crate::common::legacy::{e2e_test_wrapper_legacy, ingest_raw, IngestData}; @@ -201,7 +201,7 @@ async fn test_fromtotime_update() { let (obs_pgm_times_map, station_times_map) = metadata_mock.cache_closed_stinfosys().await.unwrap(); - set_from_to_obs_pgm(&mut conn, &obs_pgm_times_map, &station_times_map) + update_from_to(&mut conn, &obs_pgm_times_map, &station_times_map) .await .unwrap(); diff --git a/util/src/lib.rs b/util/src/lib.rs index e7240bc1..fa1e8ecb 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -1,5 +1,6 @@ use bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tokio::signal; @@ -67,6 +68,71 @@ impl MetLabel { } } +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ClosedTimerange { + pub from: DateTime, + pub to: DateTime, +} + +impl ClosedTimerange { + pub fn new(from: DateTime, to: DateTime) -> Self { + ClosedTimerange { from, to } + } + + pub fn overlap(&self, other: Self) -> Option { + let from = self.from.max(other.from); + let to = self.to.min(other.to); + + // If they overlap return the new timerange + (from < to).then_some(ClosedTimerange { from, to }) + } +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpenTimerange { + pub from: Option>, + pub to: Option>, +} + +impl OpenTimerange { + pub fn new(from: Option>, to: Option>) -> Self { + OpenTimerange { from, to } + } + + /// Used to cut the priorities to cover ranges that 
+    /// Used to cut the priorities to cover ranges that actually matter to a particular timeseries
+    /// Takes the from and to times of the timeseries as well as the from and to of the priority range
+    /// Returns an option, since the ranges might not overlap at all (in which case it returns None)
+    pub fn overlap(&self, other: Self) -> Option<OpenTimerange> {
+        let fromtime = match (self.from, other.from) {
+            (Some(lhs), Some(rhs)) => Some(lhs.max(rhs)), // return the later one
+            (Some(lhs), None) => Some(lhs),
+            (None, Some(rhs)) => Some(rhs),
+            (None, None) => None,
+        };
+        let totime = match (self.to, other.to) {
+            (Some(lhs), Some(rhs)) => Some(lhs.min(rhs)), // return the earlier one
+            (Some(lhs), None) => Some(lhs),
+            (None, Some(rhs)) => Some(rhs),
+            (None, None) => None,
+        };
+
+        match (fromtime, totime) {
+            // If both ends are closed and the ranges overlap, return the new timerange
+            (Some(from), Some(to)) => {
+                if from >= to {
+                    None
+                } else {
+                    Some(OpenTimerange {
+                        from: Some(from),
+                        to: Some(to),
+                    })
+                }
+            }
+            (from, to) => Some(OpenTimerange { from, to }),
+        }
+    }
+}
+
 /// Type for refreshing caches
 pub struct Cron {
     pub state: State,

From fd254db49b644dc83eae30abbdbfecdcdd83ed12 Mon Sep 17 00:00:00 2001
From: Louise Carolyn Oram
Date: Tue, 2 Dec 2025 10:50:17 +0100
Subject: [PATCH 23/25] add some comments about obs_pgm and station table use

---
 ingestion/src/util/stinfosys.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs
index fff2d2c2..b9755634 100644
--- a/ingestion/src/util/stinfosys.rs
+++ b/ingestion/src/util/stinfosys.rs
@@ -100,6 +100,8 @@ pub fn calc_from_tos(
         .collect()
 }
 
+/// Obs_pgm stands for observation program. It contains information about which timeseries are expected to send data.
+/// We use it to determine whether a timeseries should have a to_time (i.e. whether it is closed).
 async fn fetch_obs_pgm_times(
     levels: LevelTable,
     conn: &Client,
@@ -154,6 +156,8 @@ async fn fetch_obs_pgm_times(
     Ok(map)
 }
 
+/// This is metadata about when a station existed. If obs_pgm has no information
+/// about a timeseries, we fall back to the station metadata.
 async fn fetch_station_times(conn: &Client) -> Result {
     // The funny looking ARRAY_AGG is needed because each station can have multiple from/to times.
    // For example, the timeseries might have been "reset" after a change of the station position,

From 3be5ecfb57798d1e0987135057f84d62c67f9c95 Mon Sep 17 00:00:00 2001
From: Louise Carolyn Oram
Date: Thu, 4 Dec 2025 10:38:59 +0100
Subject: [PATCH 24/25] parse only totime as option from stinfosys

---
 ingestion/src/util/stinfosys.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs
index b9755634..b759d738 100644
--- a/ingestion/src/util/stinfosys.rs
+++ b/ingestion/src/util/stinfosys.rs
@@ -142,12 +142,11 @@ async fn fetch_obs_pgm_times(
             type_id: row.get(4),
         };
 
-        let fromtime: Option<NaiveDateTime> = row.get(5);
         let totime: Option<NaiveDateTime> = row.get(6);
         map.insert(
             key,
             OpenTimerange {
-                from: fromtime.map(|t| t.and_utc()),
+                from: row.get(5),
                 to: totime.map(|t| t.and_utc()),
             },
         );
@@ -179,13 +178,11 @@ async fn fetch_station_times(conn: &Client) -> Result
-            let fromtime: Option<NaiveDateTime> = row.get(1);
             let totime: Option<NaiveDateTime> = row.get(2);
             (
                 row.get(0),
                 OpenTimerange {
-                    from: fromtime.map(|t| t.and_utc()),
+                    from: row.get(1),
                     to: totime.map(|t| t.and_utc()),
                 },
             )

From 38f106b4bc5d060c0d1a0fe2737b98d99368e31b Mon Sep 17 00:00:00 2001
From: Louise Carolyn Oram
Date: Fri, 5 Dec 2025 13:56:00 +0100
Subject: [PATCH 25/25] remove restriction of finding only closed metadata from stinfosys

---
 ingestion/src/util/stinfosys.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/ingestion/src/util/stinfosys.rs b/ingestion/src/util/stinfosys.rs
index b759d738..b761ee2c 100644
--- a/ingestion/src/util/stinfosys.rs
+++ b/ingestion/src/util/stinfosys.rs
@@ -122,8 +122,7 @@ async fn fetch_obs_pgm_times(
             MIN(fromtime), \
             (ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \
         FROM obs_pgm \
-        GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \
-        HAVING (ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] IS NOT NULL";
+        GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid";
 
     let rows = conn.query(OBS_PGM_QUERY, &[]).await?;
 
@@ -170,8 +169,7 @@ async fn fetch_station_times(conn: &Client) -> Result
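
To make the clamping behaviour of OpenTimerange::overlap (added in util/src/lib.rs above) concrete, here is a minimal sketch. It uses the util::OpenTimerange import that the mocks.rs hunk introduces; the dates and the test-style assertions are illustrative only, not part of the patch series.

// Illustrative only: exercises OpenTimerange::overlap as added above.
use chrono::{TimeZone, Utc};
use util::OpenTimerange;

fn main() {
    // An obs_pgm entry that is still open (no totime) ...
    let obs_pgm = OpenTimerange::new(
        Some(Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap()),
        None,
    );
    // ... clamped against a station that only existed for part of that period.
    let station = OpenTimerange::new(
        Some(Utc.with_ymd_and_hms(2021, 6, 1, 0, 0, 0).unwrap()),
        Some(Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap()),
    );

    // overlap keeps the later fromtime and the earlier totime.
    let clamped = obs_pgm.overlap(station).unwrap();
    assert_eq!(clamped.from, station.from);
    assert_eq!(clamped.to, station.to);

    // Two closed ranges that never meet give None.
    let early = OpenTimerange::new(
        Some(Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap()),
        Some(Utc.with_ymd_and_hms(2001, 1, 1, 0, 0, 0).unwrap()),
    );
    assert!(early.overlap(station).is_none());
}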
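
The doc comments added in [PATCH 23/25] describe preferring obs_pgm metadata for a timeseries and falling back to the station table when obs_pgm has no entry. A minimal sketch of that lookup is below; the helper name from_to_for, the exact map types, and the i32 station id key are assumptions for illustration, not the patch's API.

// Illustrative only: the obs_pgm -> station fallback described in the comments above.
use std::collections::HashMap;
use util::{MetTimeseriesKey, OpenTimerange};

fn from_to_for(
    key: &MetTimeseriesKey,
    obs_pgm_times: &HashMap<MetTimeseriesKey, OpenTimerange>,
    station_times: &HashMap<i32, OpenTimerange>, // assumed keyed by station id
) -> Option<OpenTimerange> {
    // Prefer the per-timeseries obs_pgm entry, otherwise fall back to the station entry.
    obs_pgm_times
        .get(key)
        .or_else(|| station_times.get(&key.station_id))
        .copied()
}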