Skip to content

Commit 2483f41

Browse files
committed
get from/to for obs_pgm and station in one call each
1 parent db18b0b commit 2483f41

File tree

5 files changed

+78
-160
lines changed

5 files changed

+78
-160
lines changed

ingestion/src/cron.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,11 @@ pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) {
3939
let mut open_conn = pools.open.get().await.unwrap();
4040
let mut restricted_conn = pools.restricted.get().await.unwrap();
4141

42-
let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) =
43-
stinfosys.cache_deactivated_stinfosys().await.unwrap();
42+
let (obs_pgm_times, station_times) = stinfosys.cache_deactivated_stinfosys().await.unwrap();
4443

4544
let (open_res, restricted_res) = tokio::join!(
46-
tsupdate::set_from_to_obs_pgm(
47-
&mut open_conn,
48-
&obs_pgm_fromtime,
49-
&obs_pgm_totime,
50-
&station_fromtime,
51-
&station_totime
52-
),
53-
tsupdate::set_from_to_obs_pgm(
54-
&mut restricted_conn,
55-
&obs_pgm_fromtime,
56-
&obs_pgm_totime,
57-
&station_fromtime,
58-
&station_totime
59-
),
45+
tsupdate::set_from_to_obs_pgm(&mut open_conn, &obs_pgm_times, &station_times),
46+
tsupdate::set_from_to_obs_pgm(&mut restricted_conn, &obs_pgm_times, &station_times),
6047
);
6148

