Skip to content

Commit 951986f

Browse files
authored
Fix fullscan hang when losing connection (#231)
We detected an annoying issue: when the connection is lost, the fullscan stops and cannot be resumed, losing hours of index-building time. This PR fixes the issue by adding a retry loop to the fullscan. Fixes: VECTOR-193
2 parents 7602c4d + 551b1b8 commit 951986f

File tree

8 files changed

+256
-19
lines changed

8 files changed

+256
-19
lines changed

crates/validator/src/common.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::vector_store_cluster::VectorStoreClusterExt;
1010
use httpclient::HttpClient;
1111
use scylla::client::session::Session;
1212
use scylla::client::session_builder::SessionBuilder;
13+
use std::net::Ipv4Addr;
1314
use std::sync::Arc;
1415
use std::time::Duration;
1516
use tokio::time;
@@ -23,21 +24,29 @@ pub(crate) const DB_PORT: u16 = 9042;
2324
pub(crate) const VS_OCTET: u8 = 1;
2425
pub(crate) const DB_OCTET: u8 = 2;
2526

27+
pub(crate) async fn get_default_vs_url(actors: &TestActors) -> String {
28+
format!(
29+
"http://{}.{}:{}",
30+
VS_NAME,
31+
actors.dns.domain().await,
32+
VS_PORT
33+
)
34+
}
35+
36+
pub(crate) fn get_default_db_ip(actors: &TestActors) -> Ipv4Addr {
37+
actors.services_subnet.ip(DB_OCTET)
38+
}
39+
2640
pub(crate) async fn init(actors: TestActors) {
2741
info!("started");
2842

2943
let vs_ip = actors.services_subnet.ip(VS_OCTET);
3044

3145
actors.dns.upsert(VS_NAME.to_string(), vs_ip).await;
3246

33-
let vs_url = format!(
34-
"http://{}.{}:{}",
35-
VS_NAME,
36-
actors.dns.domain().await,
37-
VS_PORT
38-
);
47+
let vs_url = get_default_vs_url(&actors).await;
3948

40-
let db_ip = actors.services_subnet.ip(DB_OCTET);
49+
let db_ip = get_default_db_ip(&actors);
4150

4251
actors.db.start(vs_url, db_ip, None).await;
4352
assert!(actors.db.wait_for_ready().await);
@@ -59,7 +68,7 @@ pub(crate) async fn cleanup(actors: TestActors) {
5968
info!("finished");
6069
}
6170

62-
pub(crate) async fn prepare_connection(actors: TestActors) -> (Arc<Session>, HttpClient) {
71+
pub(crate) async fn prepare_connection(actors: &TestActors) -> (Arc<Session>, HttpClient) {
6372
let session = Arc::new(
6473
SessionBuilder::new()
6574
.known_node(actors.services_subnet.ip(DB_OCTET).to_string())

crates/validator/src/scylla_cluster.rs

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55

66
use std::net::Ipv4Addr;
7+
use std::path::Path;
78
use std::path::PathBuf;
89
use std::process::Stdio;
910
use std::time::Duration;
@@ -31,6 +32,11 @@ pub(crate) enum ScyllaCluster {
3132
tx: oneshot::Sender<bool>,
3233
},
3334
Stop,
35+
Up {
36+
vs_uri: String,
37+
conf: Option<Vec<u8>>,
38+
},
39+
Down,
3440
}
3541

3642
pub(crate) trait ScyllaClusterExt {
@@ -45,6 +51,12 @@ pub(crate) trait ScyllaClusterExt {
4551

4652
/// Waits for the ScyllaDB cluster to be ready.
4753
async fn wait_for_ready(&self) -> bool;
54+
55+
/// Starts a paused instance back again.
56+
async fn up(&self, vs_uri: String, conf: Option<Vec<u8>>);
57+
58+
/// Pauses a running instance; it can be started again with `up`.
59+
async fn down(&self);
4860
}
4961

5062
impl ScyllaClusterExt for mpsc::Sender<ScyllaCluster> {
@@ -81,6 +93,18 @@ impl ScyllaClusterExt for mpsc::Sender<ScyllaCluster> {
8193
rx.await
8294
.expect("ScyllaClusterExt::wait_for_ready: internal actor should send response")
8395
}
96+
97+
async fn up(&self, vs_uri: String, conf: Option<Vec<u8>>) {
98+
self.send(ScyllaCluster::Up { vs_uri, conf })
99+
.await
100+
.expect("ScyllaClusterExt::up: internal actor should receive request")
101+
}
102+
103+
async fn down(&self) {
104+
self.send(ScyllaCluster::Down)
105+
.await
106+
.expect("ScyllaClusterExt::down: internal actor should receive request")
107+
}
84108
}
85109

86110
pub(crate) async fn new(
@@ -175,13 +199,26 @@ async fn process(msg: ScyllaCluster, state: &mut State) {
175199
tx.send(wait_for_ready(state).await)
176200
.expect("process ScyllaCluster::WaitForReady: failed to send a response");
177201
}
202+
203+
ScyllaCluster::Up { vs_uri, conf } => {
204+
up(vs_uri, conf, state).await;
205+
}
206+
207+
ScyllaCluster::Down => {
208+
down(state).await;
209+
}
178210
}
179211
}
180212

181-
async fn start(vs_uri: String, db_ip: Ipv4Addr, conf: Option<Vec<u8>>, state: &mut State) {
182-
let workdir = TempDir::new().expect("start: failed to create temporary directory for scylladb");
213+
async fn run_cluster(
214+
vs_uri: &String,
215+
db_ip: &Ipv4Addr,
216+
conf: &Option<Vec<u8>>,
217+
path: &Path,
218+
state: &mut State,
219+
) {
183220
let conf = if let Some(conf) = conf {
184-
let conf_path = workdir.path().join("scylla.conf");
221+
let conf_path = path.join("scylla.conf");
185222
fs::write(&conf_path, conf)
186223
.await
187224
.expect("start: failed to write scylla config");
@@ -198,7 +235,7 @@ async fn start(vs_uri: String, db_ip: Ipv4Addr, conf: Option<Vec<u8>>, state: &m
198235
.arg("--options-file")
199236
.arg(&conf)
200237
.arg("--workdir")
201-
.arg(workdir.path())
238+
.arg(path)
202239
.arg("--listen-address")
203240
.arg(db_ip.to_string())
204241
.arg("--rpc-address")
@@ -216,6 +253,11 @@ async fn start(vs_uri: String, db_ip: Ipv4Addr, conf: Option<Vec<u8>>, state: &m
216253
.spawn()
217254
.expect("start: failed to spawn scylladb"),
218255
);
256+
}
257+
258+
async fn start(vs_uri: String, db_ip: Ipv4Addr, conf: Option<Vec<u8>>, state: &mut State) {
259+
let workdir = TempDir::new().expect("start: failed to create temporary directory for scylladb");
260+
run_cluster(&vs_uri, &db_ip, &conf, workdir.path(), state).await;
219261
state.workdir = Some(workdir);
220262
state.db_ip = Some(db_ip);
221263
}
@@ -236,6 +278,20 @@ async fn stop(state: &mut State) {
236278
state.db_ip = None;
237279
}
238280

281+
async fn down(state: &mut State) {
282+
let Some(mut child) = state.child.take() else {
283+
return;
284+
};
285+
child
286+
.start_kill()
287+
.expect("stop: failed to send SIGTERM to scylladb process");
288+
child
289+
.wait()
290+
.await
291+
.expect("stop: failed to wait for scylladb process to exit");
292+
state.child = None;
293+
}
294+
239295
/// Waits for ScyllaDB to be ready by checking the nodetool status.
240296
async fn wait_for_ready(state: &State) -> bool {
241297
let Some(db_ip) = state.db_ip else {
@@ -262,3 +318,15 @@ async fn wait_for_ready(state: &State) -> bool {
262318
time::sleep(Duration::from_millis(100)).await;
263319
}
264320
}
321+
322+
async fn up(vs_uri: String, conf: Option<Vec<u8>>, state: &mut State) {
323+
let db_ip = state.db_ip.expect("State should have DB IP");
324+
let path = state
325+
.workdir
326+
.as_ref()
327+
.expect("State should have workdir")
328+
.path()
329+
.to_path_buf();
330+
331+
run_cluster(&vs_uri, &db_ip, &conf, &path, state).await;
332+
}

crates/validator/src/tests/crud.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub(crate) async fn new() -> TestCase {
2828
async fn simple_create_drop_index(actors: TestActors) {
2929
info!("started");
3030

31-
let (session, client) = prepare_connection(actors).await;
31+
let (session, client) = prepare_connection(&actors).await;
3232

3333
session.query_unpaged(
3434
"CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",
@@ -83,7 +83,7 @@ async fn simple_create_drop_index(actors: TestActors) {
8383
async fn simple_create_drop_multiple_indexes(actors: TestActors) {
8484
info!("started");
8585

86-
let (session, client) = prepare_connection(actors).await;
86+
let (session, client) = prepare_connection(&actors).await;
8787

8888
// Create keyspace
8989
// Different keyspace name have to be used until the issue VECTOR-213 is fixed.

crates/validator/src/tests/full_scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) async fn new() -> TestCase {
2323
async fn full_scan_is_completed_when_responding_to_messages_concurrently(actors: TestActors) {
2424
info!("started");
2525

26-
let (session, client) = prepare_connection(actors).await;
26+
let (session, client) = prepare_connection(&actors).await;
2727

2828
session.query_unpaged(
2929
"CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",

crates/validator/src/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
mod crud;
77
mod full_scan;
8+
mod reconnect;
89
mod serde;
910

1011
use crate::ServicesSubnet;
@@ -197,6 +198,7 @@ pub(crate) async fn register() -> Vec<(String, TestCase)> {
197198
("crud", crud::new().await),
198199
("full_scan", full_scan::new().await),
199200
("serde", serde::new().await),
201+
("reconnect", reconnect::new().await),
200202
]
201203
.into_iter()
202204
.map(|(name, test_case)| (name.to_string(), test_case))
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2025-present ScyllaDB
3+
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
4+
*/
5+
6+
use crate::common::*;
7+
use crate::scylla_cluster::ScyllaClusterExt;
8+
use crate::tests::*;
9+
use std::time::Duration;
10+
use tokio::time::sleep;
11+
use tracing::info;
12+
13+
pub(crate) async fn new() -> TestCase {
14+
let timeout = Duration::from_secs(30);
15+
TestCase::empty()
16+
.with_init(timeout, init)
17+
.with_cleanup(timeout, cleanup)
18+
.with_test(
19+
"reconnect_doesnt_break_fullscan",
20+
timeout,
21+
reconnect_doesnt_break_fullscan,
22+
)
23+
}
24+
25+
async fn reconnect_doesnt_break_fullscan(actors: TestActors) {
26+
info!("started");
27+
28+
let (session, client) = prepare_connection(&actors).await;
29+
30+
session.query_unpaged(
31+
"CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",
32+
(),
33+
).await.expect("failed to create a keyspace");
34+
35+
session
36+
.use_keyspace("ks", false)
37+
.await
38+
.expect("failed to use a keyspace");
39+
40+
session
41+
.query_unpaged(
42+
"
43+
CREATE TABLE tbl (id INT PRIMARY KEY, embedding VECTOR<FLOAT, 3>)
44+
WITH cdc = {'enabled': true }
45+
",
46+
(),
47+
)
48+
.await
49+
.expect("failed to create a table");
50+
51+
let stmt = session
52+
.prepare("INSERT INTO tbl (id, embedding) VALUES (?, [1.0, 2.0, 3.0])")
53+
.await
54+
.expect("failed to prepare a statement");
55+
56+
for id in 0..1000 {
57+
session
58+
.execute_unpaged(&stmt, (id,))
59+
.await
60+
.expect("failed to insert a row");
61+
}
62+
63+
session
64+
.query_unpaged(
65+
"CREATE INDEX idx ON tbl(embedding) USING 'vector_index'",
66+
(),
67+
)
68+
.await
69+
.expect("failed to create an index");
70+
71+
while client.indexes().await.is_empty() {}
72+
let indexes = client.indexes().await;
73+
assert_eq!(indexes.len(), 1);
74+
let index = &indexes[0];
75+
assert_eq!(index.keyspace.as_ref(), "ks");
76+
assert_eq!(index.index.as_ref(), "idx");
77+
78+
let result = session
79+
.query_unpaged(
80+
"SELECT * FROM tbl ORDER BY embedding ANN OF [1.0, 2.0, 3.0] LIMIT 1",
81+
(),
82+
)
83+
.await;
84+
85+
match &result {
86+
Err(e) if format!("{e:?}").contains("503 Service Unavailable") => {}
87+
_ => panic!("Expected SERVICE_UNAVAILABLE error, got: {result:?}"),
88+
}
89+
90+
actors.db.down().await;
91+
92+
sleep(Duration::from_secs(1)).await;
93+
let count = client.count(&index.keyspace, &index.index).await;
94+
assert!(count.is_some() && count.unwrap() < 1000);
95+
actors.db.up(get_default_vs_url(&actors).await, None).await;
96+
97+
assert!(actors.db.wait_for_ready().await);
98+
99+
wait_for(
100+
|| async {
101+
session
102+
.query_unpaged(
103+
"SELECT * FROM tbl ORDER BY embedding ANN OF [1.0, 2.0, 3.0] LIMIT 1",
104+
(),
105+
)
106+
.await
107+
.is_ok()
108+
},
109+
"Waiting for index build",
110+
Duration::from_secs(10),
111+
)
112+
.await;
113+
114+
session
115+
.query_unpaged("DROP KEYSPACE ks", ())
116+
.await
117+
.expect("failed to drop a keyspace");
118+
119+
info!("finished");
120+
}

crates/validator/src/tests/serde.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub(crate) async fn new() -> TestCase {
1919
}
2020

2121
async fn test_serialization_deserialization_all_types(actors: TestActors) {
22-
let (session, _client) = crate::common::prepare_connection(actors).await;
22+
let (session, _client) = crate::common::prepare_connection(&actors).await;
2323

2424
let cases = vec![
2525
("ascii", "'random_text'"),

0 commit comments

Comments
 (0)