Skip to content

Commit 408c460

Browse files
Merge pull request #1221 from keep-network/channel-channel-channel
BroadcastChannel: improve delivery rate Two changes to improve the delivery rate of messages. Create separate broadcast channel for DKG execution BroadcastChannel libp2p implementation synchronizes on mutex when publishing message to the pubsub and when firing handlers. When we had one channel for the given list of stakers used for all group operations, it could happen that it was used at the same time for DKG and relay entry signing if the same list of stakers was selected to a new group. As a result, some messages could not reach the destination because mutex synchronization was slowing down message exchange and the given phase could end before all messages were sent and delivered. This change separates DKG from relay entry signing. We create a temporary channel, just for DKG. The name of the final channel for a group is a group public key in a compressed form and that final channel is used always for every single relay entry signing. Initialize channel unmarshallers as early as possible We are sometimes running into a situation when relay entry signing or DKG result publication channel handlers are not yet initialized when the first message is sent by other members. This change moves initialization of entry signing and DKG result publication protocol handlers a bit earlier.
2 parents d5374c5 + ac7f4c2 commit 408c460

File tree

9 files changed

+75
-88
lines changed

9 files changed

+75
-88
lines changed

pkg/beacon/relay/dkg/dkg.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ func ExecuteDKG(
2727
// The staker index should begin with 1
2828
playerIndex := group.MemberIndex(index + 1)
2929

30+
gjkr.RegisterUnmarshallers(channel)
31+
dkgResult.RegisterUnmarshallers(channel)
32+
3033
gjkrResult, gjkrEndBlockHeight, err := gjkr.Execute(
3134
playerIndex,
3235
groupSize,

pkg/beacon/relay/dkg/result/publish.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ import (
1111
"github.com/keep-network/keep-core/pkg/net"
1212
)
1313

14+
// RegisterUnmarshallers initializes the given broadcast channel to be able to
15+
// perform DKG result publication protocol interactions by registering all the
16+
// required protocol message unmarshallers.
17+
// The channel needs to be fully initialized before Publish is called.
18+
func RegisterUnmarshallers(channel net.BroadcastChannel) {
19+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
20+
return &DKGResultHashSignatureMessage{}
21+
})
22+
}
23+
1424
// Publish executes Phase 13 and 14 of DKG as a state machine. First, the
1525
// chosen result is hashed, signed, and sent over a broadcast channel. Then, all
1626
// other signatures and results are received and accounted for. Those that match
@@ -37,8 +47,6 @@ func Publish(
3747
signingStartBlockHeight: startBlockHeight,
3848
}
3949

40-
initializeChannel(channel)
41-
4250
stateMachine := state.NewMachine(channel, blockCounter, initialState)
4351

4452
lastState, _, err := stateMachine.Execute(startBlockHeight)
@@ -53,11 +61,3 @@ func Publish(
5361

5462
return nil
5563
}
56-
57-
// initializeChannel initializes a given broadcast channel to be able to
58-
// perform distributed key generation interactions.
59-
func initializeChannel(channel net.BroadcastChannel) {
60-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
61-
return &DKGResultHashSignatureMessage{}
62-
})
63-
}

pkg/beacon/relay/entry/entry.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ const (
1616
signatureBlocks = state.MessagingStateActiveBlocks
1717
)
1818

