Skip to content

Commit eb005ee

Browse files
authored
fix error if 2 events occur simultaneously: decision progression and view stopping (#622)
Signed-off-by: Fedor Partanskiy <fredprtnsk@gmail.com>
1 parent 990dcbc commit eb005ee

File tree

3 files changed

+116
-1
lines changed

3 files changed

+116
-1
lines changed

internal/bft/controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type Proposer interface {
7272
Start()
7373
Abort()
7474
Stopped() bool
75+
AbortChan() <-chan struct{}
7576
GetLeaderID() uint64
7677
GetMetadata() []byte
7778
HandleMessage(sender uint64, m *protos.Message)
@@ -171,6 +172,14 @@ func (c *Controller) currentViewStopped() bool {
171172
return view.Stopped()
172173
}
173174

175+
func (c *Controller) currentViewAbortChan() <-chan struct{} {
176+
c.currViewLock.RLock()
177+
view := c.currView
178+
c.currViewLock.RUnlock()
179+
180+
return view.AbortChan()
181+
}
182+
174183
func (c *Controller) currentViewLeader() uint64 {
175184
c.currViewLock.RLock()
176185
view := c.currView
@@ -785,7 +794,7 @@ func (c *Controller) Start(startViewNumber uint64, startProposalSequence uint64,
785794
c.syncChan = make(chan struct{}, 1)
786795
c.stopChan = make(chan struct{})
787796
c.leaderToken = make(chan struct{}, 1)
788-
c.decisionChan = make(chan decision)
797+
c.decisionChan = make(chan decision, 1)
789798
c.deliverChan = make(chan struct{})
790799
c.viewChange = make(chan viewInfo, 1)
791800
c.abortViewChan = make(chan uint64, 1)
@@ -886,6 +895,7 @@ func (c *Controller) Decide(proposal types.Proposal, signatures []types.Signatur
886895
select {
887896
case <-c.deliverChan: // wait for the delivery of the decision to the application
888897
case <-c.stopChan: // If we stopped the controller, abort delivery
898+
case <-c.currentViewAbortChan(): // If we stopped the view, abort delivery
889899
}
890900
}
891901

internal/bft/view.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,10 @@ func (v *View) Stopped() bool {
10151015
}
10161016
}
10171017

1018+
// AbortChan returns the view's abortChan field as a receive-only channel.
// NOTE(review): presumably this channel is closed when the view is aborted,
// letting callers select on it — confirm against View.Abort.
func (v *View) AbortChan() <-chan struct{} {
1019+
return v.abortChan
1020+
}
1021+
10181022
// GetLeaderID returns the LeaderID field of this view.
func (v *View) GetLeaderID() uint64 {
10191023
return v.LeaderID
10201024
}

test/basic_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/hex"
1111
"fmt"
1212
"os"
13+
"runtime"
1314
"strconv"
1415
"strings"
1516
"sync"
@@ -3306,6 +3307,106 @@ ExternalLoop:
33063307
}
33073308
}
33083309

3310+
// Decide and abort view at the same time
3311+
func TestDecideAndAbortViewAtSameTime(t *testing.T) {
3312+
t.Parallel()
3313+
3314+
done := make(chan struct{})
3315+
defer close(done)
3316+
3317+
network := NewNetwork()
3318+
defer network.Shutdown()
3319+
3320+
testDir, err := os.MkdirTemp("", t.Name())
3321+
assert.NoErrorf(t, err, "generate temporary test dir")
3322+
defer os.RemoveAll(testDir)
3323+
3324+
numberOfNodes := 4
3325+
nodes := make([]*App, 0)
3326+
for i := 1; i <= numberOfNodes; i++ {
3327+
n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
3328+
n.Consensus.Config.LeaderHeartbeatTimeout = 5 * time.Second
3329+
n.Consensus.Config.LeaderHeartbeatCount = 2
3330+
n.Consensus.Config.ViewChangeTimeout = 20 * time.Second
3331+
nodes = append(nodes, n)
3332+
}
3333+
3334+
var (
3335+
once sync.Once
3336+
once1 sync.Once
3337+
)
3338+
leaderComplaneCh := make(chan struct{})
3339+
abortingCh := make(chan struct{}, 1)
3340+
nextSeqCh := make(chan struct{}, 1)
3341+
nextStep := make(chan struct{}, 1)
3342+
3343+
baseLogger := nodes[3].logger.Desugar()
3344+
nodes[3].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
3345+
if strings.Contains(entry.Message, "Heartbeat timeout expired") {
3346+
once.Do(func() {
3347+
leaderComplaneCh <- struct{}{}
3348+
})
3349+
}
3350+
if strings.Contains(entry.Message, "Aborting current view with number") {
3351+
once1.Do(func() {
3352+
abortingCh <- struct{}{}
3353+
<-nextStep
3354+
})
3355+
}
3356+
if strings.Contains(entry.Message, "Sequence: 2-->3") {
3357+
nextSeqCh <- struct{}{}
3358+
<-nextStep
3359+
}
3360+
return nil
3361+
})).Sugar()
3362+
3363+
nodes[3].Setup()
3364+
3365+
startNodes(nodes, network)
3366+
3367+
var counter uint64
3368+
accelerateTime(nodes, done, true, true, &counter)
3369+
3370+
nodes[0].Submit(Request{ID: "1", ClientID: "alice"}) // submit to leader
3371+
data := make([]*AppRecord, 0)
3372+
for i := 0; i < numberOfNodes; i++ {
3373+
d := <-nodes[i].Delivered
3374+
data = append(data, d)
3375+
}
3376+
for i := 0; i < numberOfNodes-1; i++ {
3377+
assert.Equal(t, data[i], data[i+1])
3378+
}
3379+
3380+
nodes[0].Submit(Request{ID: "2", ClientID: "alice"})
3381+
<-nextSeqCh
3382+
3383+
// Emulation of node removal from the consensus in the absence of heartbeat
3384+
nodes[3].Disconnect()
3385+
<-leaderComplaneCh
3386+
nodes[3].Connect()
3387+
3388+
<-abortingCh
3389+
3390+
close(nextStep)
3391+
data = make([]*AppRecord, 0, 4)
3392+
fail := time.After(15 * time.Second)
3393+
for i := 0; i < numberOfNodes; i++ {
3394+
select {
3395+
case d := <-nodes[i].Delivered:
3396+
data = append(data, d)
3397+
case <-fail:
3398+
buf := make([]byte, 1<<16)
3399+
runtime.Stack(buf, true)
3400+
fmt.Printf("%s", buf)
3401+
t.Fatal("Didn't deliver two msg")
3402+
}
3403+
}
3404+
3405+
for i := 0; i < numberOfNodes-1; i++ {
3406+
assert.Equal(t, data[i], data[i+1])
3407+
}
3408+
}
3409+
33093410
func doInBackground(f func(), stop <-chan struct{}) {
33103411
for {
33113412
select {

0 commit comments

Comments
 (0)