Skip to content

Commit 7579548

Browse files
Merge pull request #1277 from keep-network/network-overhaul-5
Context-aware Send BroadcastChannel interface with retransmission support In #1255 we introduced context-aware message handler in Recv interface of net.BroadcastChannel and in #1260 we enhanced those handlers with retransmission support allowing them to filter out retransmissions of messages that have already been seen. Here, we make Send function context-aware and retransmit the message for the entire lifetime of the provided context with a frequency provided by retransmission.Ticker set globally. Retransmissions are filtered out in handlers.
2 parents 274bbf8 + d2b6814 commit 7579548

File tree

28 files changed

+502
-304
lines changed

28 files changed

+502
-304
lines changed

cmd/network.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/keep-network/keep-core/pkg/net"
1919
"github.com/keep-network/keep-core/pkg/net/key"
2020
"github.com/keep-network/keep-core/pkg/net/libp2p"
21+
"github.com/keep-network/keep-core/pkg/net/retransmission"
2122
"github.com/keep-network/keep-core/pkg/operator"
2223
"github.com/pborman/uuid"
2324
"github.com/urfave/cli"
@@ -92,6 +93,7 @@ func pingRequest(c *cli.Context) error {
9293
libp2pConfig,
9394
privKey,
9495
stakeMonitor,
96+
retransmission.NewTimeTicker(ctx, 50*time.Millisecond),
9597
)
9698
if err != nil {
9799
return err
@@ -174,7 +176,7 @@ func pingRequest(c *cli.Context) error {
174176

175177
go func(msg *PingMessage) {
176178
<-start
177-
err := broadcastChannel.Send(message)
179+
err := broadcastChannel.Send(ctx, message)
178180
if err != nil {
179181
fmt.Fprintf(
180182
os.Stderr,
@@ -209,7 +211,7 @@ func pingRequest(c *cli.Context) error {
209211
Sender: netProvider.ID().String(),
210212
Payload: pong + " corresponding to " + pingPayload.Payload,
211213
}
212-
err := broadcastChannel.Send(message)
214+
err := broadcastChannel.Send(ctx, message)
213215
if err != nil {
214216
return err
215217
}

cmd/start.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/keep-network/keep-core/pkg/chain/ethereum"
1212
"github.com/keep-network/keep-core/pkg/net/key"
1313
"github.com/keep-network/keep-core/pkg/net/libp2p"
14+
"github.com/keep-network/keep-core/pkg/net/retransmission"
1415
"github.com/keep-network/keep-core/pkg/operator"
1516
"github.com/urfave/cli"
1617
)
@@ -64,6 +65,11 @@ func Start(c *cli.Context) error {
6465
return fmt.Errorf("error connecting to Ethereum node: [%v]", err)
6566
}
6667

68+
blockCounter, err := chainProvider.BlockCounter()
69+
if err != nil {
70+
return err
71+
}
72+
6773
stakeMonitor, err := chainProvider.StakeMonitor()
6874
if err != nil {
6975
return fmt.Errorf("error obtaining stake monitor handle [%v]", err)
@@ -87,6 +93,7 @@ func Start(c *cli.Context) error {
8793
config.LibP2P,
8894
networkPrivateKey,
8995
stakeMonitor,
96+
retransmission.NewTicker(blockCounter.WatchBlocks(ctx)),
9097
)
9198
if err != nil {
9299
return err

config.toml.SAMPLE

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,6 @@
2525
# Port = 3920
2626
# # Uncomment to override the node's default addresses announced in the network
2727
# # AnnouncedAddresses = ["/dns4/example.com/tcp/3919", "/ip4/80.70.60.50/tcp/3919"]
28-
#
29-
# # Number of retransmissions for each message
30-
# RetransmissionCycles = 5
31-
# # Milliseconds interval between each retransmission of a message
32-
# RetransmissionInterval = 1000
3328

3429
[Storage]
3530
DataDir = "/my/secure/location"

docs/keep-client-quickstart.adoc

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,6 @@ default address then leave `AnnouncedAddresses` set to `[]` or remove the entry
4545

4646
`AnnouncedAddresses = ["/dns4/example.com/tcp/3919", "/ip4/80.70.60.50/tcp/3919"]`
4747

48-
=== Message retransmission
49-
50-
There is no guarantee that all messages sent by the node will reach all of the
51-
expected receivers. Unexpected network issues may cause messages to be dropped.
52-
In order to increase delivery rate, message retransmission can be enabled
53-
by setting `RetransmissionCycles` and `RetransmissionInterval`
54-
in `[libp2p]` section. The first option determines the number of retransmission
55-
cycles which should be performed. The second option sets the milliseconds interval
56-
which should be preserved between cycles. Specific values should be chosen
57-
reasonably because high number of cycles and short interval value, may lead to
58-
increased resource usage and network congestion.
59-
6048
== Starting The Client
6149

6250
*Depending on how you orchestrate containers, these steps will vary. Here we illustrate

infrastructure/kube/templates/keep-client/initcontainer/provision-keep-client/keep-client-config-template.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,5 @@
2828
# or DNS entries you want to route traffic through.
2929
AnnouncedAddresses = []
3030

31-
# TODO: Make these configurable
32-
# Number of retransmissions for each message
33-
RetransmissionCycles = 5
34-
# Milliseconds interval between each retransmission of a message
35-
RetransmissionInterval = 1000
36-
3731
[Storage]
3832
DataDir = "Set by Kube"

infrastructure/kube/templates/keep-client/initcontainer/provision-keep-client/provision-keep-client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async function createKeepClientConfig(operator) {
220220
Here we format the default rendering to write the config file with Seed/Port values as needed.
221221
*/
222222
let formattedConfigFile = tomlify.toToml(parsedConfigFile, {
223-
replace: (key, value) => { return (key == 'RetransmissionCycles' || key == 'RetransmissionInterval' || key == 'Port') ? value.toFixed(0) : false }
223+
replace: (key, value) => { return (key == 'Port') ? value.toFixed(0) : false }
224224
});
225225

226226
fs.writeFile('/mnt/keep-client/config/keep-client-config.toml', formattedConfigFile, (error) => {

pkg/beacon/relay/dkg/result/states.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (rss *resultSigningState) Initiate(ctx context.Context) error {
4747
if err != nil {
4848
return err
4949
}
50-
if err := rss.channel.Send(message); err != nil {
50+
if err := rss.channel.Send(ctx, message); err != nil {
5151
return err
5252
}
5353
return nil

pkg/beacon/relay/entry/states.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (sss *signatureShareState) Initiate(ctx context.Context) error {
6969
sss.MemberIndex(),
7070
sss.selfSignatureShare.Marshal(),
7171
}
72-
if err := sss.channel.Send(message); err != nil {
72+
if err := sss.channel.Send(ctx, message); err != nil {
7373
return err
7474
}
7575
return nil

pkg/beacon/relay/gjkr/states.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (ekpgs *ephemeralKeyPairGenerationState) Initiate(ctx context.Context) erro
7171
return err
7272
}
7373

74-
if err := ekpgs.channel.Send(message); err != nil {
74+
if err := ekpgs.channel.Send(ctx, message); err != nil {
7575
return err
7676
}
7777
return nil
@@ -169,11 +169,11 @@ func (cs *commitmentState) Initiate(ctx context.Context) error {
169169
return err
170170
}
171171

172-
if err := cs.channel.Send(sharesMsg); err != nil {
172+
if err := cs.channel.Send(ctx, sharesMsg); err != nil {
173173
return err
174174
}
175175

176-
if err := cs.channel.Send(commitmentsMsg); err != nil {
176+
if err := cs.channel.Send(ctx, commitmentsMsg); err != nil {
177177
return err
178178
}
179179

@@ -251,7 +251,7 @@ func (cvs *commitmentsVerificationState) Initiate(ctx context.Context) error {
251251
return err
252252
}
253253

254-
if err := cvs.channel.Send(accusationsMsg); err != nil {
254+
if err := cvs.channel.Send(ctx, accusationsMsg); err != nil {
255255
return err
256256
}
257257

@@ -394,7 +394,7 @@ func (pss *pointsShareState) ActiveBlocks() uint64 {
394394

395395
func (pss *pointsShareState) Initiate(ctx context.Context) error {
396396
message := pss.member.CalculatePublicKeySharePoints()
397-
if err := pss.channel.Send(message); err != nil {
397+
if err := pss.channel.Send(ctx, message); err != nil {
398398
return err
399399
}
400400

@@ -457,7 +457,7 @@ func (pvs *pointsValidationState) Initiate(ctx context.Context) error {
457457
return err
458458
}
459459

460-
if err := pvs.channel.Send(accusationMsg); err != nil {
460+
if err := pvs.channel.Send(ctx, accusationMsg); err != nil {
461461
return err
462462
}
463463

@@ -563,7 +563,7 @@ func (rs *keyRevealState) Initiate(ctx context.Context) error {
563563
return err
564564
}
565565

566-
if err := rs.channel.Send(revealMsg); err != nil {
566+
if err := rs.channel.Send(ctx, revealMsg); err != nil {
567567
return err
568568
}
569569

pkg/beacon/relay/state/machine_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ func TestExecute(t *testing.T) {
3030

3131
go func(blockCounter chain.BlockCounter) {
3232
blockCounter.WaitForBlockHeight(1)
33-
channel.Send(&TestMessage{"message_1"})
33+
ctx, cancel := context.WithCancel(context.Background())
34+
channel.Send(ctx, &TestMessage{"message_1"})
35+
cancel()
3436

3537
blockCounter.WaitForBlockHeight(4)
36-
channel.Send(&TestMessage{"message_2"})
38+
ctx, cancel = context.WithCancel(context.Background())
39+
channel.Send(ctx, &TestMessage{"message_2"})
40+
cancel()
3741

3842
blockCounter.WaitForBlockHeight(7)
39-
channel.Send(&TestMessage{"message_3"})
43+
ctx, cancel = context.WithCancel(context.Background())
44+
channel.Send(ctx, &TestMessage{"message_3"})
45+
cancel()
4046
}(blockCounter)
4147

4248
channel.RegisterUnmarshaler(func() net.TaggedUnmarshaler {

0 commit comments

Comments
 (0)