@@ -7,6 +7,10 @@ use crate::IndexMetadata;
77use std:: collections:: HashSet ;
88use tokio:: sync:: mpsc;
99use tokio:: sync:: oneshot;
10+ use tracing:: Instrument ;
11+ use tracing:: debug;
12+ use tracing:: debug_span;
13+ use tracing:: info;
1014
1115#[ derive( Clone , Copy , Debug , PartialEq , Eq ) ]
1216pub enum Status {
@@ -57,44 +61,54 @@ pub(crate) async fn new() -> mpsc::Sender<NodeState> {
5761 const CHANNEL_SIZE : usize = 10 ;
5862 let ( tx, mut rx) = mpsc:: channel ( CHANNEL_SIZE ) ;
5963
60- tokio:: spawn ( async move {
61- let mut status = Status :: Initializing ;
62- let mut idxs = HashSet :: new ( ) ;
63- while let Some ( msg) = rx. recv ( ) . await {
64- match msg {
65- NodeState :: SendEvent ( event) => match event {
66- Event :: ConnectingToDb => {
67- status = Status :: ConnectingToDb ;
68- }
69- Event :: ConnectedToDb => { }
70- Event :: DiscoveringIndexes => {
71- status = Status :: DiscoveringIndexes ;
72- }
73- Event :: IndexesDiscovered ( indexes) => {
74- if indexes. is_empty ( ) {
75- status = Status :: Serving ;
76- continue ;
64+ tokio:: spawn (
65+ async move {
66+ debug ! ( "starting" ) ;
67+
68+ let mut status = Status :: Initializing ;
69+ let mut idxs = HashSet :: new ( ) ;
70+ while let Some ( msg) = rx. recv ( ) . await {
71+ match msg {
72+ NodeState :: SendEvent ( event) => match event {
73+ Event :: ConnectingToDb => {
74+ status = Status :: ConnectingToDb ;
7775 }
78- if status == Status :: DiscoveringIndexes {
79- status = Status :: IndexingEmbeddings ;
80- idxs = indexes;
76+ Event :: ConnectedToDb => { }
77+ Event :: DiscoveringIndexes => {
78+ if status != Status :: Serving {
79+ status = Status :: DiscoveringIndexes ;
80+ }
8181 }
82- }
83- Event :: FullScanFinished ( metadata) => {
84- idxs. remove ( & metadata) ;
85- if idxs. is_empty ( ) {
86- status = Status :: Serving ;
82+ Event :: IndexesDiscovered ( indexes) => {
83+ if indexes. is_empty ( ) && status != Status :: Serving {
84+ status = Status :: Serving ;
85+ info ! ( "Service is running, no indexes to build" ) ;
86+ continue ;
87+ }
88+ if status == Status :: DiscoveringIndexes {
89+ status = Status :: IndexingEmbeddings ;
90+ idxs = indexes;
91+ }
8792 }
93+ Event :: FullScanFinished ( metadata) => {
94+ idxs. remove ( & metadata) ;
95+ if idxs. is_empty ( ) && status != Status :: Serving {
96+ status = Status :: Serving ;
97+ info ! ( "Service is running, finished building indexes" ) ;
98+ }
99+ }
100+ } ,
101+ NodeState :: GetStatus ( tx) => {
102+ tx. send ( status) . unwrap_or_else ( |_| {
103+ tracing:: debug!( "Failed to send current state" ) ;
104+ } ) ;
88105 }
89- } ,
90- NodeState :: GetStatus ( tx) => {
91- tx. send ( status) . unwrap_or_else ( |_| {
92- tracing:: debug!( "Failed to send current state" ) ;
93- } ) ;
94106 }
95107 }
108+ debug ! ( "finished" ) ;
96109 }
97- } ) ;
110+ . instrument ( debug_span ! ( "node_state" ) ) ,
111+ ) ;
98112
99113 tx
100114}
@@ -177,4 +191,43 @@ mod tests {
177191 . await ;
178192 assert_eq ! ( node_state. get_status( ) . await , Status :: Serving ) ;
179193 }
194+
195+ #[ tokio:: test]
196+ async fn status_remains_serving_when_discovering_indexes ( ) {
197+ let node_state = new ( ) . await ;
198+ // Move to Serving status
199+ node_state. send_event ( Event :: ConnectingToDb ) . await ;
200+ node_state. send_event ( Event :: DiscoveringIndexes ) . await ;
201+ node_state
202+ . send_event ( Event :: IndexesDiscovered ( HashSet :: new ( ) ) )
203+ . await ;
204+ assert_eq ! ( node_state. get_status( ) . await , Status :: Serving ) ;
205+
206+ // Try to trigger DiscoveringIndexes again
207+ node_state. send_event ( Event :: DiscoveringIndexes ) . await ;
208+ // Status should remain Serving
209+ let status = node_state. get_status ( ) . await ;
210+ assert_eq ! ( status, Status :: Serving ) ;
211+
212+ let idx = IndexMetadata {
213+ keyspace_name : KeyspaceName ( "test_keyspace" . to_string ( ) ) ,
214+ index_name : IndexName ( "test_index" . to_string ( ) ) ,
215+ table_name : TableName ( "test_table" . to_string ( ) ) ,
216+ target_column : ColumnName ( "test_column" . to_string ( ) ) ,
217+ dimensions : Dimensions ( NonZeroUsize :: new ( 3 ) . unwrap ( ) ) ,
218+ connectivity : Default :: default ( ) ,
219+ expansion_add : Default :: default ( ) ,
220+ expansion_search : Default :: default ( ) ,
221+ space_type : Default :: default ( ) ,
222+ version : Uuid :: new_v4 ( ) . into ( ) ,
223+ } ;
224+
225+ // Simulate discovering an index
226+ node_state
227+ . send_event ( Event :: IndexesDiscovered ( HashSet :: from ( [ idx] ) ) )
228+ . await ;
229+ // Status should remain Serving
230+ let status = node_state. get_status ( ) . await ;
231+ assert_eq ! ( status, Status :: Serving ) ;
232+ }
180233}
0 commit comments