Skip to content

Commit d052869

Browse files
committed
add replication outbound msgs
1 parent 3f74024 commit d052869

File tree

4 files changed

+150
-66
lines changed

4 files changed

+150
-66
lines changed

message/messagemock/outbound_message_builder.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

message/simplex_outbound_msg_builder.go

Lines changed: 115 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,18 @@ type SimplexOutboundMessageBuilder interface {
5151
blockHeader simplex.BlockHeader,
5252
qc []byte,
5353
) (OutboundMessage, error)
54+
55+
ReplicationRequest(
56+
chainID ids.ID,
57+
seqs []uint64,
58+
latestRound uint64,
59+
) (OutboundMessage, error)
60+
61+
VerifiedReplicationResponse(
62+
chainID ids.ID,
63+
data []simplex.VerifiedQuorumRound,
64+
latestRound *simplex.VerifiedQuorumRound,
65+
) (OutboundMessage, error)
5466
}
5567

5668
func (b *outMsgBuilder) BlockProposal(
@@ -67,16 +79,7 @@ func (b *outMsgBuilder) BlockProposal(
6779
BlockProposal: &p2p.BlockProposal{
6880
Block: block,
6981
Vote: &p2p.Vote{
70-
Vote: &p2p.BlockHeader{
71-
Metadata: &p2p.ProtocolMetadata{
72-
Version: uint32(vote.Vote.Version),
73-
Epoch: vote.Vote.Epoch,
74-
Round: vote.Vote.Round,
75-
Seq: vote.Vote.Seq,
76-
Prev: vote.Vote.Prev[:],
77-
},
78-
Digest: vote.Vote.Digest[:],
79-
},
82+
Vote: b.simplexMetadataToP2P(vote.Vote.BlockHeader),
8083
Signature: &p2p.Signature{
8184
Signer: vote.Signature.Signer[:],
8285
Value: vote.Signature.Value[:],
@@ -104,16 +107,7 @@ func (b *outMsgBuilder) Vote(
104107
ChainId: chainID[:],
105108
Message: &p2p.Simplex_Vote{
106109
Vote: &p2p.Vote{
107-
Vote: &p2p.BlockHeader{
108-
Metadata: &p2p.ProtocolMetadata{
109-
Version: uint32(blockHeader.Version),
110-
Epoch: blockHeader.Epoch,
111-
Round: blockHeader.Round,
112-
Seq: blockHeader.Seq,
113-
Prev: blockHeader.Prev[:],
114-
},
115-
Digest: blockHeader.Digest[:],
116-
},
110+
Vote: b.simplexMetadataToP2P(blockHeader),
117111
Signature: &p2p.Signature{
118112
Signer: signature.Signer[:],
119113
Value: signature.Value[:],
@@ -140,13 +134,7 @@ func (b *outMsgBuilder) EmptyVote(
140134
ChainId: chainID[:],
141135
Message: &p2p.Simplex_EmptyVote{
142136
EmptyVote: &p2p.EmptyVote{
143-
Vote: &p2p.ProtocolMetadata{
144-
Version: uint32(protocolMetadata.Version),
145-
Epoch: protocolMetadata.Epoch,
146-
Round: protocolMetadata.Round,
147-
Seq: protocolMetadata.Seq,
148-
Prev: protocolMetadata.Prev[:],
149-
},
137+
Vote: b.simplexProtocolMetadataToP2p(protocolMetadata),
150138
Signature: &p2p.Signature{
151139
Signer: signature.Signer[:],
152140
Value: signature.Value[:],
@@ -173,16 +161,7 @@ func (b *outMsgBuilder) FinalizeVote(
173161
ChainId: chainID[:],
174162
Message: &p2p.Simplex_FinalizeVote{
175163
FinalizeVote: &p2p.Vote{
176-
Vote: &p2p.BlockHeader{
177-
Metadata: &p2p.ProtocolMetadata{
178-
Version: uint32(blockHeader.Version),
179-
Epoch: blockHeader.Epoch,
180-
Round: blockHeader.Round,
181-
Seq: blockHeader.Seq,
182-
Prev: blockHeader.Prev[:],
183-
},
184-
Digest: blockHeader.Digest[:],
185-
},
164+
Vote: b.simplexMetadataToP2P(blockHeader),
186165
Signature: &p2p.Signature{
187166
Signer: signature.Signer[:],
188167
Value: signature.Value[:],
@@ -209,16 +188,7 @@ func (b *outMsgBuilder) Notarization(
209188
ChainId: chainID[:],
210189
Message: &p2p.Simplex_Notarization{
211190
Notarization: &p2p.QuorumCertificate{
212-
Finalization: &p2p.BlockHeader{
213-
Metadata: &p2p.ProtocolMetadata{
214-
Version: uint32(blockHeader.Version),
215-
Epoch: blockHeader.Epoch,
216-
Round: blockHeader.Round,
217-
Seq: blockHeader.Seq,
218-
Prev: blockHeader.Prev[:],
219-
},
220-
Digest: blockHeader.Digest[:],
221-
},
191+
Finalization: b.simplexMetadataToP2P(blockHeader),
222192
QuorumCertificate: qc,
223193
},
224194
},
@@ -242,13 +212,7 @@ func (b *outMsgBuilder) EmptyNotarization(
242212
ChainId: chainID[:],
243213
Message: &p2p.Simplex_EmptyNotarization{
244214
EmptyNotarization: &p2p.EmptyNotarization{
245-
EmptyVote: &p2p.ProtocolMetadata{
246-
Version: uint32(protocolMetadata.Version),
247-
Epoch: protocolMetadata.Epoch,
248-
Round: protocolMetadata.Round,
249-
Seq: protocolMetadata.Seq,
250-
Prev: protocolMetadata.Prev[:],
251-
},
215+
EmptyVote: b.simplexProtocolMetadataToP2p(protocolMetadata),
252216
QuorumCertificate: qc,
253217
},
254218
},
@@ -272,16 +236,7 @@ func (b *outMsgBuilder) Finalization(
272236
ChainId: chainID[:],
273237
Message: &p2p.Simplex_Finalization{
274238
Finalization: &p2p.QuorumCertificate{
275-
Finalization: &p2p.BlockHeader{
276-
Metadata: &p2p.ProtocolMetadata{
277-
Version: uint32(blockHeader.Version),
278-
Epoch: blockHeader.Epoch,
279-
Round: blockHeader.Round,
280-
Seq: blockHeader.Seq,
281-
Prev: blockHeader.Prev[:],
282-
},
283-
Digest: blockHeader.Digest[:],
284-
},
239+
Finalization: b.simplexMetadataToP2P(blockHeader),
285240
QuorumCertificate: qc,
286241
},
287242
},
@@ -292,3 +247,99 @@ func (b *outMsgBuilder) Finalization(
292247
false,
293248
)
294249
}
250+
251+
func (b *outMsgBuilder) ReplicationRequest(
252+
chainID ids.ID,
253+
seqs []uint64,
254+
latestRound uint64,
255+
) (OutboundMessage, error) {
256+
return b.builder.createOutbound(
257+
&p2p.Message{
258+
Message: &p2p.Message_Simplex{
259+
Simplex: &p2p.Simplex{
260+
ChainId: chainID[:],
261+
Message: &p2p.Simplex_ReplicationRequest{
262+
ReplicationRequest: &p2p.ReplicationRequest{
263+
Seqs: seqs,
264+
LatestRound: latestRound,
265+
},
266+
},
267+
},
268+
},
269+
},
270+
b.compressionType,
271+
false,
272+
)
273+
}
274+
275+
func (b *outMsgBuilder) VerifiedReplicationResponse(
276+
chainID ids.ID,
277+
data []simplex.VerifiedQuorumRound,
278+
latestRound *simplex.VerifiedQuorumRound,
279+
) (OutboundMessage, error) {
280+
var qrs []*p2p.QuorumRound
281+
for _, qr := range data {
282+
qrs = append(qrs, b.simplexQuorumRoundToP2P(&qr))
283+
}
284+
285+
return b.builder.createOutbound(
286+
&p2p.Message{
287+
Message: &p2p.Message_Simplex{
288+
Simplex: &p2p.Simplex{
289+
ChainId: chainID[:],
290+
Message: &p2p.Simplex_ReplicationResponse{
291+
ReplicationResponse: &p2p.ReplicationResponse{
292+
Data: qrs,
293+
LatestRound: b.simplexQuorumRoundToP2P(latestRound),
294+
},
295+
},
296+
},
297+
},
298+
},
299+
b.compressionType,
300+
false,
301+
)
302+
}
303+
304+
func (b *outMsgBuilder) simplexMetadataToP2P(bh simplex.BlockHeader) *p2p.BlockHeader {
305+
return &p2p.BlockHeader{
306+
Metadata: b.simplexProtocolMetadataToP2p(bh.ProtocolMetadata),
307+
Digest: bh.Digest[:],
308+
}
309+
}
310+
311+
func (b *outMsgBuilder) simplexProtocolMetadataToP2p(md simplex.ProtocolMetadata) *p2p.ProtocolMetadata {
312+
return &p2p.ProtocolMetadata{
313+
Version: uint32(md.Version),
314+
Epoch: md.Epoch,
315+
Round: md.Round,
316+
Seq: md.Seq,
317+
Prev: md.Prev[:],
318+
}
319+
}
320+
321+
func (b *outMsgBuilder) simplexQuorumRoundToP2P(qr *simplex.VerifiedQuorumRound) *p2p.QuorumRound {
322+
var p2pQR *p2p.QuorumRound
323+
if qr.VerifiedBlock != nil {
324+
p2pQR.Block = qr.VerifiedBlock.Bytes()
325+
}
326+
if qr.Notarization != nil {
327+
p2pQR.Notarization = &p2p.QuorumCertificate{
328+
Finalization: b.simplexMetadataToP2P(qr.Notarization.Vote.BlockHeader),
329+
QuorumCertificate: qr.Notarization.QC.Bytes(),
330+
}
331+
}
332+
if qr.Finalization != nil {
333+
p2pQR.Finalization = &p2p.QuorumCertificate{
334+
Finalization: b.simplexMetadataToP2P(qr.Finalization.Finalization.BlockHeader),
335+
QuorumCertificate: qr.Finalization.QC.Bytes(),
336+
}
337+
}
338+
if qr.EmptyNotarization != nil {
339+
p2pQR.EmptyNotarization = &p2p.EmptyNotarization{
340+
EmptyVote: b.simplexProtocolMetadataToP2p(qr.EmptyNotarization.Vote.ProtocolMetadata),
341+
QuorumCertificate: qr.EmptyNotarization.QC.Bytes(),
342+
}
343+
}
344+
return p2pQR
345+
}

proto/p2p/p2p.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ message EmptyVote {
483483
}
484484

485485
message QuorumCertificate {
486-
BlockHeader finalization = 1;
486+
BlockHeader finalization = 1; // TODO: update the name to something more generic
487487
bytes quorum_certificate = 2;
488488
}
489489

simplex/comm.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ func (c *Comm) SendMessage(msg *simplex.Message, destination simplex.NodeID) {
100100
outboundMessage, err = c.msgBuilder.EmptyNotarization(c.chainID, msg.EmptyNotarization.Vote.ProtocolMetadata, msg.EmptyNotarization.QC.Bytes())
101101
case msg.Finalization != nil:
102102
outboundMessage, err = c.msgBuilder.Finalization(c.chainID, msg.Finalization.Finalization.BlockHeader, msg.Finalization.QC.Bytes())
103-
// TODO: create replication messages
103+
case msg.ReplicationRequest != nil:
104+
outboundMessage, err = c.msgBuilder.ReplicationRequest(c.chainID, msg.ReplicationRequest.Seqs, msg.ReplicationRequest.LatestRound)
105+
case msg.VerifiedReplicationResponse != nil:
106+
outboundMessage, err = c.msgBuilder.VerifiedReplicationResponse(c.chainID, msg.VerifiedReplicationResponse.Data, msg.VerifiedReplicationResponse.LatestRound)
104107
}
105108

106109
if err != nil {

0 commit comments

Comments
 (0)