Skip to content

Commit 38f1102

Browse files
committed
add structures to get the obspgm and station fromtimes
1 parent 8afb2e7 commit 38f1102

File tree

5 files changed

+154
-17
lines changed

5 files changed

+154
-17
lines changed

ingestion/src/cron.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,24 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) {
3939
let mut open_conn = pools.open.get().await.unwrap();
4040
let mut restricted_conn = pools.restricted.get().await.unwrap();
4141

42-
let (station_totime, obs_pgm_totime) = stinfosys.cache_deactivated_stinfosys().await.unwrap();
42+
let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) =
43+
stinfosys.cache_deactivated_stinfosys().await.unwrap();
4344

4445
let (open_res, restricted_res) = tokio::join!(
45-
tsupdate::set_deactivated(&mut open_conn, &obs_pgm_totime, &station_totime),
46-
tsupdate::set_deactivated(&mut restricted_conn, &obs_pgm_totime, &station_totime),
46+
tsupdate::set_deactivated(
47+
&mut open_conn,
48+
&obs_pgm_totime,
49+
&obs_pgm_fromtime,
50+
&station_totime,
51+
&station_fromtime
52+
),
53+
tsupdate::set_deactivated(
54+
&mut restricted_conn,
55+
&obs_pgm_totime,
56+
&obs_pgm_fromtime,
57+
&station_totime,
58+
&station_fromtime
59+
),
4760
);
4861

4962
if let Err(err) = open_res {

ingestion/src/util/stinfosys.rs

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ pub struct Stinfosys {
2121
}
2222

2323
type StationTotimeMap = HashMap<i32, DateTime<Utc>>;
24+
type StationFromtimeMap = HashMap<i32, DateTime<Utc>>;
2425
type ObsPgmTotimeMap = HashMap<MetTimeseriesKey, DateTime<Utc>>;
26+
type ObsPgmFromtimeMap = HashMap<MetTimeseriesKey, DateTime<Utc>>;
2527

2628
impl Stinfosys {
2729
pub fn new(conn_string: String, levels: LevelTable) -> Self {
@@ -36,6 +38,8 @@ impl Stinfosys {
3638
) -> Result<
3739
(
3840
HashMap<i32, DateTime<Utc>>,
41+
HashMap<i32, DateTime<Utc>>,
42+
HashMap<MetTimeseriesKey, DateTime<Utc>>,
3943
HashMap<MetTimeseriesKey, DateTime<Utc>>,
4044
),
4145
Error,
@@ -49,37 +53,61 @@ impl Stinfosys {
4953
});
5054

5155
// Fetch all deactivated timeseries in Stinfosys
52-
let (station_totime, obs_pgm_totime) = tokio::try_join!(
56+
let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) = tokio::try_join!(
5357
fetch_station_totime(&client),
58+
fetch_station_fromtime(&client),
5459
fetch_obs_pgm_totime(self.levels.clone(), &client),
60+
fetch_obs_pgm_fromtime(self.levels.clone(), &client),
5561
)?;
5662

57-
Ok((station_totime, obs_pgm_totime))
63+
Ok((
64+
station_totime,
65+
station_fromtime,
66+
obs_pgm_totime,
67+
obs_pgm_fromtime,
68+
))
5869
}
5970
}
6071