19-
func initializeChannel(channel net.BroadcastChannel) {
19+
// RegisterUnmarshallers initializes the given broadcast channel to be able to
20+
// perform relay entry signing protocol interactions by registering all the
21+
// required protocol message unmarshallers.
22+
// The channel has to be initialized before the SignAndSubmit is called.
23+
func RegisterUnmarshallers(channel net.BroadcastChannel) {
2024
channel.RegisterUnmarshaler(
2125
func() net.TaggedUnmarshaler { return &SignatureShareMessage{} })
2226
}
@@ -33,8 +37,6 @@ func SignAndSubmit(
3337
signer *dkg.ThresholdSigner,
3438
startBlockHeight uint64,
3539
) error {
36-
initializeChannel(channel)
37-
3840
initialState := &signatureShareState{
3941
signingStateBase: signingStateBase{
4042
channel: channel,

pkg/beacon/relay/gjkr/gjkr.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,34 @@ import (
1414

1515
var logger = log.Logger("keep-gjkr")
1616

17+
// RegisterUnmarshallers initializes the given broadcast channel to be able to
18+
// perform DKG protocol interactions by registering all the required protocol
19+
// message unmarshallers.
20+
// The channel needs to be fully initialized before Execute is called.
21+
func RegisterUnmarshallers(channel net.BroadcastChannel) {
22+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
23+
return &EphemeralPublicKeyMessage{}
24+
})
25+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
26+
return &MemberCommitmentsMessage{}
27+
})
28+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
29+
return &PeerSharesMessage{}
30+
})
31+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
32+
return &SecretSharesAccusationsMessage{}
33+
})
34+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
35+
return &MemberPublicKeySharePointsMessage{}
36+
})
37+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
38+
return &PointsAccusationsMessage{}
39+
})
40+
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
41+
return &MisbehavedEphemeralKeysMessage{}
42+
})
43+
}
44+
1745
// Execute runs the GJKR distributed key generation protocol, given a
1846
// broadcast channel to mediate with, a block counter used for time tracking,
1947
// a player index to use in the group, dishonest threshold, and block height
@@ -42,8 +70,6 @@ func Execute(
4270
return nil, 0, fmt.Errorf("cannot create a new member: [%v]", err)
4371
}
4472

45-
initializeChannel(channel)
46-
4773
initialState := &joinState{
4874
channel: channel,
4975
member: member,
@@ -63,29 +89,3 @@ func Execute(
6389

6490
return finalizationState.result(), endBlockHeight, nil
6591
}
66-
67-
// initializeChannel initializes a given broadcast channel to be able to
68-
// perform distributed key generation interactions.
69-
func initializeChannel(channel net.BroadcastChannel) {
70-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
71-
return &EphemeralPublicKeyMessage{}
72-
})
73-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
74-
return &MemberCommitmentsMessage{}
75-
})
76-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
77-
return &PeerSharesMessage{}
78-
})
79-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
80-
return &SecretSharesAccusationsMessage{}
81-
})
82-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
83-
return &MemberPublicKeySharePointsMessage{}
84-
})
85-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
86-
return &PointsAccusationsMessage{}
87-
})
88-
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {
89-
return &MisbehavedEphemeralKeysMessage{}
90-
})
91-
}

pkg/beacon/relay/gjkr/states_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestFullStateTransitions(t *testing.T) {
4343
t.Fatal(err)
4444
}
4545

46-
initializeChannel(channel)
46+
RegisterUnmarshallers(channel)
4747

4848
channels[i] = channel
4949
states[i] = &joinState{channel, member}

