Skip to content

Commit c270fae

Browse files
committed
WIP
1 parent e510b43 commit c270fae

File tree

7 files changed

+37
-15
lines changed

7 files changed

+37
-15
lines changed

.github/workflows/juno-test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ jobs:
1818
fail-fast: false
1919
matrix:
2020
os: [ubuntu-latest, macos-latest, ubuntu-arm64-4-core]
21+
iteration: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2122
runs-on: ${{ matrix.os }}
2223
env:
2324
VM_DEBUG: true

consensus/p2p/buffered/proto_broadcaster.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (b ProtoBroadcaster[M]) Broadcast(ctx context.Context, msg M) {
4242
func (b ProtoBroadcaster[M]) Loop(ctx context.Context, topic *pubsub.Topic) {
4343
readinessOpt := pubsub.WithReadiness(pubsub.MinTopicSize(1))
4444
var rebroadcasted rebroadcastMessages
45+
4546
for {
4647
select {
4748
case <-ctx.Done():

consensus/p2p/p2p.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,30 +144,37 @@ func (p *p2p[V, H, A]) Run(ctx context.Context) error {
144144
return fmt.Errorf("unable to create gossipsub with error: %w", err)
145145
}
146146

147-
topics := make(map[topicName]*libp2p.Topic)
147+
topics := make([]*libp2p.Topic, 0, len(p.topicAttachment))
148+
relayCancels := make([]func(), 0, len(p.topicAttachment))
148149
defer func() {
150+
for _, cancel := range relayCancels {
151+
cancel()
152+
}
149153
for _, topic := range topics {
150154
topic.Close()
151155
}
152156
}()
153157

154158
wg := conc.NewWaitGroup()
159+
defer wg.Wait()
155160

156-
for topicName := range p.topicAttachment {
157-
if topics[topicName], err = gossipSub.Join(string(topicName)); err != nil {
161+
for topicName, services := range p.topicAttachment {
162+
topic, relayCancel, err := pubsub.JoinTopic(gossipSub, string(topicName))
163+
if err != nil {
158164
return fmt.Errorf("unable to join topic %s with error: %w", topicName, err)
159165
}
160-
}
161166

162-
for topicName, services := range p.topicAttachment {
167+
topics = append(topics, topic)
168+
relayCancels = append(relayCancels, relayCancel)
169+
163170
for _, service := range services {
164171
wg.Go(func() {
165-
service.Loop(ctx, topics[topicName])
172+
service.Loop(ctx, topic)
166173
})
167174
}
168175
}
169176

170-
wg.Wait()
177+
<-ctx.Done()
171178
return nil
172179
}
173180

mempool/p2p/p2p.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ func (p *P2P) Run(ctx context.Context) error {
6161
return fmt.Errorf("unable to create gossipsub with error: %w", err)
6262
}
6363

64-
topic, err := gossipSub.Join(transactionTopicName)
64+
topic, relayCancel, err := pubsub.JoinTopic(gossipSub, transactionTopicName)
6565
if err != nil {
6666
return fmt.Errorf("unable to join topic %s with error: %w", transactionTopicName, err)
6767
}
68+
defer relayCancel()
6869
defer topic.Close()
6970

7071
wg := conc.NewWaitGroup()

mempool/p2p/p2p_broadcasters_listeners_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
const (
2323
logLevel = zapcore.DebugLevel
2424
nodeCount = 3
25-
txCount = 300
25+
txCount = 10
2626
maxWait = 5 * time.Second
2727
)
2828

p2p/pubsub/pubsub.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,21 @@ func Run(
7070
)
7171
}
7272

73+
func JoinTopic(pubSub *pubsub.PubSub, topicName string) (*pubsub.Topic, func(), error) {
74+
topic, err := pubSub.Join(topicName)
75+
if err != nil {
76+
return nil, nil, fmt.Errorf("unable to join topic %s with error: %w", topicName, err)
77+
}
78+
79+
// Make sure that the host starts connecting to other nodes
80+
relayCancel, err := topic.Relay()
81+
if err != nil {
82+
return nil, nil, fmt.Errorf("unable to relay topic %s with error: %w", topicName, err)
83+
}
84+
85+
return topic, relayCancel, nil
86+
}
87+
7388
func ExtractPeers(peers string) ([]peer.AddrInfo, error) {
7489
if peers == "" {
7590
return nil, nil

p2p/pubsub/testutils/pubsub.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,12 @@ func BuildNetworks(
6464
return nodes
6565
}
6666

67-
func (n Nodes) JoinTopic(t *testing.T, chainID, protocolID protocol.ID, topic string) []*libp2p.Topic {
67+
func (n Nodes) JoinTopic(t *testing.T, chainID, protocolID protocol.ID, topicName string) []*libp2p.Topic {
6868
return iter.Map(n, func(node *Node) *libp2p.Topic {
69-
pubsub, err := pubsub.Run(t.Context(), chainID, protocolID, node.Host, config.DefaultBufferSizes.PubSubQueueSize, node.GetBootstrapPeers)
69+
pubSub, err := pubsub.Run(t.Context(), chainID, protocolID, node.Host, config.DefaultBufferSizes.PubSubQueueSize, node.GetBootstrapPeers)
7070
require.NoError(t, err)
7171

72-
topic, err := pubsub.Join(topic)
73-
require.NoError(t, err)
74-
75-
relayCancel, err := topic.Relay()
72+
topic, relayCancel, err := pubsub.JoinTopic(pubSub, topicName)
7673
require.NoError(t, err)
7774
t.Cleanup(relayCancel)
7875

0 commit comments

Comments
 (0)