Skip to content

Commit 07bde81

Browse files
committed
Use unnest in bookie
1 parent 52c5b8d commit 07bde81

File tree

1 file changed

+73
-36
lines changed

1 file changed

+73
-36
lines changed

crates/corro-types/src/agent.rs

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
time::{Duration, Instant},
1515
};
1616

17-
use antithesis_sdk::{assert_always, assert_unreachable};
17+
use antithesis_sdk::assert_unreachable;
1818
use arc_swap::ArcSwap;
1919
use camino::Utf8PathBuf;
2020
use compact_str::{CompactString, ToCompactString};
@@ -50,8 +50,8 @@ use crate::{
5050
pubsub::SubsManager,
5151
schema::Schema,
5252
sqlite::{
53-
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, CrConn, Migration,
54-
SqlitePool, SqlitePoolError,
53+
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, unnest_param, CrConn,
54+
Migration, SqlitePool, SqlitePoolError,
5555
},
5656
updates::UpdatesManager,
5757
};
@@ -1135,51 +1135,88 @@ impl VersionsSnapshot {
11351135
trace!(actor_id = %self.actor_id, "new: {:?}", changes.insert_set);
11361136

11371137
// those are actual ranges we had stored and will change, remove them from the DB
1138-
for range in std::mem::take(&mut changes.remove_ranges) {
1139-
debug!(actor_id = %self.actor_id, "deleting {range:?}");
1138+
{
1139+
let remove_ranges = std::mem::take(&mut changes.remove_ranges);
1140+
let actors = unnest_param(remove_ranges.iter().map(|_| self.actor_id));
1141+
let starts = unnest_param(remove_ranges.iter().map(|r| r.start()));
1142+
let ends = unnest_param(remove_ranges.iter().map(|r| r.end()));
1143+
// TODO: use returning to discover which ranges were actually deleted
11401144
let count = conn
1141-
.prepare_cached("DELETE FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start AND end = :end")?
1145+
.prepare_cached(
1146+
"
1147+
DELETE FROM __corro_bookkeeping_gaps WHERE (actor_id, start, end)
1148+
IN (SELECT value0, value1, value2 FROM unnest(:actors, :starts, :ends))
1149+
",
1150+
)?
11421151
.execute(named_params! {
1143-
":actor_id": self.actor_id,
1144-
":start": range.start(),
1145-
":end": range.end()
1152+
":actors": actors,
1153+
":starts": starts,
1154+
":ends": ends,
11461155
})?;
1147-
if count != 1 {
1148-
warn!(actor_id = %self.actor_id, "did not delete gap from db: {range:?}");
1156+
if count != remove_ranges.len() {
1157+
warn!(actor_id = %self.actor_id, "did not delete some gaps from db: {remove_ranges:?}");
1158+
let details: serde_json::Value = json!({"count": count, "ranges": remove_ranges});
1159+
assert_unreachable!("ineffective deletion of gaps in-db", &details);
11491160
}
1150-
let details = json!({"count": count, "range": range});
1151-
assert_always!(count == 1, "ineffective deletion of gaps in-db", &details);
1152-
for version in CrsqlDbVersionRange::from(&range) {
1153-
self.partials.remove(&version);
1161+
1162+
for range in remove_ranges {
1163+
for version in CrsqlDbVersionRange::from(&range) {
1164+
self.partials.remove(&version);
1165+
}
1166+
self.needed.remove(range);
11541167
}
1155-
self.needed.remove(range);
11561168
}
11571169

1158-
for range in std::mem::take(&mut changes.insert_set) {
1159-
debug!(actor_id = %self.actor_id, "inserting {range:?}");
1160-
let res = conn
1170+
{
1171+
let insert_set = std::mem::take(&mut changes.insert_set);
1172+
let actors = unnest_param(insert_set.iter().map(|_| self.actor_id));
1173+
let starts = unnest_param(insert_set.iter().map(|r| r.start()));
1174+
let ends = unnest_param(insert_set.iter().map(|r| r.end()));
1175+
debug!(actor_id = %self.actor_id, "inserting {insert_set:?}");
1176+
// TODO: use returning to discover which ranges were actually inserted
1177+
let count = conn
11611178
.prepare_cached(
1162-
"INSERT INTO __corro_bookkeeping_gaps VALUES (:actor_id, :start, :end)",
1179+
"
1180+
INSERT OR IGNORE INTO __corro_bookkeeping_gaps (actor_id, start, end)
1181+
SELECT value0, value1, value2 FROM unnest(:actors, :starts, :ends)
1182+
",
11631183
)?
11641184
.execute(named_params! {
1165-
":actor_id": self.actor_id,
1166-
":start": range.start(),
1167-
":end": range.end()
1168-
});
1169-
1170-
if let Err(e) = res {
1171-
let (actor_id, start, end) : (ActorId, CrsqlDbVersion, CrsqlDbVersion) = conn.query_row("SELECT actor_id, start, end FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start", named_params! {
1172-
":actor_id": self.actor_id,
1173-
":start": range.start(),
1174-
}, |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
1175-
1176-
warn!("already had gaps entry! actor_id: {actor_id}, start: {start}, end: {end}");
1177-
let details = json!({"actor_id": actor_id, "start": start, "end": end});
1178-
assert_unreachable!("gaps entry present", &details);
1185+
":actors": actors,
1186+
":starts": starts,
1187+
":ends": ends,
1188+
})?;
1189+
if count != insert_set.len() {
1190+
warn!(actor_id = %self.actor_id, "did not insert some gaps into db: {insert_set:?}");
1191+
1192+
let existing: Vec<(ActorId, CrsqlDbVersion, CrsqlDbVersion)> = conn
1193+
.prepare_cached(
1194+
"
1195+
SELECT actor_id, start, end FROM __corro_bookkeeping_gaps
1196+
WHERE (actor_id, start)
1197+
IN (SELECT value0, value1 FROM unnest(:actors, :starts))",
1198+
)?
1199+
.query_map(
1200+
named_params! {
1201+
":actors": actors,
1202+
":starts": starts,
1203+
},
1204+
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1205+
)?
1206+
.collect::<rusqlite::Result<Vec<_>>>()?;
1207+
1208+
warn!("already had gaps entries! existing: {existing:?}");
1209+
let details: serde_json::Value =
1210+
json!({"count": count, "insert_set": insert_set, "existing": existing});
1211+
assert_unreachable!("ineffective insertion of gaps in-db", &details);
1212+
return Err(rusqlite::Error::ModuleError(
1213+
"Gaps entries already present in DB".to_string(),
1214+
));
1215+
}
11791216

1180-
return Err(e);
1217+
for range in insert_set {
1218+
self.needed.insert(range);
11811219
}
1182-
self.needed.insert(range);
11831220
}
11841221

11851222
self.max = changes.max.take();

0 commit comments

Comments
 (0)