diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 36dea5826897a..fdd7dbbd86a00 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -127,11 +127,11 @@ pub struct RejectedTransactionSummary { #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] pub struct ProofWithData { - pub proofs: Vec, + pub proofs: Vec>, } impl ProofWithData { - pub fn new(proofs: Vec) -> Self { + pub fn new(proofs: Vec>) -> Self { Self { proofs } } @@ -516,9 +516,9 @@ impl Payload { } fn verify_with_cache( - proofs: &[ProofOfStore], + proofs: &[ProofOfStore], validator: &ValidatorVerifier, - proof_cache: &ProofCache, + proof_cache: &ProofCache, ) -> anyhow::Result<()> { let unverified: Vec<_> = proofs .iter() @@ -571,7 +571,7 @@ impl Payload { pub fn verify( &self, verifier: &ValidatorVerifier, - proof_cache: &ProofCache, + proof_cache: &ProofCache, quorum_store_enabled: bool, ) -> anyhow::Result<()> { match (quorum_store_enabled, self) { diff --git a/consensus/consensus-types/src/opt_proposal_msg.rs b/consensus/consensus-types/src/opt_proposal_msg.rs index e26e78032cc80..cf6c21a0142b1 100644 --- a/consensus/consensus-types/src/opt_proposal_msg.rs +++ b/consensus/consensus-types/src/opt_proposal_msg.rs @@ -3,7 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - common::Author, opt_block_data::OptBlockData, proof_of_store::ProofCache, sync_info::SyncInfo, + common::Author, + opt_block_data::OptBlockData, + proof_of_store::{BatchInfo, ProofCache}, + sync_info::SyncInfo, }; use anyhow::{ensure, Context, Result}; use aptos_types::validator_verifier::ValidatorVerifier; @@ -98,7 +101,7 @@ impl OptProposalMsg { &self, sender: Author, validator: &ValidatorVerifier, - proof_cache: &ProofCache, + proof_cache: &ProofCache, quorum_store_enabled: bool, ) -> Result<()> { ensure!( diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs index b9c1874e1a2a9..7c5c941da78d8 100644 --- a/consensus/consensus-types/src/payload.rs +++ b/consensus/consensus-types/src/payload.rs @@ -13,7 +13,7 @@ use std::{ pub type OptBatches = BatchPointer; -pub type ProofBatches = BatchPointer; +pub type ProofBatches = BatchPointer>; pub trait TDataInfo { fn num_txns(&self) -> u64; @@ -386,7 +386,7 @@ impl OptQuorumStorePayload { &self.inline_batches } - pub fn proof_with_data(&self) -> &BatchPointer { + pub fn proof_with_data(&self) -> &BatchPointer> { &self.proofs } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index 9a912046a5226..0b53b065c37b0 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -3,7 +3,7 @@ use crate::{payload::TDataInfo, utils::PayloadTxnsSize}; use anyhow::{bail, ensure, Context}; -use aptos_crypto::{bls12381, CryptoMaterialError, HashValue}; +use aptos_crypto::{bls12381, hash::CryptoHash, CryptoMaterialError, HashValue}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; use aptos_types::{ aggregate_signature::AggregateSignature, ledger_info::SignatureWithStatus, @@ -14,11 +14,23 @@ use mini_moka::sync::Cache; use rand::{seq::SliceRandom, thread_rng}; use serde::{Deserialize, Serialize}; use std::{ - fmt::{Display, Formatter}, + fmt::{Debug, Display, Formatter}, hash::Hash, ops::Deref, }; +pub trait TBatchInfo: Serialize + CryptoHash + Debug + Clone + Hash + Eq { + fn epoch(&self) -> u64; + + fn expiration(&self) -> u64; + + fn num_txns(&self) -> u64; + + fn num_bytes(&self) -> u64; + + fn as_batch_info(&self) -> &BatchInfo; +} + #[derive( Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash, )] @@ -93,6 +105,28 @@ impl BatchInfo { } } +impl TBatchInfo for BatchInfo { + fn epoch(&self) -> u64 { + self.epoch + } + + fn expiration(&self) -> u64 { + self.expiration + } + + fn num_txns(&self) -> u64 { + self.num_txns + } + + fn num_bytes(&self) -> u64 { + self.num_bytes + } + + fn as_batch_info(&self) -> &BatchInfo { + &self + } +} + impl Display for BatchInfo { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!(f, "({}:{}:{})", self.author, self.batch_id, self.digest) @@ -118,12 +152,15 @@ impl TDataInfo for BatchInfo { } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct SignedBatchInfoMsg { - signed_infos: Vec, +pub struct SignedBatchInfoMsg { + signed_infos: Vec>, } -impl SignedBatchInfoMsg { - pub fn new(signed_infos: Vec) -> Self { +impl SignedBatchInfoMsg +where + T: TBatchInfo, +{ + pub fn new(signed_infos: Vec>) -> Self { Self { signed_infos } } @@ -161,21 +198,24 @@ impl SignedBatchInfoMsg { Ok(epoch) } - pub fn take(self) -> Vec { + pub fn take(self) -> Vec> { self.signed_infos } } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct SignedBatchInfo { - info: BatchInfo, +pub struct SignedBatchInfo { + info: T, signer: PeerId, signature: SignatureWithStatus, } -impl SignedBatchInfo { +impl SignedBatchInfo +where + T: TBatchInfo, +{ pub fn new( - batch_info: BatchInfo, + batch_info: T, validator_signer: &ValidatorSigner, ) -> Result { let signature = validator_signer.sign(&batch_info)?; @@ -188,7 +228,7 @@ impl SignedBatchInfo { } pub fn new_with_signature( - batch_info: BatchInfo, + batch_info: T, signer: PeerId, signature: bls12381::Signature, ) -> Self { @@ -200,7 +240,7 @@ impl SignedBatchInfo { } #[cfg(any(test, feature = "fuzzing"))] - pub fn dummy(batch_info: BatchInfo, signer: PeerId) -> Self { + pub fn dummy(batch_info: T, signer: PeerId) -> Self { Self::new_with_signature(batch_info, signer, bls12381::Signature::dummy_signature()) } @@ -241,13 +281,13 @@ impl SignedBatchInfo { &self.signature } - pub fn batch_info(&self) -> &BatchInfo { + pub fn batch_info(&self) -> &T { &self.info } } -impl Deref for SignedBatchInfo { - type Target = BatchInfo; +impl Deref for SignedBatchInfo { + type Target = T; fn deref(&self) -> &Self::Target { &self.info @@ -267,12 +307,15 @@ pub enum SignedBatchInfoError { } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] -pub struct ProofOfStoreMsg { - proofs: Vec, +pub struct ProofOfStoreMsg { + proofs: Vec>, } -impl ProofOfStoreMsg { - pub fn new(proofs: Vec) -> Self { +impl ProofOfStoreMsg +where + T: TBatchInfo + Send + Sync + 'static, +{ + pub fn new(proofs: Vec>) -> Self { Self { proofs } } @@ -280,7 +323,7 @@ impl ProofOfStoreMsg { &self, max_num_proofs: usize, validator: &ValidatorVerifier, - cache: &ProofCache, + cache: &ProofCache, ) -> anyhow::Result<()> { ensure!(!self.proofs.is_empty(), "Empty message"); ensure!( @@ -309,28 +352,35 @@ impl ProofOfStoreMsg { Ok(epoch) } - pub fn take(self) -> Vec { + pub fn take(self) -> Vec> { self.proofs } } -pub type ProofCache = Cache; +pub type ProofCache = Cache; #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] -pub struct ProofOfStore { - info: BatchInfo, +pub struct ProofOfStore { + info: T, multi_signature: AggregateSignature, } -impl ProofOfStore { - pub fn new(info: BatchInfo, multi_signature: AggregateSignature) -> Self { +impl ProofOfStore +where + T: TBatchInfo + Send + Sync + 'static, +{ + pub fn new(info: T, multi_signature: AggregateSignature) -> Self { Self { info, multi_signature, } } - pub fn verify(&self, validator: &ValidatorVerifier, cache: &ProofCache) -> anyhow::Result<()> { + pub fn verify( + &self, + validator: &ValidatorVerifier, + cache: &ProofCache, + ) -> anyhow::Result<()> { if let Some(signature) = cache.get(&self.info) { if signature == self.multi_signature { return Ok(()); @@ -354,7 +404,7 @@ impl ProofOfStore { ret } - pub fn info(&self) -> &BatchInfo { + pub fn info(&self) -> &T { &self.info } @@ -363,25 +413,28 @@ impl ProofOfStore { } } -impl Deref for ProofOfStore { - type Target = BatchInfo; +impl Deref for ProofOfStore { + type Target = T; fn deref(&self) -> &Self::Target { &self.info } } -impl TDataInfo for ProofOfStore { +impl TDataInfo for ProofOfStore +where + T: TBatchInfo + Send + Sync + 'static, +{ fn num_txns(&self) -> u64 { - self.num_txns + self.info.num_txns() } fn num_bytes(&self) -> u64 { - self.num_bytes + self.info.num_bytes() } fn info(&self) -> &BatchInfo { - self.info() + self.info.as_batch_info() } fn signers(&self, ordered_authors: &[PeerId]) -> Vec { diff --git a/consensus/consensus-types/src/proposal_msg.rs b/consensus/consensus-types/src/proposal_msg.rs index 3784651ac88c9..11c14ce723b3f 100644 --- a/consensus/consensus-types/src/proposal_msg.rs +++ b/consensus/consensus-types/src/proposal_msg.rs @@ -2,7 +2,12 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{block::Block, common::Author, proof_of_store::ProofCache, sync_info::SyncInfo}; +use crate::{ + block::Block, + common::Author, + proof_of_store::{BatchInfo, ProofCache}, + sync_info::SyncInfo, +}; use anyhow::{anyhow, ensure, format_err, Context, Result}; use aptos_short_hex_str::AsShortHexStr; use aptos_types::validator_verifier::ValidatorVerifier; @@ -84,7 +89,7 @@ impl ProposalMsg { &self, sender: Author, validator: &ValidatorVerifier, - proof_cache: &ProofCache, + proof_cache: &ProofCache, quorum_store_enabled: bool, ) -> Result<()> { if let Some(proposal_author) = self.proposal.author() { diff --git a/consensus/src/consensus_observer/network/observer_message.rs b/consensus/src/consensus_observer/network/observer_message.rs index e1f4e5bab4f50..2b8415d34a892 100644 --- a/consensus/src/consensus_observer/network/observer_message.rs +++ b/consensus/src/consensus_observer/network/observer_message.rs @@ -379,11 +379,11 @@ impl CommitDecision { #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct PayloadWithProof { transactions: Vec, - proofs: Vec, + proofs: Vec>, } impl PayloadWithProof { - pub fn new(transactions: Vec, proofs: Vec) -> Self { + pub fn new(transactions: Vec, proofs: Vec>) -> Self { Self { transactions, proofs, @@ -439,7 +439,7 @@ impl TransactionsWithProof { } } - pub fn proofs(&self) -> Vec { + pub fn proofs(&self) -> Vec> { match self { TransactionsWithProof::TransactionsWithProofAndLimits(payload) => { payload.payload_with_proof.proofs.clone() @@ -512,7 +512,7 @@ impl BlockTransactionPayload { /// Creates a returns a new InQuorumStore transaction payload pub fn new_in_quorum_store( transactions: Vec, - proofs: Vec, + proofs: Vec>, ) -> Self { let payload_with_proof = PayloadWithProof::new(transactions, proofs); Self::DeprecatedInQuorumStore(payload_with_proof) @@ -521,7 +521,7 @@ impl BlockTransactionPayload { /// Creates a returns a new InQuorumStoreWithLimit transaction payload pub fn new_in_quorum_store_with_limit( transactions: Vec, - proofs: Vec, + proofs: Vec>, limit: Option, ) -> Self { let payload_with_proof = PayloadWithProof::new(transactions, proofs); @@ -532,7 +532,7 @@ impl BlockTransactionPayload { /// Creates a returns a new QuorumStoreInlineHybrid transaction payload pub fn new_quorum_store_inline_hybrid( transactions: Vec, - proofs: Vec, + proofs: Vec>, transaction_limit: Option, gas_limit: Option, inline_batches: Vec, @@ -557,7 +557,7 @@ impl BlockTransactionPayload { pub fn new_opt_quorum_store( transactions: Vec, - proofs: Vec, + proofs: Vec>, transaction_limit: Option, gas_limit: Option, batch_infos: Vec, @@ -613,7 +613,7 @@ impl BlockTransactionPayload { } /// Returns the proofs of the transaction payload - pub fn payload_proofs(&self) -> Vec { + pub fn payload_proofs(&self) -> Vec> { match self { BlockTransactionPayload::DeprecatedInQuorumStore(payload) => payload.proofs.clone(), BlockTransactionPayload::DeprecatedInQuorumStoreWithLimit(payload) => { @@ -715,7 +715,7 @@ impl BlockTransactionPayload { } /// Verifies the payload batches against the expected batches - fn verify_batches(&self, expected_proofs: &[ProofOfStore]) -> Result<(), Error> { + fn verify_batches(&self, expected_proofs: &[ProofOfStore]) -> Result<(), Error> { // Get the batches in the block transaction payload let payload_proofs = self.payload_proofs(); let payload_batches: Vec<&BatchInfo> = @@ -1854,7 +1854,7 @@ mod test { fn create_block_payload( block_info: Option, signed_transactions: &[SignedTransaction], - proofs: &[ProofOfStore], + proofs: &[ProofOfStore], inline_batches: &[BatchInfo], ) -> BlockPayload { // Create the transaction payload @@ -1878,7 +1878,7 @@ mod test { fn create_block_optqs_payload( block_info: Option, signed_transactions: &[SignedTransaction], - proofs: &[ProofOfStore], + proofs: &[ProofOfStore], opt_and_inline_batches: &[BatchInfo], ) -> BlockPayload { // Create the transaction payload @@ -1910,7 +1910,7 @@ mod test { fn create_mixed_expiration_proofs( block_timestamp: u64, signed_transactions: &[SignedTransaction], - ) -> (Vec, Vec) { + ) -> (Vec>, Vec) { let mut proofs = vec![]; let mut non_expired_transactions = vec![]; diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 31181850b32ca..a7863113ffb37 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -66,7 +66,7 @@ use aptos_consensus_types::{ block_retrieval::BlockRetrievalRequest, common::{Author, Round}, epoch_retrieval::EpochRetrievalRequest, - proof_of_store::ProofCache, + proof_of_store::{BatchInfo, ProofCache}, utils::PayloadTxnsSize, }; use aptos_crypto::bls12381::PrivateKey; @@ -172,7 +172,7 @@ pub struct EpochManager { dag_config: DagConsensusConfig, payload_manager: Arc, rand_storage: Arc>, - proof_cache: ProofCache, + proof_cache: ProofCache, consensus_publisher: Option>, pending_blocks: Arc>, key_storage: PersistentSafetyStorage, diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 33d84b8d6db80..214fc50a14ee2 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -28,7 +28,9 @@ use aptos_consensus_types::{ opt_proposal_msg::OptProposalMsg, order_vote_msg::OrderVoteMsg, pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote}, - proof_of_store::{ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg}, + proof_of_store::{ + BatchInfo, ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg, + }, proposal_msg::ProposalMsg, round_timeout::RoundTimeoutMsg, sync_info::SyncInfo, @@ -202,15 +204,18 @@ pub trait QuorumStoreSender: Send + Clone { async fn send_signed_batch_info_msg( &self, - signed_batch_infos: Vec, + signed_batch_infos: Vec>, recipients: Vec, ); async fn broadcast_batch_msg(&mut self, batches: Vec); - async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec); + async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec>); - async fn send_proof_of_store_msg_to_self(&mut self, proof_of_stores: Vec); + async fn send_proof_of_store_msg_to_self( + &mut self, + proof_of_stores: Vec>, + ); } /// Implements the actual networking support for all consensus messaging. @@ -556,7 +561,7 @@ impl QuorumStoreSender for NetworkSender { async fn send_signed_batch_info_msg( &self, - signed_batch_infos: Vec, + signed_batch_infos: Vec>, recipients: Vec, ) { fail_point!("consensus::send::signed_batch_info", |_| ()); @@ -571,13 +576,13 @@ impl QuorumStoreSender for NetworkSender { self.broadcast(msg).await } - async fn broadcast_proof_of_store_msg(&mut self, proofs: Vec) { + async fn broadcast_proof_of_store_msg(&mut self, proofs: Vec>) { fail_point!("consensus::send::proof_of_store", |_| ()); let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(ProofOfStoreMsg::new(proofs))); self.broadcast(msg).await } - async fn send_proof_of_store_msg_to_self(&mut self, proofs: Vec) { + async fn send_proof_of_store_msg_to_self(&mut self, proofs: Vec>) { fail_point!("consensus::send::proof_of_store", |_| ()); let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(ProofOfStoreMsg::new(proofs))); self.send(msg, vec![self.author]).await diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index aef081a8273a3..9a159f3cbdd93 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -17,7 +17,7 @@ use aptos_consensus_types::{ opt_proposal_msg::OptProposalMsg, order_vote_msg::OrderVoteMsg, pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote}, - proof_of_store::{ProofOfStoreMsg, SignedBatchInfoMsg}, + proof_of_store::{BatchInfo, BatchInfoExt, ProofOfStoreMsg, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, round_timeout::RoundTimeoutMsg, sync_info::SyncInfo, @@ -71,9 +71,9 @@ pub enum ConsensusMsg { BatchResponse(Box), /// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that /// the batch of transactions was received and will be persisted until batch expiration. - SignedBatchInfo(Box), + SignedBatchInfo(Box>), /// Quorum Store: Broadcast a certified proof of store (a digest that received 2f+1 votes). - ProofOfStoreMsg(Box), + ProofOfStoreMsg(Box>), /// DAG protocol message DAGMessage(DAGNetworkMessage), /// Commit message @@ -91,6 +91,10 @@ pub enum ConsensusMsg { BlockRetrievalRequest(Box), /// OptProposalMsg contains the optimistic proposal and sync info. OptProposalMsg(Box), + + SignedBatchInfoV2(Box>), + + ProofOfStoreMsgV2(Box>), } /// Network type for consensus @@ -121,6 +125,8 @@ impl ConsensusMsg { ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2", ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2", ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest", + ConsensusMsg::SignedBatchInfoV2(_) => "SignedBatchInfoV2", + ConsensusMsg::ProofOfStoreMsgV2(_) => "ProofOfStoreMsgV2", } } } diff --git a/consensus/src/quorum_store/batch_proof_queue.rs b/consensus/src/quorum_store/batch_proof_queue.rs index 01a16cef86cf7..7f67b5d97caf2 100644 --- a/consensus/src/quorum_store/batch_proof_queue.rs +++ b/consensus/src/quorum_store/batch_proof_queue.rs @@ -37,7 +37,7 @@ struct QueueItem { /// Contains the proof associated with the batch. /// It is optional as the proof can be updated after the summary. - proof: Option, + proof: Option>, /// The time when the proof is inserted into this item. proof_insertion_time: Option, } @@ -173,7 +173,7 @@ impl BatchProofQueue { } /// Add the ProofOfStore to proof queue. - pub(crate) fn insert_proof(&mut self, proof: ProofOfStore) { + pub(crate) fn insert_proof(&mut self, proof: ProofOfStore) { if proof.expiration() <= self.latest_block_timestamp { counters::inc_rejected_pos_count(counters::POS_EXPIRED_LABEL); return; @@ -342,7 +342,7 @@ impl BatchProofQueue { fn log_remaining_data_after_pull( &self, excluded_batches: &HashSet, - pulled_proofs: &[ProofOfStore], + pulled_proofs: &[ProofOfStore], ) { let mut num_proofs_remaining_after_pull = 0; let mut num_txns_remaining_after_pull = 0; @@ -406,7 +406,7 @@ impl BatchProofQueue { soft_max_txns_after_filtering: u64, return_non_full: bool, block_timestamp: Duration, - ) -> (Vec, PayloadTxnsSize, u64, bool) { + ) -> (Vec>, PayloadTxnsSize, u64, bool) { let (result, all_txns, unique_txns, is_full) = self.pull_internal( false, excluded_batches, diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index d5a1c65ed2a59..02364580a8b65 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -381,7 +381,7 @@ impl BatchStore { fn generate_signed_batch_info( &self, batch_info: BatchInfo, - ) -> Result { + ) -> Result, CryptoMaterialError> { fail_point!("quorum_store::create_invalid_signed_batch_info", |_| { Ok(SignedBatchInfo::new_with_signature( batch_info.clone(), @@ -392,7 +392,7 @@ impl BatchStore { SignedBatchInfo::new(batch_info, &self.validator_signer) } - fn persist_inner(&self, persist_request: PersistedValue) -> Option { + fn persist_inner(&self, persist_request: PersistedValue) -> Option> { match self.save(&persist_request) { Ok(needs_db) => { let batch_info = persist_request.batch_info().clone(); @@ -483,7 +483,7 @@ impl BatchStore { } impl BatchWriter for BatchStore { - fn persist(&self, persist_requests: Vec) -> Vec { + fn persist(&self, persist_requests: Vec) -> Vec> { let mut signed_infos = vec![]; for persist_request in persist_requests.into_iter() { if let Some(signed_info) = self.persist_inner(persist_request.clone()) { @@ -610,5 +610,5 @@ impl BatchReader for Batch } pub trait BatchWriter: Send + Sync { - fn persist(&self, persist_requests: Vec) -> Vec; + fn persist(&self, persist_requests: Vec) -> Vec>; } diff --git a/consensus/src/quorum_store/proof_coordinator.rs b/consensus/src/quorum_store/proof_coordinator.rs index ff5bba176a050..506250464c706 100644 --- a/consensus/src/quorum_store/proof_coordinator.rs +++ b/consensus/src/quorum_store/proof_coordinator.rs @@ -37,7 +37,7 @@ use tokio::{ #[derive(Debug)] pub(crate) enum ProofCoordinatorCommand { - AppendSignature(PeerId, SignedBatchInfoMsg), + AppendSignature(PeerId, SignedBatchInfoMsg), CommitNotification(Vec), Shutdown(TokioOneshot::Sender<()>), } @@ -76,7 +76,7 @@ impl IncrementalProofState { fn add_signature( &mut self, - signed_batch_info: &SignedBatchInfo, + signed_batch_info: &SignedBatchInfo, validator_verifier: &ValidatorVerifier, ) -> Result<(), SignedBatchInfoError> { if signed_batch_info.batch_info() != self.signature_aggregator.data() { @@ -138,7 +138,7 @@ impl IncrementalProofState { pub fn aggregate_and_verify( &mut self, validator_verifier: &ValidatorVerifier, - ) -> Result { + ) -> Result, SignedBatchInfoError> { if self.completed { panic!("Cannot call take twice, unexpected issue occurred"); } @@ -168,7 +168,7 @@ pub(crate) struct ProofCoordinator { timeouts: Timeouts, batch_reader: Arc, batch_generator_cmd_tx: tokio::sync::mpsc::Sender, - proof_cache: ProofCache, + proof_cache: ProofCache, broadcast_proofs: bool, batch_expiry_gap_when_init_usecs: u64, } @@ -180,7 +180,7 @@ impl ProofCoordinator { peer_id: PeerId, batch_reader: Arc, batch_generator_cmd_tx: tokio::sync::mpsc::Sender, - proof_cache: ProofCache, + proof_cache: ProofCache, broadcast_proofs: bool, batch_expiry_gap_when_init_usecs: u64, ) -> Self { @@ -200,7 +200,7 @@ impl ProofCoordinator { fn init_proof( &mut self, - signed_batch_info: &SignedBatchInfo, + signed_batch_info: &SignedBatchInfo, ) -> Result<(), SignedBatchInfoError> { // Check if the signed digest corresponding to our batch if signed_batch_info.author() != self.peer_id { @@ -235,9 +235,9 @@ impl ProofCoordinator { fn add_signature( &mut self, - signed_batch_info: SignedBatchInfo, + signed_batch_info: SignedBatchInfo, validator_verifier: &ValidatorVerifier, - ) -> Result, SignedBatchInfoError> { + ) -> Result>, SignedBatchInfoError> { if !self .batch_info_to_proof .contains_key(signed_batch_info.batch_info()) diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index de6913ed8aeec..3201574adaba9 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -21,7 +21,7 @@ use std::{cmp::min, collections::HashSet, sync::Arc, time::Duration}; #[derive(Debug)] pub enum ProofManagerCommand { - ReceiveProofs(ProofOfStoreMsg), + ReceiveProofs(ProofOfStoreMsg), ReceiveBatches(Vec<(BatchInfo, Vec)>), CommitNotification(u64, Vec), Shutdown(tokio::sync::oneshot::Sender<()>), @@ -62,7 +62,7 @@ impl ProofManager { } } - pub(crate) fn receive_proofs(&mut self, proofs: Vec) { + pub(crate) fn receive_proofs(&mut self, proofs: Vec>) { for proof in proofs.into_iter() { self.batch_proof_queue.insert_proof(proof); } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 77e78de8b002d..949b77fa40732 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -30,7 +30,9 @@ use crate::{ use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{BatchTransactionFilterConfig, QuorumStoreConfig}; use aptos_consensus_types::{ - common::Author, proof_of_store::ProofCache, request_response::GetPayloadCommand, + common::Author, + proof_of_store::{BatchInfo, ProofCache}, + request_response::GetPayloadCommand, }; use aptos_crypto::bls12381::PrivateKey; use aptos_logger::prelude::*; @@ -132,7 +134,7 @@ pub struct InnerBuilder { aptos_db: Arc, network_sender: NetworkSender, verifier: Arc, - proof_cache: ProofCache, + proof_cache: ProofCache, coordinator_tx: Sender, coordinator_rx: Option>, batch_generator_cmd_tx: tokio::sync::mpsc::Sender, @@ -168,7 +170,7 @@ impl InnerBuilder { aptos_db: Arc, network_sender: NetworkSender, verifier: Arc, - proof_cache: ProofCache, + proof_cache: ProofCache, quorum_store_storage: Arc, broadcast_proofs: bool, consensus_key: Arc, diff --git a/consensus/src/quorum_store/tests/batch_generator_test.rs b/consensus/src/quorum_store/tests/batch_generator_test.rs index a29d48226b246..a5169d0867926 100644 --- a/consensus/src/quorum_store/tests/batch_generator_test.rs +++ b/consensus/src/quorum_store/tests/batch_generator_test.rs @@ -14,7 +14,7 @@ use crate::{ use aptos_config::config::QuorumStoreConfig; use aptos_consensus_types::{ common::{TransactionInProgress, TransactionSummary}, - proof_of_store::SignedBatchInfo, + proof_of_store::{BatchInfo, SignedBatchInfo}, }; use aptos_mempool::{QuorumStoreRequest, QuorumStoreResponse}; use aptos_types::{quorum_store::BatchId, transaction::SignedTransaction}; @@ -35,7 +35,7 @@ impl MockBatchWriter { } impl BatchWriter for MockBatchWriter { - fn persist(&self, _persist_requests: Vec) -> Vec { + fn persist(&self, _persist_requests: Vec) -> Vec> { vec![] } } diff --git a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs index 731aa730bedad..a75e031f1ceb9 100644 --- a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs +++ b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs @@ -23,7 +23,7 @@ fn proof_of_store( batch_id: BatchId, gas_bucket_start: u64, expiration: u64, -) -> ProofOfStore { +) -> ProofOfStore { ProofOfStore::new( BatchInfo::new( author, @@ -45,7 +45,7 @@ fn proof_of_store_with_size( gas_bucket_start: u64, expiration: u64, num_txns: u64, -) -> ProofOfStore { +) -> ProofOfStore { ProofOfStore::new( BatchInfo::new( author, @@ -100,7 +100,7 @@ async fn test_proof_queue_sorting() { ); let mut count_author_0 = 0; let mut count_author_1 = 0; - let mut prev: Option<&ProofOfStore> = None; + let mut prev: Option<&ProofOfStore> = None; for batch in &pulled { if let Some(prev) = prev { assert!(prev.gas_bucket_start() >= batch.gas_bucket_start()); @@ -129,7 +129,7 @@ async fn test_proof_queue_sorting() { ); let mut count_author_0 = 0; let mut count_author_1 = 0; - let mut prev: Option<&ProofOfStore> = None; + let mut prev: Option<&ProofOfStore> = None; for batch in &pulled { if let Some(prev) = prev { assert!(prev.gas_bucket_start() >= batch.gas_bucket_start()); diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs index 1f171ce722bca..01476e90eec31 100644 --- a/consensus/src/quorum_store/tests/batch_requester_test.rs +++ b/consensus/src/quorum_store/tests/batch_requester_test.rs @@ -11,7 +11,7 @@ use crate::{ }; use aptos_consensus_types::{ common::Author, - proof_of_store::{ProofOfStore, SignedBatchInfo}, + proof_of_store::{BatchInfo, ProofOfStore, SignedBatchInfo}, }; use aptos_crypto::HashValue; use aptos_infallible::Mutex; @@ -56,7 +56,7 @@ impl QuorumStoreSender for MockBatchRequester { async fn send_signed_batch_info_msg( &self, - _signed_batch_infos: Vec, + _signed_batch_infos: Vec>, _recipients: Vec, ) { unimplemented!() @@ -66,11 +66,17 @@ impl QuorumStoreSender for MockBatchRequester { unimplemented!() } - async fn broadcast_proof_of_store_msg(&mut self, _proof_of_stores: Vec) { + async fn broadcast_proof_of_store_msg( + &mut self, + _proof_of_stores: Vec>, + ) { unimplemented!() } - async fn send_proof_of_store_msg_to_self(&mut self, _proof_of_stores: Vec) { + async fn send_proof_of_store_msg_to_self( + &mut self, + _proof_of_stores: Vec>, + ) { unimplemented!() } } diff --git a/consensus/src/quorum_store/tests/proof_manager_test.rs b/consensus/src/quorum_store/tests/proof_manager_test.rs index 4f3ba16eaec9b..9f188e7159821 100644 --- a/consensus/src/quorum_store/tests/proof_manager_test.rs +++ b/consensus/src/quorum_store/tests/proof_manager_test.rs @@ -20,7 +20,7 @@ fn create_proof_manager() -> ProofManager { ProofManager::new(PeerId::random(), 10, 10, batch_store, true, true, 1) } -fn create_proof(author: PeerId, expiration: u64, batch_sequence: u64) -> ProofOfStore { +fn create_proof(author: PeerId, expiration: u64, batch_sequence: u64) -> ProofOfStore { create_proof_with_gas(author, expiration, batch_sequence, 0) } @@ -29,7 +29,7 @@ fn create_proof_with_gas( expiration: u64, batch_sequence: u64, gas_bucket_start: u64, -) -> ProofOfStore { +) -> ProofOfStore { let digest = HashValue::random(); let batch_id = BatchId::new_for_test(batch_sequence); ProofOfStore::new( @@ -72,7 +72,7 @@ async fn get_proposal( fn assert_payload_response( payload: Payload, - expected: &[ProofOfStore], + expected: &[ProofOfStore], max_txns_from_block_to_execute: Option, expected_block_gas_limit: Option, ) { @@ -116,7 +116,7 @@ async fn get_proposal_and_assert( proof_manager: &mut ProofManager, max_txns: u64, filter: &[BatchInfo], - expected: &[ProofOfStore], + expected: &[ProofOfStore], ) { assert_payload_response( get_proposal(proof_manager, max_txns, filter).await, diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index b5ff3dbe11229..53146a5a451b5 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -45,7 +45,7 @@ use aptos_consensus_types::{ order_vote::OrderVote, order_vote_msg::OrderVoteMsg, pipelined_block::PipelinedBlock, - proof_of_store::{ProofCache, ProofOfStoreMsg, SignedBatchInfoMsg}, + proof_of_store::{BatchInfo, ProofCache, ProofOfStoreMsg, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, quorum_cert::QuorumCert, round_timeout::{RoundTimeout, RoundTimeoutMsg, RoundTimeoutReason}, @@ -95,8 +95,8 @@ pub enum UnverifiedEvent { OrderVoteMsg(Box), SyncInfo(Box), BatchMsg(Box), - SignedBatchInfo(Box), - ProofOfStoreMsg(Box), + SignedBatchInfo(Box>), + ProofOfStoreMsg(Box>), OptProposalMsg(Box), } @@ -107,7 +107,7 @@ impl UnverifiedEvent { self, peer_id: PeerId, validator: &ValidatorVerifier, - proof_cache: &ProofCache, + proof_cache: &ProofCache, quorum_store_enabled: bool, self_message: bool, max_num_batches: usize, @@ -240,8 +240,8 @@ pub enum VerifiedEvent { OrderVoteMsg(Box), UnverifiedSyncInfo(Box), BatchMsg(Box), - SignedBatchInfo(Box), - ProofOfStoreMsg(Box), + SignedBatchInfo(Box>), + ProofOfStoreMsg(Box>), // local messages LocalTimeout(Round), // Shutdown the NetworkListener diff --git a/consensus/src/test_utils/mock_quorum_store_sender.rs b/consensus/src/test_utils/mock_quorum_store_sender.rs index bd962d348b51f..f7a7ddd5dff7d 100644 --- a/consensus/src/test_utils/mock_quorum_store_sender.rs +++ b/consensus/src/test_utils/mock_quorum_store_sender.rs @@ -8,7 +8,9 @@ use crate::{ }; use aptos_consensus_types::{ common::Author, - proof_of_store::{ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg}, + proof_of_store::{ + BatchInfo, ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg, + }, }; use std::time::Duration; use tokio::sync::mpsc::Sender; @@ -37,7 +39,7 @@ impl QuorumStoreSender for MockQuorumStoreSender { async fn send_signed_batch_info_msg( &self, - signed_batch_infos: Vec, + signed_batch_infos: Vec>, recipients: Vec, ) { self.tx @@ -55,7 +57,10 @@ impl QuorumStoreSender for MockQuorumStoreSender { unimplemented!() } - async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec) { + async fn broadcast_proof_of_store_msg( + &mut self, + proof_of_stores: Vec>, + ) { self.tx .send(( ConsensusMsg::ProofOfStoreMsg(Box::new(ProofOfStoreMsg::new(proof_of_stores))), @@ -65,7 +70,10 @@ impl QuorumStoreSender for MockQuorumStoreSender { .expect("We should be able to send the proof of store message"); } - async fn send_proof_of_store_msg_to_self(&mut self, _proof_of_stores: Vec) { + async fn send_proof_of_store_msg_to_self( + &mut self, + _proof_of_stores: Vec>, + ) { unimplemented!() } }