Skip to content

Commit eed70bc

Browse files
author
LintianShi
committed
async log entries fetch for forwarding
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
1 parent 1fcdb47 commit eed70bc

File tree

4 files changed

+125
-96
lines changed

4 files changed

+125
-96
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ pub use raw_node::is_empty_snap;
536536
pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus};
537537
pub use read_only::{ReadOnlyOption, ReadState};
538538
pub use status::Status;
539-
pub use storage::{GetEntriesContext, RaftState, Storage};
539+
pub use storage::{GetEntriesContext, GetEntriesFor, RaftState, Storage};
540540
pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker};
541541
pub use util::majority;
542542

src/raft.rs

Lines changed: 74 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,63 @@ impl<T: Storage> RaftCore<T> {
844844
true
845845
}
846846

847+
fn send_forward(
848+
&mut self,
849+
from: u64,
850+
commit: u64,
851+
commit_term: u64,
852+
forward: &Forward,
853+
msgs: &mut Vec<Message>,
854+
) {
855+
let mut m = Message::default();
856+
m.to = forward.to;
857+
m.from = from;
858+
m.commit = commit;
859+
m.commit_term = commit_term;
860+
m.set_msg_type(MessageType::MsgAppend);
861+
// Fetch log entries from the forward.index to the last index of log.
862+
if self
863+
.raft_log
864+
.match_term(forward.get_index(), forward.get_log_term())
865+
{
866+
let ents = self.raft_log.entries(
867+
forward.get_index() + 1,
868+
self.max_msg_size,
869+
GetEntriesContext(GetEntriesFor::SendForward {
870+
from,
871+
commit,
872+
commit_term,
873+
term: self.term,
874+
forward: forward.clone(),
875+
}),
876+
);
877+
878+
match ents {
879+
Ok(ents) => {
880+
m.index = forward.get_index();
881+
m.log_term = forward.get_log_term();
882+
m.set_entries(ents.into());
883+
self.send(m, msgs);
884+
}
885+
Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {}
886+
_ => {
887+
// Forward MsgAppend with empty entries in order to update commit
888+
// or trigger decrementing next_idx.
889+
m.index = forward.get_index();
890+
m.log_term = forward.get_log_term();
891+
self.send(m, msgs);
892+
warn!(
893+
self.logger,
894+
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
895+
forward.get_index(),
896+
forward.get_log_term(),
897+
forward.get_to()
898+
);
899+
}
900+
}
901+
}
902+
}
903+
847904
// send_heartbeat sends an empty MsgAppend
848905
fn send_heartbeat(
849906
&mut self,
@@ -904,6 +961,12 @@ impl<T: Storage> Raft<T> {
904961
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
905962
}
906963

964+
/// Forwards an append RPC from the leader to the given peer.
965+
pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) {
966+
self.r
967+
.send_forward(from, commit, commit_term, forward, &mut self.msgs);
968+
}
969+
907970
/// Broadcasts heartbeats to all the followers if it's leader.
908971
pub fn ping(&mut self) {
909972
if self.state == StateRole::Leader {
@@ -2541,59 +2604,20 @@ impl<T: Storage> Raft<T> {
25412604
// If the agent fails to append entries from the leader,
25422605
// the agent cannot forward MsgAppend.
25432606
for forward in m.get_forwards() {
2544-
// Fetch log entries from the forward.index to the last index of log.
2545-
if self
2546-
.raft_log
2547-
.match_term(forward.get_index(), forward.get_log_term())
2548-
{
2549-
let ents = self.raft_log.entries(
2550-
forward.get_index() + 1,
2551-
self.max_msg_size,
2552-
GetEntriesContext(GetEntriesFor::SendAppend {
2553-
to: forward.get_to(),
2554-
term: m.term,
2555-
aggressively: false,
2556-
}),
2557-
);
2558-
2559-
match ents {
2560-
Ok(ents) => {
2561-
let mut m_append = Message::default();
2562-
m_append.to = forward.get_to();
2563-
m_append.from = m.get_from();
2564-
m_append.set_msg_type(MessageType::MsgAppend);
2565-
m_append.index = forward.get_index();
2566-
m_append.log_term = forward.get_log_term();
2567-
m_append.set_entries(ents.into());
2568-
m_append.commit = m.get_commit();
2569-
m_append.commit_term = m.get_commit_term();
2570-
self.r.send(m_append, &mut self.msgs);
2571-
}
2572-
Err(_) => {
2573-
self.dummy_forward(m, forward);
2574-
warn!(
2575-
self.logger,
2576-
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
2577-
forward.get_index(),
2578-
forward.get_log_term(),
2579-
forward.get_to()
2580-
);
2581-
}
2582-
}
2583-
} else {
2584-
self.dummy_forward(m, forward);
2585-
warn!(
2586-
self.logger,
2587-
"The agent's log does not match with index {} log term {} in forward message to peer {}.",
2588-
forward.get_index(),
2589-
forward.get_log_term(),
2590-
forward.get_to()
2591-
);
2592-
}
2607+
self.r
2608+
.send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs);
25932609
}
25942610
} else {
25952611
for forward in m.get_forwards() {
2596-
self.dummy_forward(m, forward);
2612+
let mut m_append = Message::default();
2613+
m_append.to = forward.to;
2614+
m_append.from = m.from;
2615+
m_append.commit = m.commit;
2616+
m_append.commit_term = m.commit_term;
2617+
m_append.set_msg_type(MessageType::MsgAppend);
2618+
m_append.index = forward.get_index();
2619+
m_append.log_term = forward.get_log_term();
2620+
self.r.send(m_append, &mut self.msgs);
25972621
}
25982622
info!(
25992623
self.logger,
@@ -2937,29 +2961,6 @@ impl<T: Storage> Raft<T> {
29372961
self.lead_transferee = None;
29382962
}
29392963

2940-
// Forward MsgAppend with empty entries in order to update commit
2941-
// or trigger decrementing next_idx.
2942-
fn dummy_forward(&mut self, m: &Message, forward: &Forward) {
2943-
let mut m_append = Message::default();
2944-
m_append.to = forward.get_to();
2945-
m_append.from = m.get_from();
2946-
m_append.set_msg_type(MessageType::MsgAppend);
2947-
m_append.index = forward.get_index();
2948-
m_append.log_term = forward.get_log_term();
2949-
m_append.commit = m.get_commit();
2950-
m_append.commit_term = m.get_commit_term();
2951-
2952-
info!(
2953-
self.logger,
2954-
"The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \
2955-
to peer {id}",
2956-
msg_log_term = forward.log_term,
2957-
msg_index = forward.index,
2958-
id = forward.to;
2959-
);
2960-
self.r.send(m_append, &mut self.msgs);
2961-
}
2962-
29632964
fn send_request_snapshot(&mut self) {
29642965
let mut m = Message::default();
29652966
m.set_msg_type(MessageType::MsgAppendResponse);

src/raw_node.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,19 @@ impl<T: Storage> RawNode<T> {
439439
self.raft.send_append(to)
440440
}
441441
}
442+
GetEntriesFor::SendForward {
443+
from,
444+
commit,
445+
commit_term,
446+
term,
447+
forward,
448+
} => {
449+
if self.raft.term != term {
450+
// term has changed
451+
return;
452+
}
453+
self.raft.send_forward(from, commit, commit_term, &forward);
454+
}
442455
GetEntriesFor::Empty(can_async) if can_async => {}
443456
_ => panic!("shouldn't call callback on non-async context"),
444457
}

src/storage.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,44 @@ impl RaftState {
5656
}
5757
}
5858

59+
/// Purpose of getting entries.
60+
#[derive(Debug)]
61+
pub enum GetEntriesFor {
62+
/// For sending entries to followers.
63+
SendAppend {
64+
/// The peer id to which the entries are going to send.
65+
to: u64,
66+
/// The term when the request is issued.
67+
term: u64,
68+
/// Whether to exhaust all the entries.
69+
aggressively: bool,
70+
},
71+
/// For forwarding entries to followers.
72+
SendForward {
73+
/// The peer id from which the entries are forwarded.
74+
from: u64,
75+
/// The commit index in MsgGroupbroadcast.
76+
commit: u64,
77+
/// The commit term in MsgGroupbroadcast.
78+
commit_term: u64,
79+
/// The term when the request is issued.
80+
term: u64,
81+
/// The forward information in MsgGroupbroadcast.
82+
forward: Forward,
83+
},
84+
/// For getting committed entries in a ready.
85+
GenReady,
86+
/// For getting entries to check pending conf when transferring leader.
87+
TransferLeader,
88+
/// For getting entries to check pending conf when forwarding commit index by vote messages
89+
CommitByVote,
90+
/// It's not called by the raft itself.
91+
Empty(bool),
92+
}
93+
5994
/// Records the context of the caller who calls entries() of Storage trait.
6095
#[derive(Debug)]
61-
pub struct GetEntriesContext(pub(crate) GetEntriesFor);
96+
pub struct GetEntriesContext(pub GetEntriesFor);
6297

6398
impl GetEntriesContext {
6499
/// Used for callers out of raft. Caller can customize if it supports async.
@@ -70,33 +105,13 @@ impl GetEntriesContext {
70105
pub fn can_async(&self) -> bool {
71106
match self.0 {
72107
GetEntriesFor::SendAppend { .. } => true,
108+
GetEntriesFor::SendForward { .. } => true,
73109
GetEntriesFor::Empty(can_async) => can_async,
74110
_ => false,
75111
}
76112
}
77113
}
78114

79-
#[derive(Debug)]
80-
pub(crate) enum GetEntriesFor {
81-
// for sending entries to followers
82-
SendAppend {
83-
/// the peer id to which the entries are going to send
84-
to: u64,
85-
/// the term when the request is issued
86-
term: u64,
87-
/// whether to exhaust all the entries
88-
aggressively: bool,
89-
},
90-
// for getting committed entries in a ready
91-
GenReady,
92-
// for getting entries to check pending conf when transferring leader
93-
TransferLeader,
94-
// for getting entries to check pending conf when forwarding commit index by vote messages
95-
CommitByVote,
96-
// It's not called by the raft itself
97-
Empty(bool),
98-
}
99-
100115
/// Storage saves all the information about the current Raft implementation, including Raft Log,
101116
/// commit index, the leader to vote for, etc.
102117
///

0 commit comments

Comments
 (0)