@@ -372,6 +372,28 @@ impl InflightActivationStore {
372372 ) ) )
373373 }
374374
375+ /// Load state into the metadata_store from sqlite.
376+ /// Used during application startup to rebuild in-memory state.
377+ #[ instrument( skip_all) ]
378+ pub async fn load_metadata ( & self ) -> Result < ( ) , Error > {
379+ self . metadata_store
380+ . lock ( )
381+ . await
382+ . load_from_sqlite ( & self . read_pool )
383+ . await
384+ }
385+
386+ /// Flush any pending state in metadata_store into sqlite
387+ pub async fn flush_metadata ( & self ) -> Result < ( ) , Error > {
388+ let atomic = self . write_pool . begin ( ) . await ?;
389+ let res = self . metadata_store . lock ( ) . await . commit ( atomic) . await ;
390+
391+ match res {
392+ Ok ( _) => Ok ( ( ) ) ,
393+ Err ( err) => Err ( err. into ( ) ) ,
394+ }
395+ }
396+
375397 #[ instrument( skip_all) ]
376398 pub async fn store ( & self , batch : Vec < InflightActivation > ) -> Result < QueryResult , Error > {
377399 if batch. is_empty ( ) {
@@ -383,66 +405,9 @@ impl InflightActivationStore {
383405 . map ( ActivationMetadata :: try_from)
384406 . collect :: < Result < Vec < ActivationMetadata > , _ > > ( ) ?;
385407
386- let rows = batch
387- . clone ( )
388- . into_iter ( )
389- . map ( TableRow :: try_from)
390- . collect :: < Result < Vec < TableRow > , _ > > ( ) ?;
391-
392- let mut query_builder = QueryBuilder :: < Sqlite > :: new (
393- "
394- INSERT INTO inflight_taskactivations
395- (
396- id,
397- activation,
398- partition,
399- offset,
400- added_at,
401- received_at,
402- processing_attempts,
403- expires_at,
404- delay_until,
405- processing_deadline_duration,
406- processing_deadline,
407- status,
408- at_most_once,
409- namespace,
410- taskname,
411- on_attempts_exceeded
412- )
413- " ,
414- ) ;
415- let query = query_builder
416- . push_values ( rows, |mut b, row| {
417- b. push_bind ( row. id ) ;
418- b. push_bind ( row. activation ) ;
419- b. push_bind ( row. partition ) ;
420- b. push_bind ( row. offset ) ;
421- b. push_bind ( row. added_at . timestamp ( ) ) ;
422- b. push_bind ( row. received_at . timestamp ( ) ) ;
423- b. push_bind ( row. processing_attempts ) ;
424- b. push_bind ( row. expires_at . map ( |t| Some ( t. timestamp ( ) ) ) ) ;
425- b. push_bind ( row. delay_until . map ( |t| Some ( t. timestamp ( ) ) ) ) ;
426- b. push_bind ( row. processing_deadline_duration ) ;
427- if let Some ( deadline) = row. processing_deadline {
428- b. push_bind ( deadline. timestamp ( ) ) ;
429- } else {
430- // Add a literal null
431- b. push ( "null" ) ;
432- }
433- b. push_bind ( row. status ) ;
434- b. push_bind ( row. at_most_once ) ;
435- b. push_bind ( row. namespace ) ;
436- b. push_bind ( row. taskname ) ;
437- b. push_bind ( row. on_attempts_exceeded as i32 ) ;
438- } )
439- . push ( " ON CONFLICT(id) DO NOTHING" )
440- . build ( ) ;
441408 let mut atomic = self . write_pool . begin ( ) . await ?;
442- let meta_result = Ok ( query. execute ( & mut * atomic) . await ?. into ( ) ) ;
443409
444- // insert into the separate stores.
445- // TODO these queries should use one loop.
410+ // Insert into the blob store and metadata
446411 let mut query_builder =
447412 QueryBuilder :: < Sqlite > :: new ( "INSERT INTO activation_blobs (id, activation) " ) ;
448413 let query = query_builder
@@ -452,7 +417,7 @@ impl InflightActivationStore {
452417 } )
453418 . push ( " ON CONFLICT(id) DO NOTHING" )
454419 . build ( ) ;
455- query. execute ( & mut * atomic) . await ?;
420+ let blob_result = query. execute ( & mut * atomic) . await ?;
456421
457422 {
458423 // append metadata to memory store and flush to sqlite.
@@ -470,7 +435,7 @@ impl InflightActivationStore {
470435 metrics:: gauge!( "store.pages_committed_to_db" )
471436 . set ( checkpoint_result. get :: < i32 , _ > ( "checkpointed" ) ) ;
472437
473- meta_result
438+ Ok ( blob_result . into ( ) )
474439 }
475440
476441 #[ instrument( skip_all) ]
@@ -593,10 +558,6 @@ impl InflightActivationStore {
593558 #[ instrument( skip_all) ]
594559 pub async fn delete_activation ( & self , id : & str ) -> Result < ( ) , Error > {
595560 self . metadata_store . lock ( ) . await . delete ( id) ;
596- sqlx:: query ( "DELETE FROM inflight_taskactivations WHERE id = $1" )
597- . bind ( id)
598- . execute ( & self . write_pool )
599- . await ?;
600561 sqlx:: query ( "DELETE FROM activation_blobs WHERE id = $1" )
601562 . bind ( id)
602563 . execute ( & self . write_pool )
@@ -642,9 +603,6 @@ impl InflightActivationStore {
642603
643604 pub async fn clear ( & self ) -> Result < ( ) , Error > {
644605 let mut atomic = self . write_pool . begin ( ) . await ?;
645- sqlx:: query ( "DELETE FROM inflight_taskactivations" )
646- . execute ( & mut * atomic)
647- . await ?;
648606 sqlx:: query ( "DELETE FROM activation_blobs" )
649607 . execute ( & mut * atomic)
650608 . await ?;
@@ -679,24 +637,8 @@ impl InflightActivationStore {
679637 /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly.
680638 #[ instrument( skip_all) ]
681639 pub async fn handle_processing_attempts ( & self ) -> Result < u64 , Error > {
682- // TODO remove this method? It is a no-op with metadata_store
683- // as processing attempts are handled in get_pending_activation
684- let processing_attempts_result = sqlx:: query (
685- "UPDATE inflight_taskactivations
686- SET status = $1
687- WHERE processing_attempts >= $2 AND status = $3" ,
688- )
689- . bind ( InflightActivationStatus :: Failure )
690- . bind ( self . config . max_processing_attempts as i32 )
691- . bind ( InflightActivationStatus :: Pending )
692- . execute ( & self . write_pool )
693- . await ;
694-
695- if let Ok ( query_res) = processing_attempts_result {
696- return Ok ( query_res. rows_affected ( ) ) ;
697- }
698-
699- Err ( anyhow ! ( "Could not update tasks past processing_deadline" ) )
640+ // TODO This is a no-op with metadata_store
641+ Ok ( 0 )
700642 }
701643
702644 /// Perform upkeep work for tasks that are past expires_at deadlines
@@ -707,17 +649,8 @@ impl InflightActivationStore {
707649 /// The number of impacted records is returned in a Result.
708650 #[ instrument( skip_all) ]
709651 pub async fn handle_expires_at ( & self ) -> Result < u64 , Error > {
710- // TODO: Remove this? This is a no-op with the metadata store.
711- let now = Utc :: now ( ) ;
712- let query = sqlx:: query (
713- "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2" ,
714- )
715- . bind ( InflightActivationStatus :: Pending )
716- . bind ( now. timestamp ( ) )
717- . execute ( & self . write_pool )
718- . await ?;
719-
720- Ok ( query. rows_affected ( ) )
652+ // TODO: This is a no-op with the metadata store.
653+ Ok ( 0 )
721654 }
722655
723656 /// Perform upkeep work for tasks that are past delay_until deadlines
0 commit comments