Skip to content

Commit 6b97f6d

Browse files
authored
vector-store: fix fullscan in progress status (#288)
There are issues with a wrong index status during fullscan: some vectors are still in the pipeline when the status changes to SERVING, which is meant to signal that the fullscan has finished. The patch introduces a marker for asynchronous operations in the pipeline, so the fullscan now waits for all add operations to finish.
2 parents a018311 + 6a04610 commit 6b97f6d

File tree

8 files changed

+166
-69
lines changed

8 files changed

+166
-69
lines changed

crates/vector-store/src/db.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6+
use crate::AsyncInProgress;
67
use crate::ColumnName;
78
use crate::Connectivity;
89
use crate::Credentials;
@@ -53,7 +54,10 @@ use tracing::trace;
5354
use tracing::warn;
5455
use uuid::Uuid;
5556

56-
type GetDbIndexR = anyhow::Result<(mpsc::Sender<DbIndex>, mpsc::Receiver<DbEmbedding>)>;
57+
type GetDbIndexR = anyhow::Result<(
58+
mpsc::Sender<DbIndex>,
59+
mpsc::Receiver<(DbEmbedding, Option<AsyncInProgress>)>,
60+
)>;
5761
pub(crate) type LatestSchemaVersionR = anyhow::Result<Option<CqlTimeuuid>>;
5862
type GetIndexesR = anyhow::Result<Vec<DbCustomIndex>>;
5963
type GetIndexVersionR = anyhow::Result<Option<IndexVersion>>;

crates/vector-store/src/db_index.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6+
use crate::AsyncInProgress;
67
use crate::ColumnName;
78
use crate::DbEmbedding;
89
use crate::IndexMetadata;
@@ -112,7 +113,10 @@ pub(crate) async fn new(
112113
db_session: Arc<Session>,
113114
metadata: IndexMetadata,
114115
node_state: Sender<NodeState>,
115-
) -> anyhow::Result<(mpsc::Sender<DbIndex>, mpsc::Receiver<DbEmbedding>)> {
116+
) -> anyhow::Result<(
117+
mpsc::Sender<DbIndex>,
118+
mpsc::Receiver<(DbEmbedding, Option<AsyncInProgress>)>,
119+
)> {
116120
let id = metadata.id();
117121

118122
// TODO: The value of channel size was taken from initial benchmarks. Needs more testing
@@ -313,7 +317,7 @@ impl Statements {
313317
/// to send read embeddings into the pipeline.
314318
async fn initial_scan(
315319
&self,
316-
tx: mpsc::Sender<DbEmbedding>,
320+
tx: mpsc::Sender<(DbEmbedding, Option<AsyncInProgress>)>,
317321
completed_scan_length: Arc<AtomicU64>,
318322
) {
319323
let semaphore_capacity = self.nr_parallel_queries().get();
@@ -324,14 +328,27 @@ impl Statements {
324328

325329
let range_scan = self.preform_range_scan(begin, end).await;
326330
if let Ok(embeddings) = range_scan {
327-
let tx: Sender<DbEmbedding> = tx.clone();
331+
let tx = tx.clone();
328332
let scan_length = completed_scan_length.clone();
329333
tokio::spawn(async move {
334+
let (tx_in_progress, mut rx_in_progress) = mpsc::channel(1);
330335
embeddings
331-
.for_each(|embedding| async {
332-
_ = tx.send(embedding).await;
336+
.for_each(move |embedding| {
337+
let tx = tx.clone();
338+
let tx_in_progress = tx_in_progress.clone();
339+
async move {
340+
_ = tx
341+
.send((embedding, Some(AsyncInProgress(tx_in_progress))))
342+
.await;
343+
}
333344
})
334345
.await;
346+
347+
// wait until all in-progress markers are dropped
348+
while rx_in_progress.recv().await.is_some() {
349+
rx_in_progress.len();
350+
}
351+
335352
//Safety: end > begin, and the range fits into u64
336353
scan_length.fetch_add(
337354
end.value().abs_diff(begin.value() - 1),
@@ -493,7 +510,7 @@ impl Statements {
493510
struct CdcConsumerData {
494511
primary_key_columns: Vec<ColumnName>,
495512
target_column: ColumnName,
496-
tx: mpsc::Sender<DbEmbedding>,
513+
tx: mpsc::Sender<(DbEmbedding, Option<AsyncInProgress>)>,
497514
gregorian_epoch: OffsetDateTime,
498515
}
499516

@@ -563,11 +580,14 @@ impl Consumer for CdcConsumer {
563580
_ = self
564581
.0
565582
.tx
566-
.send(DbEmbedding {
567-
primary_key,
568-
embedding,
569-
timestamp,
570-
})
583+
.send((
584+
DbEmbedding {
585+
primary_key,
586+
embedding,
587+
timestamp,
588+
},
589+
None,
590+
))
571591
.await;
572592
Ok(())
573593
}
@@ -586,7 +606,7 @@ impl CdcConsumerFactory {
586606
fn new(
587607
session: Arc<Session>,
588608
metadata: &IndexMetadata,
589-
tx: mpsc::Sender<DbEmbedding>,
609+
tx: mpsc::Sender<(DbEmbedding, Option<AsyncInProgress>)>,
590610
) -> anyhow::Result<Self> {
591611
let cluster_state = session.get_cluster_state();
592612
let table = cluster_state

crates/vector-store/src/index/actor.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
44
*/
55

6+
use crate::AsyncInProgress;
67
use crate::Distance;
78
use crate::Limit;
89
use crate::PrimaryKey;
@@ -17,9 +18,11 @@ pub enum Index {
1718
AddOrReplace {
1819
primary_key: PrimaryKey,
1920
embedding: Vector,
21+
in_progress: Option<AsyncInProgress>,
2022
},
2123
Remove {
2224
primary_key: PrimaryKey,
25+
in_progress: Option<AsyncInProgress>,
2326
},
2427
Ann {
2528
embedding: Vector,
@@ -32,26 +35,40 @@ pub enum Index {
3235
}
3336

3437
pub(crate) trait IndexExt {
35-
async fn add_or_replace(&self, primary_key: PrimaryKey, embedding: Vector);
36-
async fn remove(&self, primary_key: PrimaryKey);
38+
async fn add_or_replace(
39+
&self,
40+
primary_key: PrimaryKey,
41+
embedding: Vector,
42+
in_progress: Option<AsyncInProgress>,
43+
);
44+
async fn remove(&self, primary_key: PrimaryKey, in_progress: Option<AsyncInProgress>);
3745
async fn ann(&self, embedding: Vector, limit: Limit) -> AnnR;
3846
async fn count(&self) -> CountR;
3947
}
4048

4149
impl IndexExt for mpsc::Sender<Index> {
42-
async fn add_or_replace(&self, primary_key: PrimaryKey, embedding: Vector) {
50+
async fn add_or_replace(
51+
&self,
52+
primary_key: PrimaryKey,
53+
embedding: Vector,
54+
in_progress: Option<AsyncInProgress>,
55+
) {
4356
self.send(Index::AddOrReplace {
4457
primary_key,
4558
embedding,
59+
in_progress,
4660
})
4761
.await
4862
.expect("internal actor should receive request");
4963
}
5064

51-
async fn remove(&self, primary_key: PrimaryKey) {
52-
self.send(Index::Remove { primary_key })
53-
.await
54-
.expect("internal actor should receive request");
65+
async fn remove(&self, primary_key: PrimaryKey, in_progress: Option<AsyncInProgress>) {
66+
self.send(Index::Remove {
67+
primary_key,
68+
in_progress,
69+
})
70+
.await
71+
.expect("internal actor should receive request");
5572
}
5673

5774
async fn ann(&self, embedding: Vector, limit: Limit) -> AnnR {

crates/vector-store/src/index/opensearch.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,12 @@ async fn process(
255255
Index::AddOrReplace {
256256
primary_key,
257257
embedding,
258+
in_progress: _in_progress,
258259
} => add_or_replace(id, keys, opensearch_key, primary_key, embedding, client).await,
259-
Index::Remove { primary_key } => remove(id, keys, primary_key, client).await,
260+
Index::Remove {
261+
primary_key,
262+
in_progress: _in_progress,
263+
} => remove(id, keys, primary_key, client).await,
260264
Index::Ann {
261265
embedding,
262266
limit,

crates/vector-store/src/index/usearch.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,15 @@ fn process(
158158
Index::AddOrReplace {
159159
primary_key,
160160
embedding,
161+
in_progress: _in_progress,
161162
} => {
162163
add_or_replace(idx, keys, usearch_key, primary_key, embedding, allocate);
163164
}
164165

165-
Index::Remove { primary_key } => {
166+
Index::Remove {
167+
primary_key,
168+
in_progress: _in_progress,
169+
} => {
166170
remove(idx, keys, primary_key);
167171
}
168172

@@ -339,18 +343,21 @@ mod tests {
339343
.add_or_replace(
340344
vec![CqlValue::Int(1), CqlValue::Text("one".to_string())].into(),
341345
vec![1., 1., 1.].into(),
346+
None,
342347
)
343348
.await;
344349
actor
345350
.add_or_replace(
346351
vec![CqlValue::Int(2), CqlValue::Text("two".to_string())].into(),
347352
vec![2., -2., 2.].into(),
353+
None,
348354
)
349355
.await;
350356
actor
351357
.add_or_replace(
352358
vec![CqlValue::Int(3), CqlValue::Text("three".to_string())].into(),
353359
vec![3., 3., 3.].into(),
360+
None,
354361
)
355362
.await;
356363

@@ -380,6 +387,7 @@ mod tests {
380387
.add_or_replace(
381388
vec![CqlValue::Int(3), CqlValue::Text("three".to_string())].into(),
382389
vec![2.1, -2.1, 2.1].into(),
390+
None,
383391
)
384392
.await;
385393

@@ -403,7 +411,10 @@ mod tests {
403411
.unwrap();
404412

405413
actor
406-
.remove(vec![CqlValue::Int(3), CqlValue::Text("three".to_string())].into())
414+
.remove(
415+
vec![CqlValue::Int(3), CqlValue::Text("three".to_string())].into(),
416+
None,
417+
)
407418
.await;
408419

409420
time::timeout(Duration::from_secs(10), async {

crates/vector-store/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use tokio::runtime::Builder;
4242
use tokio::runtime::Handle;
4343
use tokio::signal;
4444
use tokio::sync::Semaphore;
45+
use tokio::sync::mpsc;
4546
use tokio::sync::mpsc::Sender;
4647
use tokio::task;
4748
use utoipa::PartialSchema;
@@ -494,6 +495,11 @@ impl From<SocketAddr> for HttpServerConfig {
494495
}
495496
}
496497

498+
#[derive(Clone)]
499+
/// Marker struct to indicate that an async operation is in progress.
500+
#[allow(dead_code)]
501+
pub struct AsyncInProgress(mpsc::Sender<()>);
502+
497503
pub fn block_on<Output>(threads: Option<usize>, f: impl AsyncFnOnce() -> Output) -> Output {
498504
let mut builder = match threads {
499505
Some(0) | None => Builder::new_multi_thread(),

0 commit comments

Comments
 (0)