Skip to content

Commit 63aec46

Browse files
authored
feat: add disable_proposal_forwarding config params (#552)
Signed-off-by: Dat Tien Nguyen <phongtomfapp@gmail.com>
1 parent dfe2239 commit 63aec46

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

harness/tests/integration_cases/test_raw_node.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,77 @@ fn test_committed_entries_pagination_after_restart() {
18761876
}
18771877
}
18781878

1879+
#[test]
1880+
fn test_disable_proposal_forwarding() {
1881+
let l = default_logger();
1882+
1883+
let n1 = new_test_raft_with_config(
1884+
&Config {
1885+
id: 1,
1886+
heartbeat_tick: 1,
1887+
election_tick: 10,
1888+
disable_proposal_forwarding: false,
1889+
..Default::default()
1890+
},
1891+
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
1892+
&l,
1893+
);
1894+
1895+
let n2 = new_test_raft_with_config(
1896+
&Config {
1897+
id: 2,
1898+
heartbeat_tick: 1,
1899+
election_tick: 10,
1900+
disable_proposal_forwarding: false,
1901+
..Default::default()
1902+
},
1903+
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
1904+
&l,
1905+
);
1906+
1907+
let n3 = new_test_raft_with_config(
1908+
&Config {
1909+
id: 3,
1910+
heartbeat_tick: 1,
1911+
election_tick: 10,
1912+
disable_proposal_forwarding: true,
1913+
..Default::default()
1914+
},
1915+
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
1916+
&l,
1917+
);
1918+
1919+
let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);
1920+
1921+
// node 1 starts campaign to become leader.
1922+
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
1923+
1924+
// send proposal to n2(follower) where DisableProposalForwarding is false
1925+
assert_eq!(
1926+
network
1927+
.peers
1928+
.get_mut(&2)
1929+
.unwrap()
1930+
.step(new_message(2, 2, MessageType::MsgPropose, 1)),
1931+
Ok(())
1932+
);
1933+
1934+
// verify n2(follower) does forward the proposal when DisableProposalForwarding is false
1935+
assert_eq!(network.peers.get(&2).unwrap().msgs.len(), 1);
1936+
1937+
// send proposal to n3(follower) where DisableProposalForwarding is true
1938+
assert_eq!(
1939+
network
1940+
.peers
1941+
.get_mut(&3)
1942+
.unwrap()
1943+
.step(new_message(3, 3, MessageType::MsgPropose, 1)),
1944+
Err(Error::ProposalDropped)
1945+
);
1946+
1947+
assert!(network.peers.get(&3).unwrap().msgs.is_empty());
1948+
}
1949+
18791950
#[derive(Default)]
18801951
struct IgnoreSizeHintMemStorage {
18811952
inner: MemStorage,

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ pub struct Config {
102102
/// Maximum raft log number that can be applied after commit but before persist.
103103
/// The default value is 0, which means apply after both commit and persist.
104104
pub max_apply_unpersisted_log_limit: u64,
105+
106+
/// If enable, followers will not forward proposal to leader.
107+
pub disable_proposal_forwarding: bool,
105108
}
106109

107110
impl Default for Config {
@@ -125,6 +128,7 @@ impl Default for Config {
125128
max_uncommitted_size: NO_LIMIT,
126129
max_committed_size_per_ready: NO_LIMIT,
127130
max_apply_unpersisted_log_limit: 0,
131+
disable_proposal_forwarding: false,
128132
}
129133
}
130134
}

src/raft.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ pub struct RaftCore<T: Storage> {
233233

234234
skip_bcast_commit: bool,
235235
batch_append: bool,
236+
disable_proposal_forwarding: bool,
236237

237238
heartbeat_timeout: usize,
238239
election_timeout: usize,
@@ -363,6 +364,7 @@ impl<T: Storage> Raft<T> {
363364
last_log_tail_index: 0,
364365
},
365366
max_committed_size_per_ready: c.max_committed_size_per_ready,
367+
disable_proposal_forwarding: c.disable_proposal_forwarding,
366368
},
367369
};
368370
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
@@ -2337,6 +2339,15 @@ impl<T: Storage> Raft<T> {
23372339
term = self.term;
23382340
);
23392341
return Err(Error::ProposalDropped);
2342+
} else if self.disable_proposal_forwarding {
2343+
info!(
2344+
self.logger,
2345+
"{from} not forwarding to leader {to} at term {term}; dropping proposal",
2346+
from = self.id,
2347+
to = self.leader_id,
2348+
term = self.term;
2349+
);
2350+
return Err(Error::ProposalDropped);
23402351
}
23412352
m.to = self.leader_id;
23422353
self.r.send(m, &mut self.msgs);

0 commit comments

Comments
 (0)