Skip to content

Commit 2357cb2

Browse files
authored
Ignore async entries when leadership or term is changed (#465)
* ignore async entries when leadership or term is changed Signed-off-by: Connor1996 <[email protected]>
1 parent 95c5326 commit 2357cb2

File tree

4 files changed

+83
-14
lines changed

4 files changed

+83
-14
lines changed

harness/tests/integration_cases/test_raw_node.rs

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -865,15 +865,7 @@ fn test_bounded_uncommitted_entries_growth_with_partition() {
865865
raw_node.propose(vec![], data).unwrap();
866866
}
867867

868-
// Test entries are handled properly when they are fetched asynchronously
869-
#[test]
870-
fn test_raw_node_with_async_entries() {
871-
let l = default_logger();
872-
let mut cfg = new_test_config(1, 10, 1);
873-
cfg.max_size_per_msg = 2048;
874-
let s = new_storage();
875-
let mut raw_node = new_raw_node_with_config(vec![1, 2], &cfg, s.clone(), &l);
876-
868+
fn prepare_async_entries(raw_node: &mut RawNode<MemStorage>, s: &MemStorage) {
877869
raw_node.raft.become_candidate();
878870
raw_node.raft.become_leader();
879871

@@ -906,6 +898,18 @@ fn test_raw_node_with_async_entries() {
906898
append_response.set_term(2);
907899
append_response.set_index(2);
908900
raw_node.step(append_response).unwrap();
901+
}
902+
903+
// Test entries are handled properly when they are fetched asynchronously
904+
#[test]
905+
fn test_raw_node_with_async_entries() {
906+
let l = default_logger();
907+
let mut cfg = new_test_config(1, 10, 1);
908+
cfg.max_size_per_msg = 2048;
909+
let s = new_storage();
910+
let mut raw_node = new_raw_node_with_config(vec![1, 2], &cfg, s.clone(), &l);
911+
912+
prepare_async_entries(&mut raw_node, &s);
909913

910914
// No entries are sent because the entries are temporarily unavailable
911915
let rd = raw_node.ready();
@@ -929,7 +933,7 @@ fn test_raw_node_with_async_entries() {
929933
let _ = raw_node.advance_append(rd);
930934
}
931935

932-
// Test async fetch entries works well when there is a remove node conf-change.
936+
// Test if async fetch entries works well when there is a remove node conf-change.
933937
#[test]
934938
fn test_raw_node_with_async_entries_to_removed_node() {
935939
let l = default_logger();
@@ -938,6 +942,57 @@ fn test_raw_node_with_async_entries_to_removed_node() {
938942
let s = new_storage();
939943
let mut raw_node = new_raw_node_with_config(vec![1, 2], &cfg, s.clone(), &l);
940944

945+
prepare_async_entries(&mut raw_node, &s);
946+
947+
raw_node.apply_conf_change(&remove_node(2)).unwrap();
948+
949+
// Entries are not sent due to the node is removed.
950+
s.wl().trigger_log_unavailable(false);
951+
let context = s.wl().take_get_entries_context().unwrap();
952+
raw_node.on_entries_fetched(context);
953+
let rd = raw_node.ready();
954+
assert_eq!(rd.entries().len(), 0);
955+
assert_eq!(rd.messages().len(), 0);
956+
let _ = raw_node.advance_append(rd);
957+
}
958+
959+
// Test if async fetch entries works well when there is a leader step-down.
960+
#[test]
961+
fn test_raw_node_with_async_entries_on_follower() {
962+
let l = default_logger();
963+
let mut cfg = new_test_config(1, 10, 1);
964+
cfg.max_size_per_msg = 2048;
965+
let s = new_storage();
966+
let mut raw_node = new_raw_node_with_config(vec![1, 2], &cfg, s.clone(), &l);
967+
968+
prepare_async_entries(&mut raw_node, &s);
969+
970+
// Set recent inactive to step down leader
971+
raw_node.raft.mut_prs().get_mut(2).unwrap().recent_active = false;
972+
let mut msg = Message::new();
973+
msg.set_to(1);
974+
msg.set_msg_type(MessageType::MsgCheckQuorum);
975+
raw_node.raft.step(msg).unwrap();
976+
assert_ne!(raw_node.raft.state, StateRole::Leader);
977+
978+
// Entries are not sent due to the leader is changed.
979+
s.wl().trigger_log_unavailable(false);
980+
let context = s.wl().take_get_entries_context().unwrap();
981+
raw_node.on_entries_fetched(context);
982+
let rd = raw_node.ready();
983+
assert_eq!(rd.entries().len(), 0);
984+
assert_eq!(rd.messages().len(), 0);
985+
let _ = raw_node.advance_append(rd);
986+
}
987+
988+
#[test]
989+
fn test_raw_node_async_entries_with_leader_change() {
990+
let l = default_logger();
991+
let mut cfg = new_test_config(1, 10, 1);
992+
cfg.max_size_per_msg = 2048;
993+
let s = new_storage();
994+
let mut raw_node = new_raw_node_with_config(vec![1, 2], &cfg, s.clone(), &l);
995+
941996
raw_node.raft.become_candidate();
942997
raw_node.raft.become_leader();
943998

@@ -971,14 +1026,16 @@ fn test_raw_node_with_async_entries_to_removed_node() {
9711026
append_response.set_index(2);
9721027
raw_node.step(append_response).unwrap();
9731028

974-
raw_node.apply_conf_change(&remove_node(2)).unwrap();
1029+
raw_node.raft.become_follower(raw_node.raft.term + 1, 2);
1030+
raw_node.raft.become_candidate();
1031+
raw_node.raft.become_leader();
9751032

976-
// Entries are not sent due to the node is removed.
1033+
// Entries are not sent due to the leadership or the term is changed.
9771034
s.wl().trigger_log_unavailable(false);
9781035
let context = s.wl().take_get_entries_context().unwrap();
9791036
raw_node.on_entries_fetched(context);
9801037
let rd = raw_node.ready();
981-
assert_eq!(rd.entries().len(), 0);
1038+
assert_eq!(rd.entries().len(), 1); // no-op entry
9821039
assert_eq!(rd.messages().len(), 0);
9831040
let _ = raw_node.advance_append(rd);
9841041
}

src/raft.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,7 @@ impl<T: Storage> RaftCore<T> {
805805
self.max_msg_size,
806806
GetEntriesContext(GetEntriesFor::SendAppend {
807807
to,
808+
term: self.term,
808809
aggressively: !allow_empty,
809810
}),
810811
);

src/raw_node.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,20 @@ impl<T: Storage> RawNode<T> {
419419
/// Panics if passed with the context of context.can_async() == false
420420
pub fn on_entries_fetched(&mut self, context: GetEntriesContext) {
421421
match context.0 {
422-
GetEntriesFor::SendAppend { to, aggressively } => {
422+
GetEntriesFor::SendAppend {
423+
to,
424+
term,
425+
aggressively,
426+
} => {
427+
if self.raft.term != term || self.raft.state != StateRole::Leader {
428+
// term or leadership has changed
429+
return;
430+
}
423431
if self.raft.prs().get(to).is_none() {
424432
// the peer has been removed, do nothing
425433
return;
426434
}
435+
427436
if aggressively {
428437
self.raft.send_append_aggressively(to)
429438
} else {

src/storage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ pub(crate) enum GetEntriesFor {
8282
SendAppend {
8383
/// the peer id to which the entries are going to send
8484
to: u64,
85+
/// the term when the request is issued
86+
term: u64,
8587
/// whether to exhaust all the entries
8688
aggressively: bool,
8789
},

0 commit comments

Comments
 (0)