Changes from all commits

23 commits
d988053
add structures to get the obspgm and station fromtimes
lcoram Nov 5, 2025
b2134b7
first pass at logic for setting appropriate from/to for deactivated
lcoram Nov 6, 2025
faedff7
add sql and function for fixing twisted from/to times
lcoram Nov 6, 2025
6bbf03f
refactor to put from / to in better order
lcoram Nov 6, 2025
94eed71
code to call the untwisting function
lcoram Nov 6, 2025
0ea8a8d
fix test
lcoram Nov 6, 2025
3c64c91
set deactivated to false when updating
lcoram Nov 8, 2025
86069a5
remove untwisting code, but add OR case to open timeseries query
lcoram Nov 10, 2025
a9dfed9
invert from/to in the test
lcoram Nov 10, 2025
7caca7e
close off timeseries that are outside obs_pgm range
lcoram Nov 12, 2025
a60a961
get the from/to for the timeseries from lard, and take that into acco…
lcoram Nov 13, 2025
601f642
refactor the if/else logic for deciding which from/to to use
lcoram Nov 13, 2025
4495abc
add a note about sql for getting ts from lard
lcoram Nov 14, 2025
a80b1a7
use opentimerange from patchwork
lcoram Nov 16, 2025
56b679b
simplify code for choosing from/to
lcoram Nov 19, 2025
6a39f43
get from/to for obs_pgm and station in one call each
lcoram Nov 20, 2025
a16bf60
better naming
lcoram Nov 21, 2025
8433e2a
take into account the max/min obstime of the underlying data for from…
lcoram Nov 24, 2025
3f7011d
get the paramids that are scalar, for use to make sure use right sql …
lcoram Nov 24, 2025
5b7e1a5
ingrids code suggestion, and related cleanup
lcoram Dec 1, 2025
e952140
test on legacy.data, not data and nonscalar_data
lcoram Dec 1, 2025
a1ce38b
move timerange to util, refactor parsing of times from stinfosys for …
lcoram Dec 2, 2025
fd254db
add some comments about obs_pgm and station table use
lcoram Dec 2, 2025
67 changes: 1 addition & 66 deletions egress/src/patchwork.rs
@@ -21,7 +21,7 @@ use std::{
};
use tokio_postgres::{Client, NoTls};
use tracing::{error, warn};
use util::{DbPools, MetLabel, PooledPgConn};
use util::{ClosedTimerange, DbPools, MetLabel, OpenTimerange, PooledPgConn};

pub const PATCHWORK_FUTURES_FAILURES: &str = "patchwork_futures_failures";

@@ -155,71 +155,6 @@ pub struct PatchworkDatum {
quality_code: Option<i32>,
}

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClosedTimerange {
pub from: DateTime<Utc>,
pub to: DateTime<Utc>,
}

impl ClosedTimerange {
pub fn new(from: DateTime<Utc>, to: DateTime<Utc>) -> Self {
ClosedTimerange { from, to }
}

pub fn overlap(&self, other: Self) -> Option<Self> {
let from = self.from.max(other.from);
let to = self.to.min(other.to);

// If they overlap return the new timerange
(from < to).then_some(ClosedTimerange { from, to })
}
}

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpenTimerange {
pub from: Option<DateTime<Utc>>,
pub to: Option<DateTime<Utc>>,
}

impl OpenTimerange {
pub fn new(from: Option<DateTime<Utc>>, to: Option<DateTime<Utc>>) -> Self {
OpenTimerange { from, to }
}

/// Used to cut the priorities down to the ranges that actually matter to a particular timeseries.
/// Takes the from and to times of the timeseries as well as the from and to of the priority range.
/// Returns an Option, since the two ranges may not overlap at all (in which case it returns None).
fn overlap(&self, other: Self) -> Option<Self> {
let fromtime = match (self.from, other.from) {
(Some(lhs), Some(rhs)) => Some(lhs.max(rhs)), // return the later one
(Some(lhs), None) => Some(lhs),
(None, Some(rhs)) => Some(rhs),
(None, None) => None,
};
let totime = match (self.to, other.to) {
(Some(lhs), Some(rhs)) => Some(lhs.min(rhs)), // return the earlier one
(Some(lhs), None) => Some(lhs),
(None, Some(rhs)) => Some(rhs),
(None, None) => None,
};

match (fromtime, totime) {
// If both ends are closed and the ranges overlap return the new timerange
(Some(from), Some(to)) => {
if from >= to {
None
} else {
Some(OpenTimerange {
from: Some(from),
to: Some(to),
})
}
}
(from, to) => Some(OpenTimerange { from, to }),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Fill {
// TODO: I'm pretty sure this should never be NULL? In case we can put an Option
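Both timerange structs now live in the shared `util` crate (see the import change above). As a reference for their semantics, here is a minimal sketch of how `OpenTimerange::overlap` behaves, assuming `util` exposes the struct and the method publicly, as the new ingestion code requires:

```rust
use chrono::{DateTime, TimeZone, Utc};
use util::OpenTimerange; // moved here from egress/src/patchwork.rs in this PR

fn year(y: i32) -> DateTime<Utc> {
    Utc.with_ymd_and_hms(y, 1, 1, 0, 0, 0).unwrap()
}

fn main() {
    // A half-open range (no totime) clipped against a closed one keeps the
    // later fromtime and the earlier totime:
    let open = OpenTimerange::new(Some(year(2010)), None);
    let closed = OpenTimerange::new(Some(year(2005)), Some(year(2020)));
    assert_eq!(
        open.overlap(closed),
        Some(OpenTimerange::new(Some(year(2010)), Some(year(2020))))
    );

    // Fully disjoint closed ranges yield None:
    let early = OpenTimerange::new(Some(year(2000)), Some(year(2001)));
    assert_eq!(closed.overlap(early), None);

    // Two unbounded ends stay unbounded:
    let all = OpenTimerange::new(None, None);
    assert_eq!(all.overlap(closed), Some(closed));
}
```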
10 changes: 5 additions & 5 deletions ingestion/src/cron.rs
@@ -32,18 +32,18 @@ pub async fn refresh_levels((stinfo_conn_string, level_table): &(String, LevelTa
*tables = new_level_table;
}

pub async fn refresh_deactivated((stinfosys, pools): &(Stinfosys, DbPools)) {
info!("Updating timeseries totime");
pub async fn refresh_from_to((stinfosys, pools): &(Stinfosys, DbPools)) {
info!("Updating timeseries fromtime & totime");

// TODO: add retries instead of panicking?
let mut open_conn = pools.open.get().await.unwrap();
let mut restricted_conn = pools.restricted.get().await.unwrap();

let (station_totime, obs_pgm_totime) = stinfosys.cache_deactivated_stinfosys().await.unwrap();
let (obs_pgm_times_map, station_times_map) = stinfosys.cache_closed_stinfosys().await.unwrap();

let (open_res, restricted_res) = tokio::join!(
tsupdate::set_deactivated(&mut open_conn, &obs_pgm_totime, &station_totime),
tsupdate::set_deactivated(&mut restricted_conn, &obs_pgm_totime, &station_totime),
tsupdate::update_from_to(&mut open_conn, &obs_pgm_times_map, &station_times_map),
tsupdate::update_from_to(&mut restricted_conn, &obs_pgm_times_map, &station_times_map),
);

if let Err(err) = open_res {
19 changes: 19 additions & 0 deletions ingestion/src/lib.rs
@@ -66,6 +66,7 @@ pub const KAFKA_CHECKED_MESSAGES_RECEIVED: &str = "kafka_checked_messages_receiv
pub const KAFKA_CHECKED_FAILURES: &str = "kafka_checked_failures";
pub const SCALAR_DATAPOINTS: &str = "scalar_datapoints";
pub const NONSCALAR_DATAPOINTS: &str = "nonscalar_datapoints";
pub const FROM_TO_FUTURES_FAILURES: &str = "from_to_futures_failures";

/// Gets an environment variable, providing more details than calling std::env::var() directly.
pub fn getenv(key: &str) -> Result<String, Error> {
@@ -500,6 +501,24 @@ pub fn get_conversions(filename: &str) -> Result<ParamConversions, Error> {
))
}

/// Reads the param conversions CSV and collects the paramids (first column) of
/// the rows whose fourth column is "f", i.e. the parameters treated as scalar.
pub fn get_scalar_paramids(filename: &str) -> Result<Vec<i32>, Error> {
Ok(csv::Reader::from_path(filename)
.unwrap()
.into_records()
.filter_map(|record_result| match record_result {
Ok(record) => {
// rows whose fourth column is "f" are the scalar parameters
if record.get(3).unwrap() == "f" {
let paramid = record.get(0).unwrap().parse::<i32>().unwrap();
Some(paramid)
} else {
None
}
}
// skip rows that fail to parse
Err(_) => None,
})
.collect())
}

/// Middleware function that runs around a request, so we can record how long it took
async fn track_request_duration(req: Request, next: Next) -> impl IntoResponse {
let start = std::time::Instant::now();
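The scalar paramid list is presumably what routes datapoints to the right table (cf. the `data` vs `nonscalar_data` tables and the `SCALAR_DATAPOINTS`/`NONSCALAR_DATAPOINTS` counters). A hypothetical wiring sketch; the CSV path and the routing helper are illustrative assumptions, not part of this diff:

```rust
// Hypothetical usage only; the filename and routing logic are assumed.
fn route_datapoint(scalar_paramids: &[i32], param_id: i32) -> &'static str {
    // Scalar values would go to `data`, everything else to `nonscalar_data`.
    if scalar_paramids.contains(&param_id) {
        "data"
    } else {
        "nonscalar_data"
    }
}

fn main() -> Result<(), lard_ingestion::Error> {
    let scalar_paramids = lard_ingestion::get_scalar_paramids("resources/paramconversions.csv")?;
    println!("{}", route_datapoint(&scalar_paramids, 211));
    Ok(())
}
```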
10 changes: 6 additions & 4 deletions ingestion/src/main.rs
@@ -10,9 +10,10 @@ use lard_ingestion::{
cron::{self},
get_conversions, getenv, legacy,
util::{levels, permissions, stinfosys::Stinfosys},
Error, HTTP_REQUESTS_DURATION_SECONDS, KAFKA_CHECKED_FAILURES, KAFKA_CHECKED_MESSAGES_RECEIVED,
KAFKA_RAW_FAILURES, KAFKA_RAW_MESSAGES_RECEIVED, KLDATA_FAILURES, KLDATA_MESSAGES_RECEIVED,
NONSCALAR_DATAPOINTS, QC_FAILURES, SCALAR_DATAPOINTS,
Error, FROM_TO_FUTURES_FAILURES, HTTP_REQUESTS_DURATION_SECONDS, KAFKA_CHECKED_FAILURES,
KAFKA_CHECKED_MESSAGES_RECEIVED, KAFKA_RAW_FAILURES, KAFKA_RAW_MESSAGES_RECEIVED,
KLDATA_FAILURES, KLDATA_MESSAGES_RECEIVED, NONSCALAR_DATAPOINTS, QC_FAILURES,
SCALAR_DATAPOINTS,
};
use util::{Cron, DbPools};

@@ -81,7 +82,7 @@ async fn main() -> Result<(), Error> {
Stinfosys::new(stinfo_conn_string, level_table.clone()),
db_pools.clone(),
),
action: cron::refresh_deactivated,
action: cron::refresh_from_to,
interval: tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600)),
}
.run_forever(),
@@ -114,6 +115,7 @@
let _ = metrics::counter!(KAFKA_CHECKED_FAILURES);
let _ = metrics::counter!(SCALAR_DATAPOINTS);
let _ = metrics::counter!(NONSCALAR_DATAPOINTS);
let _ = metrics::counter!(FROM_TO_FUTURES_FAILURES);

// non kvalobs-dependent ingestion
#[cfg(feature = "next")]
137 changes: 85 additions & 52 deletions ingestion/src/util/stinfosys.rs
@@ -1,27 +1,22 @@
use std::collections::HashMap;

use chrono::{DateTime, NaiveDateTime, Utc};
use futures::{stream::FuturesUnordered, StreamExt};
use chrono::NaiveDateTime;
use tokio_postgres::{Client, NoTls};
use tracing::error;

use util::{MetLabel, MetTimeseriesKey};

use crate::{
util::{
levels::{param_get_level, LevelTable},
tsupdate::DeactivatedTimeseries,
},
util::levels::{param_get_level, LevelTable},
Error,
};
use util::{MetLabel, MetTimeseriesKey, OpenTimerange};

pub struct Stinfosys {
conn_string: String,
levels: LevelTable,
}

type StationTotimeMap = HashMap<i32, DateTime<Utc>>;
type ObsPgmTotimeMap = HashMap<MetTimeseriesKey, DateTime<Utc>>;
type StationFromTotimeMap = HashMap<i32, OpenTimerange>;
type ObsPgmFromTotimeMap = HashMap<MetTimeseriesKey, OpenTimerange>;

impl Stinfosys {
pub fn new(conn_string: String, levels: LevelTable) -> Self {
@@ -31,12 +26,12 @@ impl Stinfosys {
}
}

pub async fn cache_deactivated_stinfosys(
pub async fn cache_closed_stinfosys(
&self,
) -> Result<
(
HashMap<i32, DateTime<Utc>>,
HashMap<MetTimeseriesKey, DateTime<Utc>>,
HashMap<MetTimeseriesKey, OpenTimerange>,
HashMap<i32, OpenTimerange>,
),
Error,
> {
@@ -48,49 +43,69 @@ impl Stinfosys {
}
});

// Fetch all deactivated timeseries in Stinfosys
let (station_totime, obs_pgm_totime) = tokio::try_join!(
fetch_station_totime(&client),
fetch_obs_pgm_totime(self.levels.clone(), &client),
// Fetch all closed timeseries in Stinfosys
let (obs_pgm_times, station_times) = tokio::try_join!(
fetch_obs_pgm_times(self.levels.clone(), &client),
fetch_station_times(&client),
)?;

Ok((station_totime, obs_pgm_totime))
Ok((obs_pgm_times, station_times))
}
}

pub async fn fetch_deactivated(
obs_pgm_totime: &HashMap<MetTimeseriesKey, DateTime<Utc>>,
station_totime: &HashMap<i32, DateTime<Utc>>,
pub fn calc_from_tos(
obs_pgm_ranges: &HashMap<MetTimeseriesKey, OpenTimerange>,
station_ranges: &HashMap<i32, OpenTimerange>,
data_ranges: HashMap<i64, OpenTimerange>,
labels: Vec<MetLabel>,
) -> Result<Vec<DeactivatedTimeseries>, Error> {
let mut futures = labels
) -> Vec<(i64, OpenTimerange)> {
labels
.iter()
.map(async |label| -> Result<_, Error> {
// Prefer obs_pgm if available
let totime = obs_pgm_totime
.filter_map(|label| {
// Prefer obs_pgm if available, and only use station if no obs_pgm info exists
let stinfo_range = *obs_pgm_ranges
.get(&label.key)
.or(station_totime.get(&label.key.station_id))
.copied();

Ok((label.id, totime))
.or(station_ranges.get(&label.key.station_id))
.unwrap_or(&OpenTimerange {
from: None,
to: None,
});
// we `?` this one because if it's None, the ts doesn't exist and we can't update it
let data = *data_ranges.get(&label.id)?;

let overlap = stinfo_range.overlap(data);

// Only close the ts if the metadata has a to_time; if it doesn't, the ts might still
// receive new data, so we leave `to` open
let should_be_closed = stinfo_range.to.is_some();

let out = match (overlap, should_be_closed) {
(Some(overlap), true) => overlap,
(Some(overlap), false) => OpenTimerange {
from: overlap.from,
to: None,
},
(None, true) => OpenTimerange {
from: data.to,
to: data.to,
},
(None, false) => OpenTimerange {
from: stinfo_range.from.max(data.from),
to: None,
},
};

Some((label.id, out))
})
.collect::<FuturesUnordered<_>>();

let mut deactivated = vec![];
while let Some(res) = futures.next().await {
let ts = match res? {
(tsid, Some(totime)) => DeactivatedTimeseries { tsid, totime },
// Skip if a valid totime was not found in stinfosys
_ => continue,
};

deactivated.push(ts);
}

Ok(deactivated)
.collect()
}
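To make the four match arms above concrete, here is a sketch with made-up dates (again assuming `OpenTimerange` and its `overlap` are public in `util`):

```rust
use chrono::{DateTime, TimeZone, Utc};
use util::OpenTimerange;

fn year(y: i32) -> DateTime<Utc> {
    Utc.with_ymd_and_hms(y, 1, 1, 0, 0, 0).unwrap()
}

fn main() {
    // Stinfosys says the program ran 2010..2020, but lard holds data 2012..2023.
    let stinfo = OpenTimerange::new(Some(year(2010)), Some(year(2020)));
    let data = OpenTimerange::new(Some(year(2012)), Some(year(2023)));

    // (Some(overlap), true): the metadata is closed, so the timeseries gets
    // clamped to the intersection 2012..2020.
    assert_eq!(
        stinfo.overlap(data),
        Some(OpenTimerange::new(Some(year(2012)), Some(year(2020))))
    );

    // (Some(overlap), false): open-ended metadata keeps `to` as None so the
    // timeseries can still receive data; only `from` is tightened.
    let open_stinfo = OpenTimerange::new(Some(year(2010)), None);
    assert_eq!(open_stinfo.overlap(data).unwrap().from, Some(year(2012)));
}
```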

async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPgmTotimeMap, Error> {
/// Obs_pgm stands for "observation program". It holds information about what is expected to send data.
/// We use it to determine whether a timeseries should have a to_time (i.e. is closed) or not.
async fn fetch_obs_pgm_times(
levels: LevelTable,
conn: &Client,
) -> Result<ObsPgmFromTotimeMap, Error> {
// The funny looking ARRAY_AGG is needed because each timeseries can have multiple from/to times.
// Most likely related to the fact that stations in the `station` tables can also have
// multiple entries, see [fetch_station_times]
@@ -104,14 +119,15 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPg
hlevel, \
nsensor, \
priority_messageid, \
MIN(fromtime), \
(ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \
FROM obs_pgm \
GROUP BY stationid, paramid, hlevel, nsensor, priority_messageid \
HAVING (ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] IS NOT NULL";

let rows = conn.query(OBS_PGM_QUERY, &[]).await?;

let mut map = ObsPgmTotimeMap::new();
let mut map = ObsPgmFromTotimeMap::new();
for row in rows {
let param_id: i32 = row.get(1);

Expand All @@ -126,14 +142,23 @@ async fn fetch_obs_pgm_totime(levels: LevelTable, conn: &Client) -> Result<ObsPg
type_id: row.get(4),
};

let totime: NaiveDateTime = row.get(5);
map.insert(key, totime.and_utc());
let fromtime: Option<NaiveDateTime> = row.get(5);
let totime: Option<NaiveDateTime> = row.get(6);
map.insert(
key,
OpenTimerange {
from: fromtime.map(|t| t.and_utc()),
to: totime.map(|t| t.and_utc()),
},
);
}

Ok(map)
}
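The `(ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1]` expression picks NULL whenever any entry in the group is still open, and the latest totime otherwise. The same selection expressed in Rust, purely as an illustration of the SQL:

```rust
// Mirrors the SQL aggregate: an open entry (None) wins over any close date;
// otherwise the latest close date is used.
fn effective_totime(totimes: &[Option<i32>]) -> Option<i32> {
    if totimes.iter().any(|t| t.is_none()) {
        None // at least one entry is still open, so the group counts as open
    } else {
        totimes.iter().copied().flatten().max()
    }
}

fn main() {
    assert_eq!(effective_totime(&[Some(2015), None, Some(2020)]), None);
    assert_eq!(effective_totime(&[Some(2015), Some(2020)]), Some(2020));
}
```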

async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error> {
/// This is metadata about when a station existed. If obs_pgm has no information
/// about a timeseries, we fall back to the station table.
async fn fetch_station_times(conn: &Client) -> Result<StationFromTotimeMap, Error> {
// The funny looking ARRAY_AGG is needed because each station can have multiple from/to times.
// For example, the timeseries might have been "reset" after a change of the station position,
// even though the station ID did not change.
@@ -143,6 +168,7 @@ async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error>
const STATION_QUERY: &str = "\
SELECT \
stationid, \
MIN(fromtime), \
(ARRAY_AGG(totime ORDER BY totime DESC NULLS FIRST))[1] \
FROM station \
GROUP BY stationid \
@@ -153,9 +179,16 @@ async fn fetch_station_totime(conn: &Client) -> Result<StationTotimeMap, Error>
Ok(rows
.iter()
.map(|row| {
let totime: NaiveDateTime = row.get(1);

(row.get(0), totime.and_utc())
let fromtime: Option<NaiveDateTime> = row.get(1);
let totime: Option<NaiveDateTime> = row.get(2);

(
row.get(0),
OpenTimerange {
from: fromtime.map(|t| t.and_utc()),
to: totime.map(|t| t.and_utc()),
},
)
})
.collect())
}
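For orientation, the pieces in this file compose roughly as follows. `tsupdate::update_from_to` (called from cron.rs above) is not part of this diff, so this sketch only shows the planning half; the lard queries for labels and min/max obstimes, and the final UPDATE, are assumed:

```rust
use std::collections::HashMap;
use util::{MetLabel, MetTimeseriesKey, OpenTimerange};

// Hypothetical glue, assuming the caller has already fetched `labels` and the
// per-timeseries min/max obstimes (`data_ranges`) from lard.
fn plan_updates(
    obs_pgm_ranges: &HashMap<MetTimeseriesKey, OpenTimerange>,
    station_ranges: &HashMap<i32, OpenTimerange>,
    data_ranges: HashMap<i64, OpenTimerange>,
    labels: Vec<MetLabel>,
) -> Vec<String> {
    calc_from_tos(obs_pgm_ranges, station_ranges, data_ranges, labels)
        .into_iter()
        .map(|(tsid, range)| {
            // In the real code this would be an UPDATE against the timeseries
            // table, also setting deactivated = false where the ts is reopened.
            format!("ts {tsid}: from={:?} to={:?}", range.from, range.to)
        })
        .collect()
}
```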