Commit 6489c80

Improve rebalance shards algorithm

1 parent 89b81a5 commit 6489c80

3 files changed: +55 -51 lines

quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs

Lines changed: 51 additions & 26 deletions
@@ -23,6 +23,7 @@ use std::time::Duration;
 use fnv::FnvHashSet;
 use futures::StreamExt;
 use futures::stream::FuturesUnordered;
+use itertools::{Itertools, MinMaxResult};
 use quickwit_actors::Mailbox;
 use quickwit_common::Progress;
 use quickwit_common::pretty::PrettySample;
@@ -81,7 +82,7 @@ const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3);
 /// All errors are ignored, and not even logged.
 fn fire_and_forget(
     fut: impl Future<Output = ()> + Send + 'static,
-    operation: impl std::fmt::Display + Send + Sync + 'static,
+    operation: impl std::fmt::Display + Send + 'static,
 ) {
     tokio::spawn(async move {
         if let Err(_timeout_elapsed) = tokio::time::timeout(FIRE_AND_FORGET_TIMEOUT, fut).await {
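The only change to `fire_and_forget` is dropping the `Sync` bound on `operation`: the value is moved into the spawned task and never shared between threads, so `Send` is enough. A minimal sketch of what the relaxed signature now accepts (the `AttemptLabel` type and `assert_accepted` helper below are hypothetical, not from the codebase):

use std::cell::Cell;
use std::fmt;

// Hypothetical operation label: `Send` (it can be moved into the spawned task)
// but not `Sync` (the interior `Cell` rules out shared references across threads).
struct AttemptLabel {
    attempt: Cell<u32>,
}

impl fmt::Display for AttemptLabel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "persist attempt #{}", self.attempt.get())
    }
}

// Mirrors the relaxed bound `impl Display + Send + 'static`; the previous
// `+ Sync` bound would have rejected `AttemptLabel`.
fn assert_accepted(operation: impl fmt::Display + Send + 'static) {
    drop(operation);
}

fn main() {
    assert_accepted(AttemptLabel { attempt: Cell::new(1) });
}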
@@ -1008,59 +1009,83 @@ impl IngestController {
     /// If we are missing some ingesters, their shards should still be in the model, but they should
     /// be missing from the ingester pool.
     ///
-    /// As a result `num_open_shards_per_leader_threshold` should be inflated.
+    /// As a result `target_num_open_shards_per_leader` should be inflated.
     ///
     /// TODO this implementation does not consider replica.
     fn rebalance_compute_shards_to_move(&self, model: &ControlPlaneModel) -> Vec<Shard> {
-        let num_ingesters = self.ingester_pool.len();
-        let mut num_open_shards: usize = 0;
+        let ingester_ids: Vec<NodeId> = self.ingester_pool.keys();
+        let num_ingesters = ingester_ids.len();
 
         if num_ingesters == 0 {
-            debug!("no ingester available");
+            debug!("no ingesters available");
+            return Vec::new();
+        }
+        if num_ingesters < 2 {
             return Vec::new();
         }
-        let mut per_leader_open_shards: HashMap<&str, Vec<&ShardEntry>> = HashMap::new();
+        let mut num_open_shards: usize = 0;
+        let mut per_leader_open_shards: HashMap<&str, Vec<&Shard>> = HashMap::new();
 
         for shard in model.all_shards() {
             if shard.is_open() {
                 num_open_shards += 1;
                 per_leader_open_shards
                     .entry(&shard.leader_id)
                     .or_default()
-                    .push(shard);
+                    .push(&shard.shard);
             }
         }
-        // We tolerate an ingester with 10% more shards than the average.
-        // Let's first identify the list of shards we want to "move".
-        let num_open_shards_per_leader_threshold =
-            (num_open_shards * 11).div_ceil(10 * num_ingesters);
+        for ingester_id in &ingester_ids {
+            per_leader_open_shards
+                .entry(ingester_id.as_str())
+                .or_default();
+        }
+        let target_num_open_shards_per_leader = num_open_shards as f32 / num_ingesters as f32;
+        let max_num_open_shards_per_leader =
+            f32::ceil(target_num_open_shards_per_leader * 1.1) as usize;
+        let min_num_open_shards_per_leader =
+            f32::floor(target_num_open_shards_per_leader * 0.9) as usize;
+
+        let mut rng = thread_rng();
+        let mut per_leader_open_shard_shuffled: Vec<Vec<&Shard>> = per_leader_open_shards
+            .into_values()
+            .map(|mut shards| {
+                shards.shuffle(&mut rng);
+                shards
+            })
+            .collect();
 
         let mut shards_to_move: Vec<Shard> = Vec::new();
 
-        let mut rng = thread_rng();
-        for open_shards in per_leader_open_shards.values() {
-            if let Some(num_shards_to_move) = open_shards
-                .len()
-                .checked_sub(num_open_shards_per_leader_threshold)
+        loop {
+            let MinMaxResult::MinMax(min_shards, max_shards) = per_leader_open_shard_shuffled
+                .iter_mut()
+                .minmax_by_key(|shards| shards.len())
+            else {
+                break;
+            };
+            if min_shards.len() < min_num_open_shards_per_leader
+                || max_shards.len() > max_num_open_shards_per_leader
             {
-                shards_to_move.extend(
-                    open_shards[..]
-                        .choose_multiple(&mut rng, num_shards_to_move)
-                        .map(|shard_entry| shard_entry.shard.clone()),
-                );
+                let shard = max_shards.pop().expect("shards should not be empty");
+                shards_to_move.push(shard.clone());
+                min_shards.push(shard);
+            } else {
+                break;
             }
         }
-        let num_shards_to_rebalance = shards_to_move.len();
+        let num_shards_to_move = shards_to_move.len();
 
-        if num_shards_to_rebalance == 0 {
+        if num_shards_to_move == 0 {
             info!("no shards to rebalance");
         } else {
             info!(
                 num_open_shards,
                 num_available_ingesters = num_ingesters,
-                rebalance_threshold = num_open_shards_per_leader_threshold,
-                num_shards_to_rebalance,
-                "rebalancing shards"
+                min_shards_threshold = min_num_open_shards_per_leader,
+                max_shards_threshold = max_num_open_shards_per_leader,
+                num_shards_to_move,
+                "rebalancing {num_shards_to_move} shards"
             );
         }
         shards_to_move
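The rewritten `rebalance_compute_shards_to_move` replaces the old one-sided threshold (average + 10%) with a band around the average: a leader above `ceil(1.1 * average)` or below `floor(0.9 * average)` triggers a move, and each iteration moves one shard from the most loaded leader to the least loaded one until every leader sits inside the band. A minimal, self-contained sketch of that loop over plain per-leader counts (the `rebalance_moves` function and data are illustrative; the real code shuffles and moves actual `Shard` values and uses `itertools::minmax_by_key`):

// Sketch of the rebalancing loop, simplified to per-leader open-shard counts.
fn rebalance_moves(mut shards_per_leader: Vec<usize>) -> usize {
    let num_leaders = shards_per_leader.len();
    if num_leaders < 2 {
        return 0;
    }
    let num_open_shards: usize = shards_per_leader.iter().sum();
    let target = num_open_shards as f32 / num_leaders as f32;
    let max_per_leader = (target * 1.1).ceil() as usize;
    let min_per_leader = (target * 0.9).floor() as usize;

    let mut num_moves = 0;
    loop {
        // Least and most loaded leaders in this iteration.
        let min_idx = (0..num_leaders).min_by_key(|&i| shards_per_leader[i]).unwrap();
        let max_idx = (0..num_leaders).max_by_key(|&i| shards_per_leader[i]).unwrap();
        // Stop once every leader sits inside the [floor(0.9 * avg), ceil(1.1 * avg)] band.
        if shards_per_leader[min_idx] >= min_per_leader
            && shards_per_leader[max_idx] <= max_per_leader
        {
            break;
        }
        // Move one shard from the most loaded leader to the least loaded one.
        shards_per_leader[max_idx] -= 1;
        shards_per_leader[min_idx] += 1;
        num_moves += 1;
    }
    num_moves
}

fn main() {
    // Three leaders holding 9, 1, and 2 open shards: the average is 4,
    // so the tolerated band is [3, 5] and four single-shard moves are needed.
    assert_eq!(rebalance_moves(vec![9, 1, 2]), 4);
}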

quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs

Lines changed: 2 additions & 6 deletions
@@ -297,11 +297,6 @@ impl StableLogMergePolicy {
     }
 }
 
-#[cfg(test)]
-fn is_sorted(elements: &[usize]) -> bool {
-    elements.windows(2).all(|w| w[0] <= w[1])
-}
-
 // Helpers which expose some internal properties of
 // the stable log merge policy to be tested in unit tests.
 #[cfg(test)]
@@ -337,7 +332,8 @@ impl StableLogMergePolicy {
         levels: &[usize],
         sorted: bool,
     ) -> usize {
-        assert!(is_sorted(levels));
+        assert!(levels.is_sorted());
+
         if num_docs == 0 {
             return 0;
         }
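The test-only `is_sorted` helper can be dropped because `is_sorted` is now available on slices in the standard library. A quick standalone illustration of the equivalence (the data is made up):

fn main() {
    // Std replacement used by the new assertion: non-decreasing check on a slice.
    let levels: Vec<usize> = vec![1, 2, 2, 5, 9];
    assert!(levels.is_sorted());
    assert!(![3_usize, 1, 2].is_sorted());

    // The removed helper, for comparison: every adjacent pair must be non-decreasing.
    fn is_sorted(elements: &[usize]) -> bool {
        elements.windows(2).all(|w| w[0] <= w[1])
    }
    assert!(is_sorted(&levels));
    assert!(!is_sorted(&[3, 1, 2]));
}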

quickwit/quickwit-search/src/tests.rs

Lines changed: 2 additions & 19 deletions
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
 
 use assert_json_diff::{assert_json_eq, assert_json_include};
@@ -260,24 +261,6 @@ async fn test_slop_queries() {
     test_sandbox.assert_quit().await;
 }
 
-// TODO remove me once `Iterator::is_sorted_by_key` is stabilized.
-fn is_reverse_sorted<E, I: Iterator<Item = E>>(mut it: I) -> bool
-where E: Ord {
-    let mut previous_el = if let Some(first_el) = it.next() {
-        first_el
-    } else {
-        // The empty list is sorted!
-        return true;
-    };
-    for next_el in it {
-        if next_el > previous_el {
-            return false;
-        }
-        previous_el = next_el;
-    }
-    true
-}
-
 #[tokio::test]
 async fn test_single_node_several_splits() -> anyhow::Result<()> {
     let index_id = "single-node-several-splits";
@@ -325,7 +308,7 @@ async fn test_single_node_several_splits() -> anyhow::Result<()> {
         .as_ref()
         .map(|partial_hit| (partial_hit.split_id.as_str(), partial_hit.doc_id as i32))
     });
-    assert!(is_reverse_sorted(hit_keys));
+    assert!(hit_keys.is_sorted_by(|left, right| left.cmp(right) == Ordering::Greater));
    assert!(single_node_result.elapsed_time_micros > 10);
    assert!(single_node_result.elapsed_time_micros < 1_000_000);
    test_sandbox.assert_quit().await;
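Likewise, the hand-rolled `is_reverse_sorted` helper in the search tests is replaced by `Iterator::is_sorted_by`, which checks a predicate over every pair of consecutive items. A small standalone example with made-up data (note that `left.cmp(right) == Ordering::Greater` requires strictly decreasing keys, whereas a `left >= right` predicate would also tolerate ties, like the removed helper did):

use std::cmp::Ordering;

fn main() {
    let strictly_decreasing = [9, 7, 4, 1];
    let decreasing_with_ties = [9, 7, 7, 1];

    // Every consecutive pair must compare as `Greater`: strictly decreasing.
    assert!(strictly_decreasing
        .iter()
        .is_sorted_by(|left, right| left.cmp(right) == Ordering::Greater));
    assert!(!decreasing_with_ties
        .iter()
        .is_sorted_by(|left, right| left.cmp(right) == Ordering::Greater));

    // Non-increasing (ties allowed), matching the behavior of the removed helper.
    assert!(decreasing_with_ties
        .iter()
        .is_sorted_by(|left, right| left >= right));
}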
