Skip to content

Commit a6203a2

Browse files
mpetrun5 and MakMuftic
authored
fix: handle messsages in separate routine (#155)
<!--- Provide a general summary of your changes in the Title above --> ## Description <!--- Describe your changes in detail --> ## Related Issue Or Context <!--- If suggesting a new feature or change, please discuss it in an issue first --> <!--- If fixing a bug, there should be an issue describing it with steps to reproduce --> <!--- Otherwise, describe context and motivation for change here --> Closes: #<issue> ## How Has This Been Tested? Testing details. <!--- Please describe in detail how you tested your changes. --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> ## Types of changes <!--- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation ## Checklist: <!--- Go over all the following points, and put an `x` in all the boxes that apply. --> <!--- If you're unsure about any of these, don't hesitate to ask. We're here to help! --> - [ ] I have commented my code, particularly in hard-to-understand areas. - [ ] I have ensured that all acceptance criteria (or expected behavior) from issue are met - [ ] I have updated the documentation locally and in docs. - [ ] I have added tests to cover my changes. - [ ] I have ensured that all the checks are passing and green, I've signed the CLA bot --------- Co-authored-by: mace <mak@chainsafe.io>
1 parent be613ef commit a6203a2

File tree

5 files changed

+127
-127
lines changed

5 files changed

+127
-127
lines changed

api/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ func Serve(
2727
r.HandleFunc("/health", health.HealthHandler()).Methods("GET")
2828

2929
server := &http.Server{
30-
Addr: addr,
31-
Handler: r,
30+
Addr: addr,
31+
Handler: r,
32+
ReadHeaderTimeout: time.Second * 15,
3233
}
3334
go func() {
3435
log.Info().Msgf("Starting server on %s", addr)

comm/p2p/manager.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package p2p
66
import (
77
"context"
88
"sync"
9+
"time"
910

1011
"github.com/libp2p/go-libp2p/core/host"
1112
"github.com/libp2p/go-libp2p/core/network"
@@ -14,6 +15,10 @@ import (
1415
"github.com/rs/zerolog/log"
1516
)
1617

18+
const (
19+
STREAM_TIMEOUT = time.Second * 5
20+
)
21+
1722
// StreamManager manages instances of network.Stream
1823
type StreamManager struct {
1924
streamsByPeer map[peer.ID]network.Stream
@@ -37,6 +42,7 @@ func (sm *StreamManager) CloseStream(peerID peer.ID) {
3742
sm.streamLocker.Lock()
3843
stream, ok := sm.streamsByPeer[peerID]
3944
if !ok {
45+
sm.streamLocker.Unlock()
4046
return
4147
}
4248

@@ -57,7 +63,9 @@ func (sm *StreamManager) Stream(peerID peer.ID) (network.Stream, error) {
5763

5864
stream, ok := sm.streamsByPeer[peerID]
5965
if !ok {
60-
stream, err := sm.host.NewStream(context.TODO(), peerID, sm.protocolID)
66+
ctx, cancel := context.WithTimeout(context.Background(), STREAM_TIMEOUT)
67+
defer cancel()
68+
stream, err := sm.host.NewStream(ctx, peerID, sm.protocolID)
6169
if err != nil {
6270
return nil, err
6371
}

tss/coordinator.go

Lines changed: 46 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@ import (
1010
"sync"
1111
"time"
1212

13-
"github.com/binance-chain/tss-lib/tss"
1413
"github.com/libp2p/go-libp2p/core/host"
1514
"github.com/libp2p/go-libp2p/core/peer"
1615
"github.com/rs/zerolog/log"
1716
"github.com/sourcegraph/conc/pool"
1817
"github.com/sprintertech/sprinter-signing/comm"
1918
"github.com/sprintertech/sprinter-signing/comm/elector"
20-
"github.com/sprintertech/sprinter-signing/tss/ecdsa/common"
2119
"github.com/sprintertech/sprinter-signing/tss/message"
2220
"golang.org/x/exp/slices"
2321
)
@@ -77,7 +75,8 @@ func NewCoordinator(
7775
// Array of processes can be passed if all the processes have to have the same peer subset and
7876
// the result of all of them is needed. The processes should have an unique session ID for each one.
7977
func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}, coordinator peer.ID) error {
80-
sessionID := tssProcesses[0].SessionID()
78+
process := tssProcesses[0]
79+
sessionID := process.SessionID()
8180

8281
c.processLock.Lock()
8382
value, ok := c.pendingProcesses[sessionID]
@@ -111,60 +110,26 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, re
111110
log.Info().Str("SessionID", sessionID).Msgf("Starting process with coordinator %s", coordinator.String())
112111

113112
p.Go(func(ctx context.Context) error {
114-
err := c.start(ctx, tssProcesses, coordinator, resultChn, []peer.ID{})
113+
err := c.start(ctx, process, coordinator, resultChn, []peer.ID{})
115114
if err == nil {
116115
cancel()
117116
}
118117
return err
119118
})
120119
p.Go(func(ctx context.Context) error {
121-
return c.watchExecution(ctx, tssProcesses[0], coordinator, cancel)
120+
return c.watchExecution(ctx, process, coordinator, cancel)
122121
})
123122
err := p.Wait()
124123
if err == nil {
125124
return nil
126125
}
127126

128-
if !tssProcesses[0].Retryable() {
129-
return err
130-
}
131-
132-
return c.handleError(ctx, err, tssProcesses, resultChn)
133-
}
134-
135-
func (c *Coordinator) handleError(ctx context.Context, err error, tssProcesses []TssProcess, resultChn chan interface{}) error {
136-
ctx, cancel := context.WithCancel(ctx)
137-
defer cancel()
138-
139-
rp := pool.New().WithContext(ctx).WithCancelOnError()
140-
rp.Go(func(ctx context.Context) error {
141-
return c.watchExecution(ctx, tssProcesses[0], peer.ID(""), cancel)
142-
})
143-
sessionID := tssProcesses[0].SessionID()
144-
145-
var commError *comm.CommunicationError
146127
var subsetError *SubsetError
147-
var tssError *tss.Error
148-
if errors.As(err, &commError) {
149-
log.Err(err).Str("SessionID", sessionID).Msgf("Tss process failed with error %+v", err)
150-
rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcesses, resultChn, []peer.ID{}) })
151-
} else if errors.As(err, &tssError) {
152-
log.Err(err).Str("SessionID", sessionID).Msgf("Tss process failed with error %+v", err)
153-
excludedPeers, err := common.PeersFromParties(tssError.Culprits())
154-
if err != nil {
155-
return err
156-
}
157-
rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcesses, resultChn, excludedPeers) })
158-
} else if errors.As(err, &subsetError) {
159-
// wait for start message if existing signing process fails
160-
rp.Go(func(ctx context.Context) error {
161-
return c.waitForStart(ctx, tssProcesses, resultChn, peer.ID(""))
162-
})
163-
} else {
164-
return err
128+
if errors.As(err, &subsetError) {
129+
return nil
165130
}
166131

167-
return rp.Wait()
132+
return err
168133
}
169134

170135
func (c *Coordinator) watchExecution(ctx context.Context, tssProcess TssProcess, coordinator peer.ID, cancel context.CancelFunc) error {
@@ -180,6 +145,8 @@ func (c *Coordinator) watchExecution(ctx context.Context, tssProcess TssProcess,
180145
select {
181146
case <-ticker.C:
182147
{
148+
log.Error().Str("SessionID", tssProcess.SessionID()).Msgf("Process timed out")
149+
tssProcess.Stop()
183150
cancel()
184151
return nil
185152
}
@@ -201,24 +168,18 @@ func (c *Coordinator) watchExecution(ctx context.Context, tssProcess TssProcess,
201168
}
202169

203170
// start initiates listeners for coordinator and participants with static calculated coordinator
204-
func (c *Coordinator) start(ctx context.Context, tssProcesses []TssProcess, coordinator peer.ID, resultChn chan interface{}, excludedPeers []peer.ID) error {
171+
func (c *Coordinator) start(
172+
ctx context.Context,
173+
tssProcess TssProcess,
174+
coordinator peer.ID,
175+
resultChn chan interface{},
176+
excludedPeers []peer.ID,
177+
) error {
205178
if coordinator.String() == c.host.ID().String() {
206-
return c.initiate(ctx, tssProcesses, resultChn, excludedPeers)
179+
return c.initiate(ctx, tssProcess, resultChn, excludedPeers)
207180
} else {
208-
return c.waitForStart(ctx, tssProcesses, resultChn, coordinator)
209-
}
210-
}
211-
212-
// retry initiates full bully process to calculate coordinator and starts a new tss process after
213-
// an expected error occurred during regular tss execution
214-
func (c *Coordinator) retry(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error {
215-
coordinatorElector := c.electorFactory.CoordinatorElector(tssProcesses[0].SessionID(), elector.Static)
216-
coordinator, err := coordinatorElector.Coordinator(ctx, common.ExcludePeers(tssProcesses[0].ValidCoordinators(), excludedPeers))
217-
if err != nil {
218-
return err
181+
return c.waitForStart(ctx, tssProcess, resultChn, coordinator)
219182
}
220-
221-
return c.start(ctx, tssProcesses, coordinator, resultChn, excludedPeers)
222183
}
223184

224185
// broadcastInitiateMsg sends TssInitiateMsg to all peers
@@ -232,12 +193,17 @@ func (c *Coordinator) broadcastInitiateMsg(sessionID string) {
232193
// initiate sends initiate message to all peers and waits
233194
// for ready response. After tss process declares that enough
234195
// peers are ready, start message is broadcasted and tss process is started.
235-
func (c *Coordinator) initiate(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error {
196+
func (c *Coordinator) initiate(
197+
ctx context.Context,
198+
tssProcess TssProcess,
199+
resultChn chan interface{},
200+
excludedPeers []peer.ID,
201+
) error {
236202
readyChan := make(chan *comm.WrappedMessage)
237203
readyPeers := make([]peer.ID, 0)
238204
readyPeers = append(readyPeers, c.host.ID())
205+
errChn := make(chan error)
239206

240-
tssProcess := tssProcesses[0]
241207
subID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssReadyMsg, readyChan)
242208
defer c.communication.UnSubscribe(subID)
243209

@@ -246,6 +212,8 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcesses []TssProcess, r
246212
c.broadcastInitiateMsg(tssProcess.SessionID())
247213
for {
248214
select {
215+
case err := <-errChn:
216+
return err
249217
case wMsg := <-readyChan:
250218
{
251219
log.Debug().Str("SessionID", tssProcess.SessionID()).Msgf("received ready message from %s", wMsg.From)
@@ -267,14 +235,14 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcesses []TssProcess, r
267235
}
268236

269237
_ = c.communication.Broadcast(c.host.Peerstore().Peers(), startMsgBytes, comm.TssStartMsg, tssProcess.SessionID())
270-
p := pool.New().WithContext(ctx).WithCancelOnError()
271-
for _, process := range tssProcesses {
272-
tssProcess := process
273-
p.Go(func(ctx context.Context) error {
274-
return tssProcess.Run(ctx, true, resultChn, startParams)
275-
})
276-
}
277-
return p.Wait()
238+
ticker.Stop()
239+
go func() {
240+
err := tssProcess.Run(ctx, true, resultChn, startParams)
241+
select {
242+
case errChn <- err:
243+
default:
244+
}
245+
}()
278246
}
279247
case <-ticker.C:
280248
{
@@ -292,19 +260,19 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcesses []TssProcess, r
292260
// when it receives the start message.
293261
func (c *Coordinator) waitForStart(
294262
ctx context.Context,
295-
tssProcesses []TssProcess,
263+
tssProcess TssProcess,
296264
resultChn chan interface{},
297265
coordinator peer.ID,
298266
) error {
299267
msgChan := make(chan *comm.WrappedMessage)
300268
startMsgChn := make(chan *comm.WrappedMessage)
301269

302-
tssProcess := tssProcesses[0]
303270
initSubID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssInitiateMsg, msgChan)
304271
defer c.communication.UnSubscribe(initSubID)
305272
startSubID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssStartMsg, startMsgChn)
306273
defer c.communication.UnSubscribe(startSubID)
307274

275+
errChn := make(chan error)
308276
for {
309277
select {
310278
case wMsg := <-msgChan:
@@ -319,6 +287,8 @@ func (c *Coordinator) waitForStart(
319287
peer.IDSlice{wMsg.From}, []byte{}, comm.TssReadyMsg, tssProcess.SessionID(),
320288
)
321289
}
290+
case err := <-errChn:
291+
return err
322292
case startMsg := <-startMsgChn:
323293
{
324294
log.Debug().Str("SessionID", tssProcess.SessionID()).Msgf("received start message from %s", startMsg.From)
@@ -335,14 +305,13 @@ func (c *Coordinator) waitForStart(
335305
return err
336306
}
337307

338-
p := pool.New().WithContext(ctx).WithCancelOnError()
339-
for _, process := range tssProcesses {
340-
tssProcess := process
341-
p.Go(func(ctx context.Context) error {
342-
return tssProcess.Run(ctx, false, resultChn, msg.Params)
343-
})
344-
}
345-
return p.Wait()
308+
go func() {
309+
err := tssProcess.Run(ctx, false, resultChn, msg.Params)
310+
select {
311+
case errChn <- err:
312+
default:
313+
}
314+
}()
346315
}
347316
case <-ctx.Done():
348317
{

0 commit comments

Comments
 (0)