Skip to content

Commit 7c3fb12

Browse files
committed
Add load and flush during startup/shutdown
1 parent af0a9b5 commit 7c3fb12

File tree

3 files changed

+52
-98
lines changed

3 files changed

+52
-98
lines changed

src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ async fn main() -> Result<(), Error> {
8787
Err(err) => error!("Failed to run full vacuum on startup: {:?}", err),
8888
}
8989
}
90+
info!("Loading metadata state from sqlite");
91+
store.load_metadata().await?;
92+
info!("Loading metadata complete");
93+
9094
// Get startup time after migrations and vacuum
9195
let startup_time = Utc::now();
9296

@@ -236,5 +240,9 @@ async fn main() -> Result<(), Error> {
236240
.on_completion(log_task_completion("maintenance_task", maintenance_task))
237241
.await;
238242

243+
info!("Flushing metadata to sqlite");
244+
store.flush_metadata().await?;
245+
info!("Shutdown complete");
246+
239247
Ok(())
240248
}

src/store/inflight_activation.rs

Lines changed: 29 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,28 @@ impl InflightActivationStore {
372372
)))
373373
}
374374

375+
/// Load state into the metadata_store from sqlite.
376+
/// Used during application startup to rebuild in-memory state.
377+
#[instrument(skip_all)]
378+
pub async fn load_metadata(&self) -> Result<(), Error> {
379+
self.metadata_store
380+
.lock()
381+
.await
382+
.load_from_sqlite(&self.read_pool)
383+
.await
384+
}
385+
386+
/// Flush any pending state in metadata_store into sqlite
387+
pub async fn flush_metadata(&self) -> Result<(), Error> {
388+
let atomic = self.write_pool.begin().await?;
389+
let res = self.metadata_store.lock().await.commit(atomic).await;
390+
391+
match res {
392+
Ok(_) => Ok(()),
393+
Err(err) => Err(err.into()),
394+
}
395+
}
396+
375397
#[instrument(skip_all)]
376398
pub async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error> {
377399
if batch.is_empty() {
@@ -383,66 +405,9 @@ impl InflightActivationStore {
383405
.map(ActivationMetadata::try_from)
384406
.collect::<Result<Vec<ActivationMetadata>, _>>()?;
385407

386-
let rows = batch
387-
.clone()
388-
.into_iter()
389-
.map(TableRow::try_from)
390-
.collect::<Result<Vec<TableRow>, _>>()?;
391-
392-
let mut query_builder = QueryBuilder::<Sqlite>::new(
393-
"
394-
INSERT INTO inflight_taskactivations
395-
(
396-
id,
397-
activation,
398-
partition,
399-
offset,
400-
added_at,
401-
received_at,
402-
processing_attempts,
403-
expires_at,
404-
delay_until,
405-
processing_deadline_duration,
406-
processing_deadline,
407-
status,
408-
at_most_once,
409-
namespace,
410-
taskname,
411-
on_attempts_exceeded
412-
)
413-
",
414-
);
415-
let query = query_builder
416-
.push_values(rows, |mut b, row| {
417-
b.push_bind(row.id);
418-
b.push_bind(row.activation);
419-
b.push_bind(row.partition);
420-
b.push_bind(row.offset);
421-
b.push_bind(row.added_at.timestamp());
422-
b.push_bind(row.received_at.timestamp());
423-
b.push_bind(row.processing_attempts);
424-
b.push_bind(row.expires_at.map(|t| Some(t.timestamp())));
425-
b.push_bind(row.delay_until.map(|t| Some(t.timestamp())));
426-
b.push_bind(row.processing_deadline_duration);
427-
if let Some(deadline) = row.processing_deadline {
428-
b.push_bind(deadline.timestamp());
429-
} else {
430-
// Add a literal null
431-
b.push("null");
432-
}
433-
b.push_bind(row.status);
434-
b.push_bind(row.at_most_once);
435-
b.push_bind(row.namespace);
436-
b.push_bind(row.taskname);
437-
b.push_bind(row.on_attempts_exceeded as i32);
438-
})
439-
.push(" ON CONFLICT(id) DO NOTHING")
440-
.build();
441408
let mut atomic = self.write_pool.begin().await?;
442-
let meta_result = Ok(query.execute(&mut *atomic).await?.into());
443409

444-
// insert into the separate stores.
445-
// TODO these queries should use one loop.
410+
// Insert into the blob store and metadata
446411
let mut query_builder =
447412
QueryBuilder::<Sqlite>::new("INSERT INTO activation_blobs (id, activation) ");
448413
let query = query_builder
@@ -452,7 +417,7 @@ impl InflightActivationStore {
452417
})
453418
.push(" ON CONFLICT(id) DO NOTHING")
454419
.build();
455-
query.execute(&mut *atomic).await?;
420+
let blob_result = query.execute(&mut *atomic).await?;
456421

457422
{
458423
// append metadata to memory store and flush to sqlite.
@@ -480,7 +445,7 @@ impl InflightActivationStore {
480445
}
481446
metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed());
482447

483-
meta_result
448+
Ok(blob_result.into())
484449
}
485450

486451
#[instrument(skip_all)]
@@ -603,10 +568,6 @@ impl InflightActivationStore {
603568
#[instrument(skip_all)]
604569
pub async fn delete_activation(&self, id: &str) -> Result<(), Error> {
605570
self.metadata_store.lock().await.delete(id);
606-
sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1")
607-
.bind(id)
608-
.execute(&self.write_pool)
609-
.await?;
610571
sqlx::query("DELETE FROM activation_blobs WHERE id = $1")
611572
.bind(id)
612573
.execute(&self.write_pool)
@@ -652,9 +613,6 @@ impl InflightActivationStore {
652613

653614
pub async fn clear(&self) -> Result<(), Error> {
654615
let mut atomic = self.write_pool.begin().await?;
655-
sqlx::query("DELETE FROM inflight_taskactivations")
656-
.execute(&mut *atomic)
657-
.await?;
658616
sqlx::query("DELETE FROM activation_blobs")
659617
.execute(&mut *atomic)
660618
.await?;
@@ -689,24 +647,8 @@ impl InflightActivationStore {
689647
/// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly.
690648
#[instrument(skip_all)]
691649
pub async fn handle_processing_attempts(&self) -> Result<u64, Error> {
692-
// TODO remove this method? It is a no-op with metadata_store
693-
// as processing attempts are handled in get_pending_activation
694-
let processing_attempts_result = sqlx::query(
695-
"UPDATE inflight_taskactivations
696-
SET status = $1
697-
WHERE processing_attempts >= $2 AND status = $3",
698-
)
699-
.bind(InflightActivationStatus::Failure)
700-
.bind(self.config.max_processing_attempts as i32)
701-
.bind(InflightActivationStatus::Pending)
702-
.execute(&self.write_pool)
703-
.await;
704-
705-
if let Ok(query_res) = processing_attempts_result {
706-
return Ok(query_res.rows_affected());
707-
}
708-
709-
Err(anyhow!("Could not update tasks past processing_deadline"))
650+
// TODO This is a no-op with metadata_store
651+
Ok(0)
710652
}
711653

712654
/// Perform upkeep work for tasks that are past expires_at deadlines
@@ -717,17 +659,8 @@ impl InflightActivationStore {
717659
/// The number of impacted records is returned in a Result.
718660
#[instrument(skip_all)]
719661
pub async fn handle_expires_at(&self) -> Result<u64, Error> {
720-
// TODO: Remove this? This is a no-op with the metadata store.
721-
let now = Utc::now();
722-
let query = sqlx::query(
723-
"DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2",
724-
)
725-
.bind(InflightActivationStatus::Pending)
726-
.bind(now.timestamp())
727-
.execute(&self.write_pool)
728-
.await?;
729-
730-
Ok(query.rows_affected())
662+
// TODO: This is a no-op with the metadata store.
663+
Ok(0)
731664
}
732665

733666
/// Perform upkeep work for tasks that are past delay_until deadlines

src/store/metadata_store.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
use chrono::{DateTime, Duration, Utc};
77
use sentry::capture_message;
88
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
9-
use sqlx::{QueryBuilder, Sqlite, Transaction, sqlite::SqliteQueryResult};
9+
use sqlx::{QueryBuilder, Sqlite, SqlitePool, Transaction, sqlite::SqliteQueryResult};
1010

1111
use crate::store::inflight_activation::InflightActivationStatus;
1212
use crate::store::records::{ActivationMetadata, TimestampEntry};
@@ -78,6 +78,20 @@ impl MetadataStore {
7878
}
7979
}
8080

81+
/// Load metadata state from sqlite into memory.
82+
pub async fn load_from_sqlite(&mut self, connection: &SqlitePool) -> anyhow::Result<()> {
83+
let load_query: Vec<ActivationMetadata> =
84+
sqlx::query_as("SELECT * FROM activation_metadata")
85+
.fetch_all(connection)
86+
.await?
87+
.into_iter()
88+
.collect();
89+
for row in load_query.into_iter() {
90+
self.upsert(row)?;
91+
}
92+
Ok(())
93+
}
94+
8195
/// Insert or update an activation metadata record
8296
pub fn upsert(&mut self, metadata: ActivationMetadata) -> anyhow::Result<()> {
8397
// File by status
@@ -377,7 +391,6 @@ impl MetadataStore {
377391
self.add_to_failed(entry.id.clone());
378392
continue;
379393
}
380-
// TODO: Check if the entry is killed
381394

382395
// Filter by namespace if provided.
383396
if let Some(namespace) = namespace {

0 commit comments

Comments
 (0)