Skip to content

Commit 2fa5d20

Browse files
author
LintianShi
committed
async log entries fetch for forwarding
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
1 parent d0834f5 commit 2fa5d20

File tree

3 files changed

+106
-73
lines changed

3 files changed

+106
-73
lines changed

src/raft.rs

Lines changed: 74 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,63 @@ impl<T: Storage> RaftCore<T> {
844844
true
845845
}
846846

847+
fn send_forward(
848+
&mut self,
849+
from: u64,
850+
commit: u64,
851+
commit_term: u64,
852+
forward: &Forward,
853+
msgs: &mut Vec<Message>,
854+
) {
855+
let mut m = Message::default();
856+
m.to = forward.to;
857+
m.from = from;
858+
m.commit = commit;
859+
m.commit_term = commit_term;
860+
m.set_msg_type(MessageType::MsgAppend);
861+
// Fetch log entries from the forward.index to the last index of log.
862+
if self
863+
.raft_log
864+
.match_term(forward.get_index(), forward.get_log_term())
865+
{
866+
let ents = self.raft_log.entries(
867+
forward.get_index() + 1,
868+
self.max_msg_size,
869+
GetEntriesContext(GetEntriesFor::SendForward {
870+
from,
871+
commit,
872+
commit_term,
873+
term: self.term,
874+
forward: forward.clone(),
875+
}),
876+
);
877+
878+
match ents {
879+
Ok(ents) => {
880+
m.index = forward.get_index();
881+
m.log_term = forward.get_log_term();
882+
m.set_entries(ents.into());
883+
self.send(m, msgs);
884+
}
885+
Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {}
886+
_ => {
887+
// Forward MsgAppend with empty entries in order to update commit
888+
// or trigger decrementing next_idx.
889+
m.index = forward.get_index();
890+
m.log_term = forward.get_log_term();
891+
self.send(m, msgs);
892+
warn!(
893+
self.logger,
894+
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
895+
forward.get_index(),
896+
forward.get_log_term(),
897+
forward.get_to()
898+
);
899+
}
900+
}
901+
}
902+
}
903+
847904
// send_heartbeat sends an empty MsgAppend
848905
fn send_heartbeat(
849906
&mut self,
@@ -904,6 +961,12 @@ impl<T: Storage> Raft<T> {
904961
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
905962
}
906963

964+
/// Forwards an append RPC from the leader to the given peer.
965+
pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) {
966+
self.r
967+
.send_forward(from, commit, commit_term, forward, &mut self.msgs);
968+
}
969+
907970
/// Broadcasts heartbeats to all the followers if it's leader.
908971
pub fn ping(&mut self) {
909972
if self.state == StateRole::Leader {
@@ -2541,59 +2604,20 @@ impl<T: Storage> Raft<T> {
25412604
// If the agent fails to append entries from the leader,
25422605
// the agent cannot forward MsgAppend.
25432606
for forward in m.get_forwards() {
2544-
// Fetch log entries from the forward.index to the last index of log.
2545-
if self
2546-
.raft_log
2547-
.match_term(forward.get_index(), forward.get_log_term())
2548-
{
2549-
let ents = self.raft_log.entries(
2550-
forward.get_index() + 1,
2551-
self.max_msg_size,
2552-
GetEntriesContext(GetEntriesFor::SendAppend {
2553-
to: forward.get_to(),
2554-
term: m.term,
2555-
aggressively: false,
2556-
}),
2557-
);
2558-
2559-
match ents {
2560-
Ok(ents) => {
2561-
let mut m_append = Message::default();
2562-
m_append.to = forward.get_to();
2563-
m_append.from = m.get_from();
2564-
m_append.set_msg_type(MessageType::MsgAppend);
2565-
m_append.index = forward.get_index();
2566-
m_append.log_term = forward.get_log_term();
2567-
m_append.set_entries(ents.into());
2568-
m_append.commit = m.get_commit();
2569-
m_append.commit_term = m.get_commit_term();
2570-
self.r.send(m_append, &mut self.msgs);
2571-
}
2572-
Err(_) => {
2573-
self.dummy_forward(m, forward);
2574-
warn!(
2575-
self.logger,
2576-
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
2577-
forward.get_index(),
2578-
forward.get_log_term(),
2579-
forward.get_to()
2580-
);
2581-
}
2582-
}
2583-
} else {
2584-
self.dummy_forward(m, forward);
2585-
warn!(
2586-
self.logger,
2587-
"The agent's log does not match with index {} log term {} in forward message to peer {}.",
2588-
forward.get_index(),
2589-
forward.get_log_term(),
2590-
forward.get_to()
2591-
);
2592-
}
2607+
self.r
2608+
.send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs);
25932609
}
25942610
} else {
25952611
for forward in m.get_forwards() {
2596-
self.dummy_forward(m, forward);
2612+
let mut m_append = Message::default();
2613+
m_append.to = forward.to;
2614+
m_append.from = m.from;
2615+
m_append.commit = m.commit;
2616+
m_append.commit_term = m.commit_term;
2617+
m_append.set_msg_type(MessageType::MsgAppend);
2618+
m_append.index = forward.get_index();
2619+
m_append.log_term = forward.get_log_term();
2620+
self.r.send(m_append, &mut self.msgs);
25972621
}
25982622
info!(
25992623
self.logger,
@@ -2937,29 +2961,6 @@ impl<T: Storage> Raft<T> {
29372961
self.lead_transferee = None;
29382962
}
29392963

2940-
// Forward MsgAppend with empty entries in order to update commit
2941-
// or trigger decrementing next_idx.
2942-
fn dummy_forward(&mut self, m: &Message, forward: &Forward) {
2943-
let mut m_append = Message::default();
2944-
m_append.to = forward.get_to();
2945-
m_append.from = m.get_from();
2946-
m_append.set_msg_type(MessageType::MsgAppend);
2947-
m_append.index = forward.get_index();
2948-
m_append.log_term = forward.get_log_term();
2949-
m_append.commit = m.get_commit();
2950-
m_append.commit_term = m.get_commit_term();
2951-
2952-
info!(
2953-
self.logger,
2954-
"The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \
2955-
to peer {id}",
2956-
msg_log_term = forward.log_term,
2957-
msg_index = forward.index,
2958-
id = forward.to;
2959-
);
2960-
self.r.send(m_append, &mut self.msgs);
2961-
}
2962-
29632964
fn send_request_snapshot(&mut self) {
29642965
let mut m = Message::default();
29652966
m.set_msg_type(MessageType::MsgAppendResponse);

src/raw_node.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,24 @@ impl<T: Storage> RawNode<T> {
439439
self.raft.send_append(to)
440440
}
441441
}
442+
GetEntriesFor::SendForward {
443+
from,
444+
commit,
445+
commit_term,
446+
term,
447+
forward,
448+
} => {
449+
if self.raft.term != term || self.raft.state == StateRole::Leader {
450+
// term or leadership has changed
451+
// this peer is not the agent
452+
return;
453+
}
454+
if self.raft.prs().get(forward.to).is_none() {
455+
// the peer has been removed, do nothing
456+
return;
457+
}
458+
self.raft.send_forward(from, commit, commit_term, &forward);
459+
}
442460
GetEntriesFor::Empty(can_async) if can_async => {}
443461
_ => panic!("shouldn't call callback on non-async context"),
444462
}

src/storage.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl GetEntriesContext {
7070
pub fn can_async(&self) -> bool {
7171
match self.0 {
7272
GetEntriesFor::SendAppend { .. } => true,
73+
GetEntriesFor::SendForward { .. } => true,
7374
GetEntriesFor::Empty(can_async) => can_async,
7475
_ => false,
7576
}
@@ -87,6 +88,19 @@ pub(crate) enum GetEntriesFor {
8788
/// whether to exhaust all the entries
8889
aggressively: bool,
8990
},
91+
// for forwarding entries to followers
92+
SendForward {
93+
/// the peer id from which the entries are forwarded
94+
from: u64,
95+
/// the commit index in MsgGroupbroadcast
96+
commit: u64,
97+
/// the commit term in MsgGroupbroadcast
98+
commit_term: u64,
99+
/// the term when the request is issued
100+
term: u64,
101+
/// the forward information in MsgGroupbroadcast
102+
forward: Forward,
103+
},
90104
// for getting committed entries in a ready
91105
GenReady,
92106
// for getting entries to check pending conf when transferring leader

0 commit comments

Comments
 (0)