6172
pub async fn fetch_deactivated(
6273
obs_pgm_totime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
74+
obs_pgm_fromtime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
6375
station_totime: &HashMap<i32, DateTime<Utc>>,
76+
station_fromtime: &HashMap<i32, DateTime<Utc>>,
6477
labels: Vec<MetLabel>,
6578
) -> Result<Vec<DeactivatedTimeseries>, Error> {
6679
let mut futures = labels
6780
.iter()
6881
.map(async |label| -> Result<_, Error> {
69-
// Prefer obs_pgm if available
82+
// TODO: Figure out how best to set these when obs_pgm makes no sense ...
83+
// particularly when it does not overlap at all with the time range
84+
// |------obs_pgm------|
85+
// |--station--|
86+
// but also if the from or to from obs_pgm actually open beyond when the
87+
// station had data aka:
88+
// |------obs_pgm------|
89+
// |--station--|
7090
let totime = obs_pgm_totime
7191
.get(&label.key)
7292
.or(station_totime.get(&label.key.station_id))
7393
.copied();
94+
let fromtime = obs_pgm_fromtime
95+
.get(&label.key)
96+
.or(station_fromtime.get(&label.key.station_id))
97+
.copied();
7498

75-
Ok((label.id, totime))
99+
Ok((label.id, fromtime, totime))
76100
})
77101
.collect::<FuturesUnordered<_>>();
78102

79103
let mut deactivated = vec![];
80104
while let Some(res) = futures.next().await {
81105
let ts = match res? {
82-
(tsid, Some(totime)) => DeactivatedTimeseries { tsid, totime },
106+
(tsid, Some(fromtime), Some(totime)) => DeactivatedTimeseries {
107+
tsid,
108+
fromtime,
109+
totime,
110+
},
83111
// Skip if a valid totime was not found in stinfosys
84112
_ => continue,
85113
};
@@ -133,6 +161,46 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPg
133161
Ok(map)
134162
}
135163

164+
async fn fetch_obs_pgm_fromtime(
165+
levels: LevelTable,
166+
conn: &Client,
167+
) -> Result<ObsPgmFromtimeMap, Error> {
168+
const OBS_PGM_QUERY: &str = "\
169+
SELECT \
170+
stationid, \
171+
paramid, \
172+
hlevel, \
173+
nsensor, \
174+
priority_messageid, \
175+
(ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \
176+
FROM obs_pgm \
177+
GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \
178+
HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL";
179+
180+
let rows = conn.query(OBS_PGM_QUERY, &[]).await?;
181+
182+
let mut map = ObsPgmFromtimeMap::new();
183+
for row in rows {
184+
let param_id: i32 = row.get(1);
185+
186+
let level = row.get(2);
187+
let level = param_get_level(levels.clone(), param_id, level)?;
188+
189+
let key = MetTimeseriesKey {
190+
station_id: row.get(0),
191+
param_id,
192+
level,
193+
sensor: row.get(3),
194+
type_id: row.get(4),
195+
};
196+
197+
let totime: NaiveDateTime = row.get(5);
198+
map.insert(key, totime.and_utc());
199+
}
200+
201+
Ok(map)
202+
}
203+
136204
async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error> {
137205
// The funny looking ARRAY_AGG is needed because each station can have multiple from/to times.
138206
// For example, the timeseries might have been "reset" after a change of the station position,
@@ -159,3 +227,24 @@ async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error>
159227
})
160228
.collect())
161229
}
230+
231+
async fn fetch_station_fromtime(conn: &Client) -> Result<StationFromtimeMap, Error> {
232+
const STATION_QUERY: &str = "\
233+
SELECT \
234+
stationid, \
235+
(ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \
236+
FROM station \
237+
GROUP BY stationid \
238+
HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL";
239+
240+
let rows = conn.query(STATION_QUERY, &[]).await?;
241+
242+
Ok(rows
243+
.iter()
244+
.map(|row| {
245+
let totime: NaiveDateTime = row.get(1);
246+
247+
(row.get(0), totime.and_utc())
248+
})
249+
.collect())
250+
}

ingestion/src/util/tsupdate.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,25 @@ const OPEN_TIMESERIES_QUERY: &str = "\
2525
const UPDATE_QUERY: &str = "\
2626
UPDATE public.timeseries SET \
2727
totime = $1, \
28+
fromtime = $2, \
2829
deactivated = true \
29-
WHERE id = $2";
30+
WHERE id = $3";
3031

3132
pub struct DeactivatedTimeseries {
3233
/// Timeseries to be updated
3334
pub tsid: i64,
35+
/// Fromtime value found in the metadata source
36+
pub fromtime: DateTime<Utc>,
3437
/// Totime value found in the metadata source
3538
pub totime: DateTime<Utc>,
3639
}
3740

3841
pub async fn set_deactivated(
3942
conn: &mut PooledPgConn<'_>,
4043
obs_pgm_totime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
44+
obs_pgm_fromtime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
4145
station_totime: &HashMap<i32, DateTime<Utc>>,
46+
station_fromtime: &HashMap<i32, DateTime<Utc>>,
4247
) -> Result<(), Error> {
4348
let tx = conn.transaction().await?;
4449

@@ -65,10 +70,20 @@ pub async fn set_deactivated(
6570
})
6671
.collect();
6772

68-
let deactivated = fetch_deactivated(obs_pgm_totime, station_totime, labels).await?;
73+
let deactivated = fetch_deactivated(
74+
obs_pgm_totime,
75+
obs_pgm_fromtime,
76+
station_totime,
77+
station_fromtime,
78+
labels,
79+
)
80+
.await?;
6981

7082
future::join_all(deactivated.into_iter().map(async |ts| {
71-
match tx.execute(UPDATE_QUERY, &[&ts.totime, &ts.tsid]).await {
83+
match tx
84+
.execute(UPDATE_QUERY, &[&ts.totime, &ts.fromtime, &ts.tsid])
85+
.await
86+
{
7287
Ok(_) => (), //info!("Tsid {} updated", ts.tsid),
7388
Err(err) => error!("Could not update tsid {}: {}", ts.tsid, err),
7489
}

integration_tests/tests/common/mocks.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use lard_ingestion::util::{
1616

1717
pub struct MetadataMock {
1818
pub station: i32,
19+
pub fromtime: DateTime<Utc>,
1920
pub totime: DateTime<Utc>,
2021
}
2122

@@ -25,16 +26,27 @@ impl MetadataMock {
2526
) -> Result<
2627
(
2728
HashMap<i32, DateTime<Utc>>,
29+
HashMap<i32, DateTime<Utc>>,
30+
HashMap<MetTimeseriesKey, DateTime<Utc>>,
2831
HashMap<MetTimeseriesKey, DateTime<Utc>>,
2932
),
3033
lard_ingestion::Error,
3134
> {
3235
let mut station_totime = HashMap::new();
3336
station_totime.insert(self.station, self.totime);
3437

35-
let obs_pgm_totime: HashMap<MetTimeseriesKey, DateTime<Utc>> = HashMap::new();
38+
let mut station_fromtime = HashMap::new();
39+
station_fromtime.insert(self.station, self.fromtime);
3640

37-
Ok((station_totime, obs_pgm_totime))
41+
let obs_pgm_totime: HashMap<MetTimeseriesKey, DateTime<Utc>> = HashMap::new();
42+
let obs_pgm_fromtime: HashMap<MetTimeseriesKey, DateTime<Utc>> = HashMap::new();
43+
44+
Ok((
45+
station_totime,
46+
station_fromtime,
47+
obs_pgm_totime,
48+
obs_pgm_fromtime,
49+
))
3850
}
3951
}
4052

integration_tests/tests/end_to_end.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,12 @@ async fn test_totime_update() {
162162
},
163163
];
164164

165+
let fromtime = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
165166
let totime = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
166167

167168
let metadata_mock = MetadataMock {
168169
station: 10001,
170+
fromtime,
169171
totime,
170172
};
171173

@@ -190,12 +192,18 @@ async fn test_totime_update() {
190192
assert_eq!(totime, None);
191193
}
192194

193-
let (station_totime, obs_pgm_totime) =
195+
let (station_totime, station_fromtime, obs_pgm_totime, obs_pgm_fromtime) =
194196
metadata_mock.cache_deactivated_stinfosys().await.unwrap();
195197

196-
set_deactivated(&mut conn, &obs_pgm_totime, &station_totime)
197-
.await
198-
.unwrap();
198+
set_deactivated(
199+
&mut conn,
200+
&obs_pgm_totime,
201+
&obs_pgm_fromtime,
202+
&station_totime,
203+
&station_fromtime,
204+
)
205+
.await
206+
.unwrap();
199207

200208
let after = get_totime(&conn).await;
201209

0 commit comments

Comments
 (0)