Skip to content

Commit 4f7147c

Browse files
author
LintianShi
committed
Add lag flag in progress
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
1 parent 40a8fcd commit 4f7147c

File tree

2 files changed

+34
-6
lines changed

2 files changed

+34
-6
lines changed

src/raft.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,11 @@ impl<T: Storage> RaftCore<T> {
800800
);
801801
return false;
802802
}
803+
// Check whether the progress lags behind the leader too much.
804+
if !pr.lag && pr.matched + 200 < self.raft_log.last_index() {
805+
pr.lag = true;
806+
}
807+
803808
let mut m = Message::default();
804809
m.to = to;
805810
if pr.pending_request_snapshot != INVALID_INDEX {
@@ -1048,8 +1053,10 @@ impl<T: Storage> Raft<T> {
10481053
);
10491054
}
10501055
let self_id = self.id;
1056+
let last_idx = self.raft_log.last_index();
10511057
let pr = self.mut_prs().get_mut(self_id).unwrap();
1052-
if pr.maybe_update(index) && self.maybe_commit() && self.should_bcast_commit() {
1058+
if pr.maybe_update(index, last_idx) && self.maybe_commit() && self.should_bcast_commit()
1059+
{
10531060
self.bcast_append();
10541061
}
10551062
}
@@ -1707,6 +1714,7 @@ impl<T: Storage> Raft<T> {
17071714
.0;
17081715
}
17091716

1717+
let last_idx = self.raft_log.last_index();
17101718
let pr = match self.prs.get_mut(m.from) {
17111719
Some(pr) => pr,
17121720
None => {
@@ -1769,7 +1777,7 @@ impl<T: Storage> Raft<T> {
17691777
}
17701778

17711779
let old_paused = pr.is_paused();
1772-
if !pr.maybe_update(m.index) {
1780+
if !pr.maybe_update(m.index, last_idx) {
17731781
return;
17741782
}
17751783

@@ -2742,8 +2750,9 @@ impl<T: Storage> Raft<T> {
27422750
}
27432751

27442752
// TODO: this is untested and likely unneeded
2753+
let last_idx = self.raft_log.last_index();
27452754
let pr = self.prs.get_mut(self.id).unwrap();
2746-
pr.maybe_update(pr.next_idx - 1);
2755+
pr.maybe_update(pr.next_idx - 1, last_idx);
27472756

27482757
self.pending_request_snapshot = INVALID_INDEX;
27492758

src/tracker/progress.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ pub struct Progress {
3939
/// RecentActive can be reset to false after an election timeout.
4040
pub recent_active: bool,
4141

42+
/// This is true if the progress lags behind the leader too much.
43+
pub lag: bool,
44+
4245
/// Inflights is a sliding window for the inflight messages.
4346
/// When inflights is full, no more message should be sent.
4447
/// When a leader sends out a message, the index of the last
@@ -66,6 +69,7 @@ impl Progress {
6669
pending_snapshot: 0,
6770
pending_request_snapshot: 0,
6871
recent_active: false,
72+
lag: false,
6973
ins: Inflights::new(ins_size),
7074
commit_group_id: 0,
7175
committed_index: 0,
@@ -87,6 +91,7 @@ impl Progress {
8791
self.pending_snapshot = 0;
8892
self.pending_request_snapshot = INVALID_INDEX;
8993
self.recent_active = false;
94+
self.lag = false;
9095
self.ins.reset();
9196
}
9297

@@ -133,10 +138,14 @@ impl Progress {
133138

134139
/// Returns false if the given n index comes from an outdated message.
135140
/// Otherwise it updates the progress and returns true.
136-
pub fn maybe_update(&mut self, n: u64) -> bool {
141+
/// If the matched has catched up with last_index, reset the lag flag.
142+
pub fn maybe_update(&mut self, n: u64, last_index: u64) -> bool {
137143
let need_update = self.matched < n;
138144
if need_update {
139145
self.matched = n;
146+
if self.lag && self.matched + 20 > last_index {
147+
self.lag = false;
148+
}
140149
self.resume();
141150
};
142151

@@ -287,7 +296,7 @@ mod tests {
287296
p.maybe_decr_to(1, 1, INVALID_INDEX);
288297
assert!(!p.paused, "paused= true, want false");
289298
p.paused = true;
290-
p.maybe_update(2);
299+
p.maybe_update(2, 2);
291300
assert!(!p.paused, "paused= true, want false");
292301
}
293302

@@ -357,7 +366,7 @@ mod tests {
357366
for (i, &(update, wm, wn, wok)) in tests.iter().enumerate() {
358367
let mut p = Progress::new(prev_n, 256);
359368
p.matched = prev_m;
360-
let ok = p.maybe_update(update);
369+
let ok = p.maybe_update(update, prev_n);
361370
if ok != wok {
362371
panic!("#{}: ok= {}, want {}", i, ok, wok);
363372
}
@@ -368,6 +377,16 @@ mod tests {
368377
panic!("#{}: next= {}, want {}", i, p.next_idx, wn);
369378
}
370379
}
380+
let mut p = Progress::new(5, 256);
381+
p.matched = prev_m;
382+
p.lag = true;
383+
let ok = p.maybe_update(prev_m + 1, prev_m + 10);
384+
if !ok {
385+
panic!("ok= {}", ok);
386+
}
387+
if p.lag {
388+
panic!("lag= {}", p.lag);
389+
}
371390
}
372391

373392
#[test]

0 commit comments

Comments
 (0)