6249
if let Err(err) = open_res {

ingestion/src/util/stinfosys.rs

Lines changed: 53 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::HashMap;
22

3-
use chrono::{DateTime, NaiveDateTime, Utc};
3+
use chrono::NaiveDateTime;
44
use futures::{stream::FuturesUnordered, StreamExt};
55
use tokio_postgres::{Client, NoTls};
66
use tracing::error;
@@ -20,10 +20,8 @@ pub struct Stinfosys {
2020
levels: LevelTable,
2121
}
2222

23-
type StationTotimeMap = HashMap<i32, DateTime<Utc>>;
24-
type StationFromtimeMap = HashMap<i32, DateTime<Utc>>;
25-
type ObsPgmTotimeMap = HashMap<MetTimeseriesKey, DateTime<Utc>>;
26-
type ObsPgmFromtimeMap = HashMap<MetTimeseriesKey, DateTime<Utc>>;
23+
type StationFromTotimeMap = HashMap<i32, OpenTimerange>;
24+
type ObsPgmFromTotimeMap = HashMap<MetTimeseriesKey, OpenTimerange>;
2725

2826
impl Stinfosys {
2927
pub fn new(conn_string: String, levels: LevelTable) -> Self {
@@ -37,10 +35,8 @@ impl Stinfosys {
3735
&self,
3836
) -> Result<
3937
(
40-
HashMap<MetTimeseriesKey, DateTime<Utc>>,
41-
HashMap<MetTimeseriesKey, DateTime<Utc>>,
42-
HashMap<i32, DateTime<Utc>>,
43-
HashMap<i32, DateTime<Utc>>,
38+
HashMap<MetTimeseriesKey, OpenTimerange>,
39+
HashMap<i32, OpenTimerange>,
4440
),
4541
Error,
4642
> {
@@ -53,27 +49,18 @@ impl Stinfosys {
5349
});
5450

5551
// Fetch all deactivated timeseries in Stinfosys
56-
let (obs_pgm_fromtime, obs_pgm_totime, station_fromtime, station_totime) = tokio::try_join!(
57-
fetch_obs_pgm_fromtime(self.levels.clone(), &client),
58-
fetch_obs_pgm_totime(self.levels.clone(), &client),
59-
fetch_station_fromtime(&client),
60-
fetch_station_totime(&client),
52+
let (obs_pgm_times, station_times) = tokio::try_join!(
53+
fetch_obs_pgm_times(self.levels.clone(), &client),
54+
fetch_station_times(&client),
6155
)?;
6256

63-
Ok((
64-
obs_pgm_fromtime,
65-
obs_pgm_totime,
66-
station_fromtime,
67-
station_totime,
68-
))
57+
Ok((obs_pgm_times, station_times))
6958
}
7059
}
7160

7261
pub async fn fetch_from_to_for_update(
73-
obs_pgm_fromtime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
74-
obs_pgm_totime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
75-
station_fromtime: &HashMap<i32, DateTime<Utc>>,
76-
station_totime: &HashMap<i32, DateTime<Utc>>,
62+
obs_pgm_times: &HashMap<MetTimeseriesKey, OpenTimerange>,
63+
station_times: &HashMap<i32, OpenTimerange>,
7764
ts_from_to: HashMap<i64, OpenTimerange>,
7865
labels: Vec<MetLabel>,
7966
) -> Result<Vec<TSupdateTimeseries>, Error> {
@@ -83,15 +70,21 @@ pub async fn fetch_from_to_for_update(
8370
// check we have this key for the TS
8471
if ts_from_to.contains_key(&label.id) {
8572
// use obs_pgm if exists, or else station if exists, or else will be none
86-
let fromtime = obs_pgm_fromtime
87-
.get(&label.key)
88-
.or(station_fromtime.get(&label.key.station_id))
89-
.copied();
90-
91-
let totime = obs_pgm_totime
92-
.get(&label.key)
93-
.or(station_totime.get(&label.key.station_id))
94-
.copied();
73+
let fromtime = match obs_pgm_times.get(&label.key) {
74+
Some(pgm_fromto) => pgm_fromto.from,
75+
None => match station_times.get(&label.key.station_id) {
76+
Some(station_fromto) => station_fromto.from,
77+
None => None,
78+
},
79+
};
80+
81+
let totime = match obs_pgm_times.get(&label.key) {
82+
Some(pgm_fromto) => pgm_fromto.to,
83+
None => match station_times.get(&label.key.station_id) {
84+
Some(station_fromto) => station_fromto.to,
85+
None => None,
86+
},
87+
};
9588

9689
if fromtime.is_none() && totime.is_none() {
9790
// no metadata, keep the ts from/to
@@ -160,7 +153,10 @@ pub async fn fetch_from_to_for_update(
160153
Ok(ts_update)
161154
}
162155

163-
async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPgmTotimeMap, Error> {
156+
async fn fetch_obs_pgm_times(
157+
levels: LevelTable,
158+
conn: &Client,
159+
) -> Result<ObsPgmFromTotimeMap, Error> {
164160
// The funny looking ARRAY_AGG is needed because each timeseries can have multiple from/to times.
165161
// Most likely related to the fact that stations in the `station` tables can also have
166162
// multiple entries, see [fetch_station_times]
@@ -174,54 +170,15 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPg
174170
hlevel, \
175171
nsensor, \
176172
priority_messageid, \
173+
MIN(fromtime), \
177174
(ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \
178175
FROM obs_pgm \
179176
GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \
180177
HAVING (ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] IS NOT NULL";
181178

182179
let rows = conn.query(OBS_PGM_QUERY, &[]).await?;
183180

184-
let mut map = ObsPgmTotimeMap::new();
185-
for row in rows {
186-
let param_id: i32 = row.get(1);
187-
188-
let level = row.get(2);
189-
let level = param_get_level(levels.clone(), param_id, level)?;
190-
191-
let key = MetTimeseriesKey {
192-
station_id: row.get(0),
193-
param_id,
194-
level,
195-
sensor: row.get(3),
196-
type_id: row.get(4),
197-
};
198-
199-
let totime: NaiveDateTime = row.get(5);
200-
map.insert(key, totime.and_utc());
201-
}
202-
203-
Ok(map)
204-
}
205-
206-
async fn fetch_obs_pgm_fromtime(
207-
levels: LevelTable,
208-
conn: &Client,
209-
) -> Result<ObsPgmFromtimeMap, Error> {
210-
const OBS_PGM_QUERY: &str = "\
211-
SELECT \
212-
stationid, \
213-
paramid, \
214-
hlevel, \
215-
nsensor, \
216-
priority_messageid, \
217-
(ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \
218-
FROM obs_pgm \
219-
GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \
220-
HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL";
221-
222-
let rows = conn.query(OBS_PGM_QUERY, &[]).await?;
223-
224-
let mut map = ObsPgmFromtimeMap::new();
181+
let mut map = ObsPgmFromTotimeMap::new();
225182
for row in rows {
226183
let param_id: i32 = row.get(1);
227184

@@ -236,14 +193,21 @@ async fn fetch_obs_pgm_fromtime(
236193
type_id: row.get(4),
237194
};
238195

239-
let totime: NaiveDateTime = row.get(5);
240-
map.insert(key, totime.and_utc());
196+
let fromtime: NaiveDateTime = row.get(5);
197+
let totime: NaiveDateTime = row.get(6);
198+
map.insert(
199+
key,
200+
OpenTimerange {
201+
from: Some(fromtime.and_utc()),
202+
to: Some(totime.and_utc()),
203+
},
204+
);
241205
}
242206

243207
Ok(map)
244208
}
245209

246-
async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error> {
210+
async fn fetch_station_times(conn: &Client) -> Result<StationFromTotimeMap, Error> {
247211
// The funny looking ARRAY_AGG is needed because each station can have multiple from/to times.
248212
// For example, the timeseries might have been "reset" after a change of the station position,
249213
// even though the station ID did not change.
@@ -253,6 +217,7 @@ async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error>
253217
const STATION_QUERY: &str = "\
254218
SELECT \
255219
stationid, \
220+
MIN(fromtime), \
256221
(ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \
257222
FROM station \
258223
GROUP BY stationid \
@@ -263,30 +228,16 @@ async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error>
263228
Ok(rows
264229
.iter()
265230
.map(|row| {
266-
let totime: NaiveDateTime = row.get(1);
267-
268-
(row.get(0), totime.and_utc())
269-
})
270-
.collect())
271-
}
272-
273-
async fn fetch_station_fromtime(conn: &Client) -> Result<StationFromtimeMap, Error> {
274-
const STATION_QUERY: &str = "\
275-
SELECT \
276-
stationid, \
277-
(ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] \
278-
FROM station \
279-
GROUP BY stationid \
280-
HAVING (ARRAY_AGG(fromtime ORDER BY fromtime ASC))[1] IS NOT NULL";
281-
282-
let rows = conn.query(STATION_QUERY, &[]).await?;
283-
284-
Ok(rows
285-
.iter()
286-
.map(|row| {
287-
let totime: NaiveDateTime = row.get(1);
288-
289-
(row.get(0), totime.and_utc())
231+
let fromtime: NaiveDateTime = row.get(1);
232+
let totime: NaiveDateTime = row.get(2);
233+
234+
(
235+
row.get(0),
236+
OpenTimerange {
237+
from: Some(fromtime.and_utc()),
238+
to: Some(totime.and_utc()),
239+
},
240+
)
290241
})
291242
.collect())
292243
}

ingestion/src/util/tsupdate.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ impl TSupdateTimeseries {
6060

6161
pub async fn set_from_to_obs_pgm(
6262
conn: &mut PooledPgConn<'_>,
63-
obs_pgm_fromtime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
64-
obs_pgm_totime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
65-
station_fromtime: &HashMap<i32, DateTime<Utc>>,
66-
station_totime: &HashMap<i32, DateTime<Utc>>,
63+
obs_pgm_times: &HashMap<MetTimeseriesKey, OpenTimerange>,
64+
station_times: &HashMap<i32, OpenTimerange>,
6765
) -> Result<(), Error> {
6866
let tx = conn.transaction().await?;
6967

@@ -95,15 +93,8 @@ pub async fn set_from_to_obs_pgm(
9593
ts_from_to.insert(row.get(0), OpenTimerange::new(row.get(6), row.get(7)));
9694
});
9795

98-
let deactivated = fetch_from_to_for_update(
99-
obs_pgm_fromtime,
100-
obs_pgm_totime,
101-
station_fromtime,
102-
station_totime,
103-
ts_from_to,
104-
labels,
105-
)
106-
.await?;
96+
let deactivated =
97+
fetch_from_to_for_update(obs_pgm_times, station_times, ts_from_to, labels).await?;
10798

10899
future::join_all(deactivated.into_iter().map(async |ts| {
109100
match tx

integration_tests/tests/common/mocks.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,23 @@ impl MetadataMock {
2525
&self,
2626
) -> Result<
2727
(
28-
HashMap<i32, DateTime<Utc>>,
29-
HashMap<i32, DateTime<Utc>>,
30-
HashMap<MetTimeseriesKey, DateTime<Utc>>,
31-
HashMap<MetTimeseriesKey, DateTime<Utc>>,
28+
HashMap<MetTimeseriesKey, OpenTimerange>,
29+
HashMap<i32, OpenTimerange>,
3230
),
3331
lard_ingestion::Error,
3432
> {
35-
let mut station_totime = HashMap::new();
36-
station_totime.insert(self.station, self.totime);
37-
38-
let mut station_fromtime = HashMap::new();
39-
station_fromtime.insert(self.station, self.fromtime);
40-
41-
let obs_pgm_totime: HashMap<MetTimeseriesKey, DateTime<Utc>> = HashMap::new();
42-
let obs_pgm_fromtime: HashMap<MetTimeseriesKey, DateTime<Utc>> = HashMap::new();
43-
44-
Ok((
45-
station_fromtime,
46-
station_totime,
47-
obs_pgm_fromtime,
48-
obs_pgm_totime,
49-
))
33+
let mut station_times = HashMap::new();
34+
station_times.insert(
35+
self.station,
36+
OpenTimerange {
37+
from: Some(self.fromtime),
38+
to: Some(self.totime),
39+
},
40+
);
41+
42+
let obs_pgm_times: HashMap<MetTimeseriesKey, OpenTimerange> = HashMap::new();
43+
44+
Ok((obs_pgm_times, station_times))
5045
}
5146
}
5247

integration_tests/tests/end_to_end.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,12 @@ async fn test_totime_update() {
192192
assert_eq!(totime, None);
193193
}
194194

195-
let (station_fromtime, station_totime, obs_pgm_fromtime, obs_pgm_totime) =
195+
let (obs_pgm_times, station_times) =
196196
metadata_mock.cache_deactivated_stinfosys().await.unwrap();
197197

198-
set_from_to_obs_pgm(
199-
&mut conn,
200-
&obs_pgm_fromtime,
201-
&obs_pgm_totime,
202-
&station_fromtime,
203-
&station_totime,
204-
)
205-
.await
206-
.unwrap();
198+
set_from_to_obs_pgm(&mut conn, &obs_pgm_times, &station_times)
199+
.await
200+
.unwrap();
207201

208202
let after = get_totime(&conn).await;
209203

0 commit comments

Comments
 (0)