@@ -171,6 +171,7 @@ pub(crate) struct ProofCoordinator {
171171 proof_cache : ProofCache ,
172172 broadcast_proofs : bool ,
173173 batch_expiry_gap_when_init_usecs : u64 ,
174+ enable_proof_v2_msg : bool ,
174175}
175176
176177//PoQS builder object - gather signed digest to form PoQS
@@ -183,6 +184,7 @@ impl ProofCoordinator {
183184 proof_cache : ProofCache ,
184185 broadcast_proofs : bool ,
185186 batch_expiry_gap_when_init_usecs : u64 ,
187+ enable_proof_v2_msg : bool ,
186188 ) -> Self {
187189 Self {
188190 peer_id,
@@ -195,6 +197,7 @@ impl ProofCoordinator {
195197 proof_cache,
196198 broadcast_proofs,
197199 batch_expiry_gap_when_init_usecs,
200+ enable_proof_v2_msg,
198201 }
199202 }
200203
@@ -374,45 +377,56 @@ impl ProofCoordinator {
374377 let approx_created_ts_usecs = signed_batch_info
375378 . expiration( )
376379 . saturating_sub( self . batch_expiry_gap_when_init_usecs) ;
380+ let self_peer_id = self . peer_id;
381+ let enable_broadcast_proofs = self . broadcast_proofs;
382+ let enable_proof_v2_msg = self . enable_proof_v2_msg;
377383
378- let mut proofs = vec![ ] ;
379- for signed_batch_info in signed_batch_infos. into_iter( ) {
384+ let mut proofs_iter = signed_batch_infos. into_iter( ) . filter_map( |signed_batch_info| {
380385 let peer_id = signed_batch_info. signer( ) ;
381386 let digest = * signed_batch_info. digest( ) ;
382387 let batch_id = signed_batch_info. batch_id( ) ;
383388 match self . add_signature( signed_batch_info, & validator_verifier) {
384- Ok ( result) => {
385- if let Some ( proof) = result {
386- debug!(
387- LogSchema :: new( LogEvent :: ProofOfStoreReady ) ,
388- digest = digest,
389- batch_id = batch_id. id,
390- ) ;
391- let ( info, sig) = proof. unpack( ) ;
392- proofs. push( ProofOfStore :: new( info. info( ) . clone( ) , sig) ) ;
393- }
389+ Ok ( Some ( proof) ) => {
390+ debug!(
391+ LogSchema :: new( LogEvent :: ProofOfStoreReady ) ,
392+ digest = digest,
393+ batch_id = batch_id. id,
394+ ) ;
395+ Some ( proof)
394396 } ,
397+ Ok ( None ) => None ,
395398 Err ( e) => {
396399 // Can happen if we already garbage collected, the commit notification is late, or the peer is misbehaving.
397400 if peer_id == self . peer_id {
398401 info!( "QS: could not add signature from self, digest = {}, batch_id = {}, err = {:?}" , digest, batch_id, e) ;
399402 } else {
400403 debug!( "QS: could not add signature from peer {}, digest = {}, batch_id = {}, err = {:?}" , peer_id, digest, batch_id, e) ;
401404 }
405+ None
402406 } ,
403407 }
404- }
405- if let Some ( value) = self . batch_info_to_proof. get_mut( & info) {
406- value. observe_voting_pct( approx_created_ts_usecs, & validator_verifier) ;
407- }
408- if !proofs. is_empty( ) {
409- observe_batch( approx_created_ts_usecs, self . peer_id, BatchStage :: POS_FORMED ) ;
410- if self . broadcast_proofs {
411- network_sender. broadcast_proof_of_store_msg( proofs) . await ;
408+ } ) . peekable( ) ;
409+ if proofs_iter. peek( ) . is_some( ) {
410+ observe_batch( approx_created_ts_usecs, self_peer_id, BatchStage :: POS_FORMED ) ;
411+ if enable_broadcast_proofs {
412+ if enable_proof_v2_msg {
413+ let proofs: Vec <_> = proofs_iter. collect( ) ;
414+ network_sender. broadcast_proof_of_store_msg_v2( proofs) . await ;
415+ } else {
416+ let proofs: Vec <_> = proofs_iter. map( |proof| {
417+ let ( info, sig) = proof. unpack( ) ;
418+ ProofOfStore :: new( info. info( ) . clone( ) , sig)
419+ } ) . collect( ) ;
420+ network_sender. broadcast_proof_of_store_msg( proofs) . await ;
421+ }
412422 } else {
423+ let proofs: Vec <_> = proofs_iter. collect( ) ;
413424 network_sender. send_proof_of_store_msg_to_self( proofs) . await ;
414425 }
415426 }
427+ if let Some ( value) = self . batch_info_to_proof. get_mut( & info) {
428+ value. observe_voting_pct( approx_created_ts_usecs, & validator_verifier) ;
429+ }
416430 } ,
417431 }
418432 } ) ,
0 commit comments