
Commit 75cb96f

LintianShi authored and committed
Add new message type MsgGroupBroadcast and corresponding handler
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
1 parent 7293f8a

File tree

3 files changed: +103 -7 lines changed

  proto/proto/eraftpb.proto
  src/config.rs
  src/raft.rs

proto/proto/eraftpb.proto

Lines changed: 13 additions & 0 deletions

@@ -46,6 +46,17 @@ message Snapshot {
     SnapshotMetadata metadata = 2;
 }
 
+// Forward is a type that tells the agent how to forward the MsgGroupBroadcast from the leader.
+//
+// Field to is the destination of the forwarding.
+// Fields log_term and index identify the entry that immediately precedes the log entries to be forwarded.
+// Entries to be forwarded are in the range (index, last_index].
+message Forward {
+    uint64 to = 1;
+    uint64 log_term = 2;
+    uint64 index = 3;
+}
+
 enum MessageType {
     MsgHup = 0;
     MsgBeat = 1;
@@ -66,6 +77,7 @@ enum MessageType {
     MsgReadIndexResp = 16;
     MsgRequestPreVote = 17;
     MsgRequestPreVoteResponse = 18;
+    MsgGroupBroadcast = 19;
 }
 
 message Message {
@@ -89,6 +101,7 @@ message Message {
     uint64 reject_hint = 11;
     bytes context = 12;
     uint64 priority = 14;
+    repeated Forward forwards = 16;
 }
 
 message HardState {
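
To make the Forward semantics concrete, here is a minimal leader-side sketch of how the new field could be populated. It assumes the generated eraftpb bindings and the set_/get_ accessor style used elsewhere in this commit; the helper function and all IDs and log positions are hypothetical, and the leader-side logic for picking an agent per zone is not part of this commit.

use raft::eraftpb::{Forward, Message, MessageType};

// Hypothetical helper: ask `agent_id` to append the carried entries and to
// relay them to one more peer in its availability zone.
fn build_group_broadcast(leader_id: u64, agent_id: u64) -> Message {
    let mut m = Message::default();
    m.set_msg_type(MessageType::MsgGroupBroadcast);
    m.from = leader_id;
    m.to = agent_id;

    // A Forward names a destination plus the (log_term, index) of the entry
    // immediately preceding the range to relay; the destination then receives
    // the entries in (index, last_index].
    let mut f = Forward::default();
    f.to = 4; // made-up peer in the agent's zone
    f.log_term = 5; // made-up term of the entry at `index`
    f.index = 12; // made-up previous index
    m.set_forwards(vec![f].into());
    m
}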

src/config.rs

Lines changed: 5 additions & 0 deletions

@@ -67,6 +67,10 @@ pub struct Config {
     /// rejoins the cluster.
     pub pre_vote: bool,
 
+    /// Enables follower replication.
+    /// This reduces cross-AZ traffic in cloud deployments.
+    pub follower_repl: bool,
+
     /// The range of election timeout. In some cases, we hope some nodes has less possibility
     /// to become leader. This configuration ensures that the randomized election_timeout
     /// will always be suit in [min_election_tick, max_election_tick).
@@ -112,6 +116,7 @@ impl Default for Config {
             max_inflight_msgs: 256,
             check_quorum: false,
             pre_vote: false,
+            follower_repl: false,
             min_election_tick: 0,
             max_election_tick: 0,
             read_only_option: ReadOnlyOption::Safe,
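
As a usage sketch, the new switch can be turned on over an otherwise default configuration. This assumes this commit's branch, where Config implements Default (as shown above) and the existing Config::validate is unchanged:

use raft::Config;

// Enable follower replication on top of the defaults; `follower_repl`
// defaults to false, so existing deployments are unaffected.
let config = Config {
    id: 1, // hypothetical node id
    follower_repl: true,
    ..Default::default()
};
config.validate().expect("valid config");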

src/raft.rs

Lines changed: 85 additions & 7 deletions

@@ -236,6 +236,13 @@ pub struct RaftCore<T: Storage> {
     /// Enable this if greater cluster stability is preferred over faster elections.
     pub pre_vote: bool,
 
+    /// Enable follower replication.
+    ///
+    /// This enables data replication from a follower to other servers in the same availability zone.
+    ///
+    /// Enable this to reduce cross-AZ traffic in cloud deployments.
+    pub follower_repl: bool,
+
     skip_bcast_commit: bool,
     batch_append: bool,
 
@@ -337,6 +344,7 @@ impl<T: Storage> Raft<T> {
             promotable: false,
             check_quorum: c.check_quorum,
             pre_vote: c.pre_vote,
+            follower_repl: c.follower_repl,
             read_only: ReadOnly::new(c.read_only_option),
             heartbeat_timeout: c.heartbeat_tick,
             election_timeout: c.election_tick,
@@ -1372,6 +1380,7 @@ impl<T: Storage> Raft<T> {
             if m.get_msg_type() == MessageType::MsgAppend
                 || m.get_msg_type() == MessageType::MsgHeartbeat
                 || m.get_msg_type() == MessageType::MsgSnapshot
+                || (m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl)
             {
                 self.become_follower(m.term, m.from);
             } else {
@@ -1381,7 +1390,8 @@ impl<T: Storage> Raft<T> {
         } else if m.term < self.term {
             if (self.check_quorum || self.pre_vote)
                 && (m.get_msg_type() == MessageType::MsgHeartbeat
-                    || m.get_msg_type() == MessageType::MsgAppend)
+                    || m.get_msg_type() == MessageType::MsgAppend
+                    || (m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl))
             {
                 // We have received messages from a leader at a lower term. It is possible
                 // that these messages were simply delayed in the network, but this could
@@ -2314,6 +2324,11 @@ impl<T: Storage> Raft<T> {
                 self.leader_id = m.from;
                 self.handle_append_entries(&m);
             }
+            MessageType::MsgGroupBroadcast => {
+                self.election_elapsed = 0;
+                self.leader_id = m.from;
+                self.handle_group_broadcast(&m);
+            }
             MessageType::MsgHeartbeat => {
                 self.election_elapsed = 0;
                 self.leader_id = m.from;
@@ -2425,13 +2440,14 @@ impl<T: Storage> Raft<T> {
         Err(Error::RequestSnapshotDropped)
     }
 
-    // TODO: revoke pub when there is a better way to test.
-    /// For a given message, append the entries to the log.
-    pub fn handle_append_entries(&mut self, m: &Message) {
+    /// Try to append entries, and return the append result.
+    /// Returns true only if the entries in the message have been appended to the log successfully.
+    pub fn try_append_entries(&mut self, m: &Message) -> bool {
         if self.pending_request_snapshot != INVALID_INDEX {
             self.send_request_snapshot();
-            return;
+            return false;
         }
+
         if m.index < self.raft_log.committed {
             debug!(
                 self.logger,
@@ -2443,13 +2459,14 @@ impl<T: Storage> Raft<T> {
             to_send.index = self.raft_log.committed;
             to_send.commit = self.raft_log.committed;
             self.r.send(to_send, &mut self.msgs);
-            return;
+            return false;
         }
 
         let mut to_send = Message::default();
         to_send.to = m.from;
         to_send.set_msg_type(MessageType::MsgAppendResponse);
 
+        let mut success = true;
         if let Some((_, last_idx)) = self
             .raft_log
             .maybe_append(m.index, m.log_term, m.commit, &m.entries)
@@ -2458,7 +2475,7 @@ impl<T: Storage> Raft<T> {
         } else {
             debug!(
                 self.logger,
-                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
+                "Reject append [logterm: {msg_log_term}, index: {msg_index}] \
                 from {from}",
                 msg_log_term = m.log_term,
                 msg_index = m.index,
@@ -2483,9 +2500,70 @@ impl<T: Storage> Raft<T> {
             to_send.reject = true;
             to_send.reject_hint = hint_index;
             to_send.log_term = hint_term.unwrap();
+            success = false;
         }
         to_send.set_commit(self.raft_log.committed);
         self.r.send(to_send, &mut self.msgs);
+        success
+    }
+
+    // TODO: revoke pub when there is a better way to test.
+    /// For a given message, append the entries to the log.
+    pub fn handle_append_entries(&mut self, m: &Message) {
+        self.try_append_entries(m);
+    }
+
+    /// For a broadcast, append entries to own log and forward MsgAppend to the other destinations.
+    pub fn handle_group_broadcast(&mut self, m: &Message) {
+        // If the agent fails to append entries from the leader,
+        // it cannot forward MsgAppend.
+        if self.try_append_entries(m) {
+            let agent_id = m.get_to();
+            for forward in m.get_forwards() {
+                // The destination should be in the cluster.
+                if self.prs().get(forward.get_to()).is_some() {
+                    // Fetch log entries from forward.index + 1 up to the last index of the log.
+                    let ents = self.raft_log.entries(
+                        forward.get_index() + 1,
+                        self.max_msg_size,
+                        GetEntriesContext(GetEntriesFor::SendAppend {
+                            to: forward.get_to(),
+                            term: m.term,
+                            aggressively: false,
+                        }),
+                    );
+                    if self
+                        .raft_log
+                        .match_term(forward.get_index(), forward.get_log_term())
+                    {
+                        let mut m_append = Message::default();
+                        m_append.to = forward.get_to();
+                        m_append.from = m.get_from();
+                        m_append.set_msg_type(MessageType::MsgAppend);
+                        m_append.index = forward.get_index();
+                        m_append.log_term = forward.get_log_term();
+                        m_append.set_entries(ents.unwrap().into());
+                        m_append.commit = m.get_commit();
+                        m_append.commit_term = m.get_commit_term();
+                        debug!(
+                            self.logger,
+                            "Peer {} forwards MsgAppend from {} to {}",
+                            agent_id,
+                            m_append.from,
+                            m_append.to
+                        );
+                        self.r.send(m_append, &mut self.msgs)
+                    } else {
+                        warn!(
+                            self.logger,
+                            "The agent's log does not match index {} log term {} in the forward message",
+                            forward.get_index(),
+                            forward.get_log_term()
+                        );
+                    }
+                }
+            }
+        }
     }
 
     // TODO: revoke pub when there is a better way to test.
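
To see the handler end to end, here is a self-contained sketch (not part of this commit) that drives an agent follower through Raft::step. It assumes this commit's branch of raft-rs, the MemStorage test storage, and slog for the required logger; all IDs, terms, and positions are made up.

use raft::eraftpb::{Entry, Forward, Message, MessageType};
use raft::storage::MemStorage;
use raft::{Config, Raft};
use slog::{o, Discard, Logger};

fn main() -> raft::Result<()> {
    // Node 2 acts as the agent; 1 is the leader and 3 shares the agent's zone.
    let config = Config {
        id: 2,
        follower_repl: true, // the new switch from this commit
        ..Default::default()
    };
    let logger = Logger::root(Discard, o!());
    let storage = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
    let mut raft = Raft::new(&config, storage, &logger)?;

    // Build a MsgGroupBroadcast the way a leader at a higher term might:
    // one new entry for the agent itself, plus a Forward telling it to relay
    // the same range to peer 3. Positions are derived from the agent's own
    // log so that both the local append and the forward check match.
    let prev_index = raft.raft_log.last_index();
    let prev_term = raft.raft_log.term(prev_index).unwrap();
    let mut m = Message::default();
    m.set_msg_type(MessageType::MsgGroupBroadcast);
    m.from = 1;
    m.to = 2;
    m.term = raft.term + 1;
    m.index = prev_index;
    m.log_term = prev_term;
    let mut e = Entry::default();
    e.term = m.term;
    e.index = prev_index + 1;
    m.set_entries(vec![e].into());
    let mut f = Forward::default();
    f.to = 3;
    f.index = prev_index;
    f.log_term = prev_term;
    m.set_forwards(vec![f].into());

    raft.step(m)?;
    // raft.msgs now holds the MsgAppendResponse for the leader and, because
    // the agent's log matches the Forward, a MsgAppend relayed to peer 3.
    Ok(())
}

Because each forwarded MsgAppend keeps from set to the leader's id, the downstream follower replies with MsgAppendResponse directly to the leader, so the leader's commit accounting is unchanged by the relay.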
