Skip to content

Commit 75caee4

Browse files
committed
[qs] support signing BatchInfoExt behind flag
1 parent f966c62 commit 75caee4

File tree

9 files changed

+135
-15
lines changed

9 files changed

+135
-15
lines changed

consensus/src/network.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ pub trait QuorumStoreSender: Send + Clone {
208208
recipients: Vec<Author>,
209209
);
210210

211+
async fn send_signed_batch_info_msg_v2(
212+
&self,
213+
signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
214+
recipients: Vec<Author>,
215+
);
216+
211217
async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>);
212218

213219
async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec<ProofOfStore<BatchInfo>>);
@@ -575,6 +581,18 @@ impl QuorumStoreSender for NetworkSender {
575581
self.send(msg, recipients).await
576582
}
577583

584+
async fn send_signed_batch_info_msg_v2(
585+
&self,
586+
signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
587+
recipients: Vec<Author>,
588+
) {
589+
fail_point!("consensus::send::signed_batch_info", |_| ());
590+
let msg = ConsensusMsg::SignedBatchInfoMsgV2(Box::new(SignedBatchInfoMsg::new(
591+
signed_batch_infos,
592+
)));
593+
self.send(msg, recipients).await
594+
}
595+
578596
async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>) {
579597
fail_point!("consensus::send::broadcast_batch", |_| ());
580598
let msg = ConsensusMsg::BatchMsg(Box::new(BatchMsg::new(batches)));

consensus/src/quorum_store/batch_coordinator.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct BatchCoordinator {
4444
max_total_bytes: u64,
4545
batch_expiry_gap_when_init_usecs: u64,
4646
transaction_filter_config: BatchTransactionFilterConfig,
47+
enable_proof_v2: bool,
4748
}
4849

4950
impl BatchCoordinator {
@@ -59,6 +60,7 @@ impl BatchCoordinator {
5960
max_total_bytes: u64,
6061
batch_expiry_gap_when_init_usecs: u64,
6162
transaction_filter_config: BatchTransactionFilterConfig,
63+
enable_proof_v2: bool,
6264
) -> Self {
6365
Self {
6466
my_peer_id,
@@ -72,6 +74,7 @@ impl BatchCoordinator {
7274
max_total_bytes,
7375
batch_expiry_gap_when_init_usecs,
7476
transaction_filter_config,
77+
enable_proof_v2,
7578
}
7679
}
7780

@@ -87,6 +90,7 @@ impl BatchCoordinator {
8790
let batch_store = self.batch_store.clone();
8891
let network_sender = self.network_sender.clone();
8992
let sender_to_proof_manager = self.sender_to_proof_manager.clone();
93+
let enable_proof_v2 = self.enable_proof_v2;
9094
tokio::spawn(async move {
9195
let peer_id = persist_requests[0].author();
9296
let batches = persist_requests
@@ -98,14 +102,27 @@ impl BatchCoordinator {
98102
)
99103
})
100104
.collect();
101-
let signed_batch_infos = batch_store.persist(persist_requests);
102-
if !signed_batch_infos.is_empty() {
103-
if approx_created_ts_usecs > 0 {
104-
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);
105+
106+
if enable_proof_v2 {
107+
let signed_batch_infos = batch_store.persist_v2(persist_requests);
108+
if !signed_batch_infos.is_empty() {
109+
if approx_created_ts_usecs > 0 {
110+
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);
111+
}
112+
network_sender
113+
.send_signed_batch_info_msg_v2(signed_batch_infos, vec![peer_id])
114+
.await;
115+
}
116+
} else {
117+
let signed_batch_infos = batch_store.persist(persist_requests);
118+
if !signed_batch_infos.is_empty() {
119+
if approx_created_ts_usecs > 0 {
120+
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);
121+
}
122+
network_sender
123+
.send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
124+
.await;
105125
}
106-
network_sender
107-
.send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
108-
.await;
109126
}
110127
let _ = sender_to_proof_manager
111128
.send(ProofManagerCommand::ReceiveBatches(batches))

consensus/src/quorum_store/batch_store.rs

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::{
1212
},
1313
};
1414
use anyhow::bail;
15-
use aptos_consensus_types::proof_of_store::{BatchInfo, BatchInfoExt, SignedBatchInfo};
15+
use aptos_consensus_types::proof_of_store::{
16+
BatchInfo, BatchInfoExt, BatchKind, ExtraBatchInfo, SignedBatchInfo, TBatchInfo,
17+
};
1618
use aptos_crypto::{CryptoMaterialError, HashValue};
1719
use aptos_executor_types::{ExecutorError, ExecutorResult};
1820
use aptos_infallible::Mutex;
@@ -26,6 +28,7 @@ use fail::fail_point;
2628
use futures::{future::Shared, FutureExt};
2729
use once_cell::sync::OnceCell;
2830
use std::{
31+
any::TypeId,
2932
collections::{BTreeSet, HashMap},
3033
future::Future,
3134
pin::Pin,
@@ -378,10 +381,10 @@ impl BatchStore {
378381
ret
379382
}
380383

381-
fn generate_signed_batch_info(
384+
fn generate_signed_batch_info<T: TBatchInfo>(
382385
&self,
383-
batch_info: BatchInfo,
384-
) -> Result<SignedBatchInfo<BatchInfo>, CryptoMaterialError> {
386+
batch_info: T,
387+
) -> Result<SignedBatchInfo<T>, CryptoMaterialError> {
385388
fail_point!("quorum_store::create_invalid_signed_batch_info", |_| {
386389
Ok(SignedBatchInfo::new_with_signature(
387390
batch_info.clone(),
@@ -392,10 +395,17 @@ impl BatchStore {
392395
SignedBatchInfo::new(batch_info, &self.validator_signer)
393396
}
394397

395-
fn persist_inner(&self, persist_request: PersistedValue) -> Option<SignedBatchInfo<BatchInfo>> {
398+
fn persist_inner<T: TBatchInfo>(
399+
&self,
400+
batch_info: T,
401+
persist_request: PersistedValue,
402+
) -> Option<SignedBatchInfo<T>> {
403+
assert!(
404+
batch_info.as_batch_info() == persist_request.batch_info(),
405+
"Provided batch info doesn't match persist request batch info"
406+
);
396407
match self.save(&persist_request) {
397408
Ok(needs_db) => {
398-
let batch_info = persist_request.batch_info().clone();
399409
trace!("QS: sign digest {}", persist_request.digest());
400410
if needs_db {
401411
#[allow(clippy::unwrap_in_result)]
@@ -405,7 +415,6 @@ impl BatchStore {
405415
}
406416
self.generate_signed_batch_info(batch_info).ok()
407417
},
408-
409418
Err(e) => {
410419
debug!("QS: failed to store to cache {:?}", e);
411420
None
@@ -486,7 +495,37 @@ impl BatchWriter for BatchStore {
486495
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>> {
487496
let mut signed_infos = vec![];
488497
for persist_request in persist_requests.into_iter() {
489-
if let Some(signed_info) = self.persist_inner(persist_request.clone()) {
498+
let batch_info = persist_request.batch_info().clone();
499+
if let Some(signed_info) = self.persist_inner(batch_info, persist_request.clone()) {
500+
self.notify_subscribers(persist_request);
501+
signed_infos.push(signed_info);
502+
}
503+
}
504+
signed_infos
505+
}
506+
507+
fn persist_v2(
508+
&self,
509+
persist_requests: Vec<PersistedValue>,
510+
) -> Vec<SignedBatchInfo<BatchInfoExt>> {
511+
let mut signed_infos = vec![];
512+
for persist_request in persist_requests.into_iter() {
513+
let is_encrypted_batch = persist_request
514+
.payload()
515+
.as_ref()
516+
.expect("Payload must be available for persistence")
517+
.iter()
518+
.any(|txn| txn.is_encrypted());
519+
let batch_kind = if is_encrypted_batch {
520+
BatchKind::Encrypted
521+
} else {
522+
BatchKind::Normal
523+
};
524+
let batch_info = BatchInfoExt::V2 {
525+
info: persist_request.batch_info().clone(),
526+
extra: ExtraBatchInfo { batch_kind },
527+
};
528+
if let Some(signed_info) = self.persist_inner(batch_info, persist_request.clone()) {
490529
self.notify_subscribers(persist_request);
491530
signed_infos.push(signed_info);
492531
}
@@ -611,4 +650,9 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
611650

612651
pub trait BatchWriter: Send + Sync {
613652
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>>;
653+
654+
fn persist_v2(
655+
&self,
656+
persist_requests: Vec<PersistedValue>,
657+
) -> Vec<SignedBatchInfo<BatchInfoExt>>;
614658
}

consensus/src/quorum_store/quorum_store_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ impl InnerBuilder {
333333
self.config.receiver_max_total_bytes as u64,
334334
self.config.batch_expiry_gap_when_init_usecs,
335335
self.transaction_filter_config.clone(),
336+
self.config.enable_proof_v2,
336337
);
337338
#[allow(unused_variables)]
338339
let name = format!("batch_coordinator-{}", i);

consensus/src/quorum_store/tests/batch_coordinator_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ fn create_batch_coordinator(
142142
10_000,
143143
10_000,
144144
transaction_filter_config,
145+
false,
145146
)
146147
}
147148

consensus/src/quorum_store/tests/batch_generator_test.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ impl BatchWriter for MockBatchWriter {
3838
fn persist(&self, _persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>> {
3939
vec![]
4040
}
41+
42+
fn persist_v2(
43+
&self,
44+
persist_requests: Vec<PersistedValue>,
45+
) -> Vec<SignedBatchInfo<aptos_consensus_types::proof_of_store::BatchInfoExt>> {
46+
vec![]
47+
}
4148
}
4249

4350
#[allow(clippy::needless_collect)]

consensus/src/quorum_store/tests/batch_requester_test.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ impl QuorumStoreSender for MockBatchRequester {
6262
unimplemented!()
6363
}
6464

65+
async fn send_signed_batch_info_msg_v2(
66+
&self,
67+
_signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
68+
_recipients: Vec<Author>,
69+
) {
70+
unimplemented!()
71+
}
72+
6573
async fn broadcast_batch_msg(&mut self, _batches: Vec<Batch>) {
6674
unimplemented!()
6775
}

consensus/src/test_utils/mock_quorum_store_sender.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,22 @@ impl QuorumStoreSender for MockQuorumStoreSender {
5353
.expect("could not send");
5454
}
5555

56+
async fn send_signed_batch_info_msg_v2(
57+
&self,
58+
signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
59+
recipients: Vec<Author>,
60+
) {
61+
self.tx
62+
.send((
63+
ConsensusMsg::SignedBatchInfoMsgV2(Box::new(SignedBatchInfoMsg::new(
64+
signed_batch_infos,
65+
))),
66+
recipients,
67+
))
68+
.await
69+
.expect("could not send");
70+
}
71+
5672
async fn broadcast_batch_msg(&mut self, _batches: Vec<Batch>) {
5773
unimplemented!()
5874
}

types/src/transaction/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,10 @@ impl TransactionPayload {
940940
extra_config,
941941
})
942942
}
943+
944+
pub fn is_encrypted(&self) -> bool {
945+
matches!(self, Self::EncryptedPayload(_))
946+
}
943947
}
944948

945949
impl TransactionExtraConfig {
@@ -1296,6 +1300,10 @@ impl SignedTransaction {
12961300
pub fn replay_protector(&self) -> ReplayProtector {
12971301
self.raw_txn.replay_protector()
12981302
}
1303+
1304+
pub fn is_encrypted(&self) -> bool {
1305+
self.payload().is_encrypted()
1306+
}
12991307
}
13001308

13011309
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]

0 commit comments

Comments
 (0)