pkg/beacon/relay/node.go

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package relay
33
import (
44
"bytes"
55
"crypto/ecdsa"
6-
"crypto/sha256"
76
"encoding/hex"
87
"math/big"
98
"sync"
@@ -64,20 +63,11 @@ func (n *Node) JoinGroupIfEligible(
6463
}
6564

6665
if len(indexes) > 0 {
67-
// build the channel name and get the broadcast channel
68-
broadcastChannelName := channelNameForGroup(groupSelectionResult)
69-
70-
// We should only join the broadcast channel if we're
71-
// elligible for the group
72-
broadcastChannel, err := n.netProvider.ChannelFor(
73-
broadcastChannelName,
74-
)
66+
// create temporary broadcast channel for DKG using the group selection
67+
// seed
68+
broadcastChannel, err := n.netProvider.ChannelFor(newEntry.Text(16))
7569
if err != nil {
76-
logger.Errorf(
77-
"failed to get broadcastChannel for name [%s] with err: [%v]",
78-
broadcastChannelName,
79-
err,
80-
)
70+
logger.Errorf("failed to get broadcast channel: [%v]", err)
8171
return
8272
}
8373

@@ -116,10 +106,13 @@ func (n *Node) JoinGroupIfEligible(
116106
return
117107
}
118108

119-
err = n.groupRegistry.RegisterGroup(
120-
signer,
121-
broadcastChannelName,
109+
// final broadcast channel name for group is the compressed
110+
// public key of the group
111+
channelName := hex.EncodeToString(
112+
signer.GroupPublicKeyBytesCompressed(),
122113
)
114+
115+
err = n.groupRegistry.RegisterGroup(signer, channelName)
123116
if err != nil {
124117
logger.Errorf("failed to register a group: [%v]", err)
125118
}
@@ -130,22 +123,6 @@ func (n *Node) JoinGroupIfEligible(
130123
return
131124
}
132125

133-
// channelNameForGroup takes the selected stakers, and does the
134-
// following to construct the broadcastChannel name:
135-
// * concatenates all of the staker values
136-
// * returns the hashed concatenated values in hexadecimal representation
137-
func channelNameForGroup(group *groupselection.Result) string {
138-
var channelNameBytes []byte
139-
for _, staker := range group.SelectedStakers {
140-
channelNameBytes = append(channelNameBytes, staker...)
141-
}
142-
143-
hash := sha256.Sum256(channelNameBytes)
144-
hexChannelName := hex.EncodeToString(hash[:])
145-
146-
return hexChannelName
147-
}
148-
149126
func candidateGroupMembersFilter(
150127
selectedStakers []relaychain.StakerAddress,
151128
signing chain.Signing,

pkg/beacon/relay/relay.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,25 +105,23 @@ func (n *Node) GenerateRelayEntry(
105105
return
106106
}
107107

108-
for _, signer := range memberships {
109-
go func(signer *registry.Membership) {
110-
channel, err := n.netProvider.ChannelFor(signer.ChannelName)
111-
if err != nil {
112-
logger.Errorf(
113-
"could not create broadcast channel with name [%v]: [%v]",
114-
signer.ChannelName,
115-
err,
116-
)
117-
return
118-
}
108+
channel, err := n.netProvider.ChannelFor(memberships[0].ChannelName)
109+
if err != nil {
110+
logger.Errorf("could not create broadcast channel: [%v]", err)
111+
return
112+
}
113+
114+
entry.RegisterUnmarshallers(channel)
119115

116+
for _, member := range memberships {
117+
go func(member *registry.Membership) {
120118
err = entry.SignAndSubmit(
121119
n.blockCounter,
122120
channel,
123121
relayChain,
124122
previousEntry,
125123
n.chainConfig.HonestThreshold,
126-
signer.Signer,
124+
member.Signer,
127125
startBlockHeight,
128126
)
129127
if err != nil {
@@ -133,6 +131,6 @@ func (n *Node) GenerateRelayEntry(
133131
)
134132
return
135133
}
136-
}(signer)
134+
}(member)
137135
}
138136
}

pkg/internal/dkgtest/dkgtest.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414

1515
relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain"
1616
"github.com/keep-network/keep-core/pkg/beacon/relay/dkg"
17+
dkgResult "github.com/keep-network/keep-core/pkg/beacon/relay/dkg/result"
1718
"github.com/keep-network/keep-core/pkg/beacon/relay/event"
19+
"github.com/keep-network/keep-core/pkg/beacon/relay/gjkr"
1820
"github.com/keep-network/keep-core/pkg/beacon/relay/group"
1921
chainLocal "github.com/keep-network/keep-core/pkg/chain/local"
2022
"github.com/keep-network/keep-core/pkg/internal/interception"
@@ -122,6 +124,9 @@ func executeDKG(
122124
// make sure all members are up.
123125
startBlockHeight := currentBlockHeight + 3
124126

127+
gjkr.RegisterUnmarshallers(broadcastChannel)
128+
dkgResult.RegisterUnmarshallers(broadcastChannel)
129+
125130
for i := 0; i < relayConfig.GroupSize; i++ {
126131
i := i // capture for goroutine
127132
go func() {

pkg/internal/entrytest/entrytest.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ func executeSigning(
128128
// make sure all signers are ready
129129
startBlockHeight := currentBlockHeight + 3
130130

131+
entry.RegisterUnmarshallers(broadcastChannel)
132+
131133
for _, signer := range signers {
132134
go func(signer *dkg.ThresholdSigner) {
133135
err := entry.SignAndSubmit(

0 commit comments

Comments
 (0)