@@ -8,17 +8,16 @@ use std::{
8
8
} ;
9
9
10
10
use monero_serai:: { block:: Block , transaction:: Transaction } ;
11
- use rayon:: prelude:: * ;
12
11
use tokio:: sync:: { mpsc, oneshot} ;
13
12
use tower:: { Service , ServiceExt } ;
14
13
15
14
use cuprate_blockchain:: service:: BlockchainReadHandle ;
16
15
use cuprate_consensus:: transactions:: new_tx_verification_data;
17
- use cuprate_helper:: cast:: usize_to_u64;
18
- use cuprate_types:: {
19
- blockchain:: { BlockchainReadRequest , BlockchainResponse } ,
20
- Chain ,
16
+ use cuprate_txpool:: service:: {
17
+ interface:: { TxpoolReadRequest , TxpoolReadResponse } ,
18
+ TxpoolReadHandle ,
21
19
} ;
20
+ use cuprate_types:: blockchain:: { BlockchainReadRequest , BlockchainResponse } ;
22
21
23
22
use crate :: {
24
23
blockchain:: manager:: { BlockchainManagerCommand , IncomingBlockOk } ,
@@ -38,7 +37,7 @@ pub enum IncomingBlockError {
38
37
///
39
38
/// The inner values are the block hash and the indexes of the missing txs in the block.
40
39
#[ error( "Unknown transactions in block." ) ]
41
- UnknownTransactions ( [ u8 ; 32 ] , Vec < u64 > ) ,
40
+ UnknownTransactions ( [ u8 ; 32 ] , Vec < usize > ) ,
42
41
/// We are missing the block's parent.
43
42
#[ error( "The block has an unknown parent." ) ]
44
43
Orphan ,
@@ -59,8 +58,9 @@ pub enum IncomingBlockError {
59
58
/// - the block's parent is unknown
60
59
pub async fn handle_incoming_block (
61
60
block : Block ,
62
- given_txs : Vec < Transaction > ,
61
+ mut given_txs : HashMap < [ u8 ; 32 ] , Transaction > ,
63
62
blockchain_read_handle : & mut BlockchainReadHandle ,
63
+ txpool_read_handle : & mut TxpoolReadHandle ,
64
64
) -> Result < IncomingBlockOk , IncomingBlockError > {
65
65
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
66
66
///
@@ -72,7 +72,12 @@ pub async fn handle_incoming_block(
72
72
/// which are also more expensive than `Mutex`s.
73
73
static BLOCKS_BEING_HANDLED : LazyLock < Mutex < HashSet < [ u8 ; 32 ] > > > =
74
74
LazyLock :: new ( || Mutex :: new ( HashSet :: new ( ) ) ) ;
75
- // FIXME: we should look in the tx-pool for txs when that is ready.
75
+
76
+ if given_txs. len ( ) > block. transactions . len ( ) {
77
+ return Err ( IncomingBlockError :: InvalidBlock ( anyhow:: anyhow!(
78
+ "Too many transactions given for block"
79
+ ) ) ) ;
80
+ }
76
81
77
82
if !block_exists ( block. header . previous , blockchain_read_handle)
78
83
. await
@@ -90,23 +95,36 @@ pub async fn handle_incoming_block(
90
95
return Ok ( IncomingBlockOk :: AlreadyHave ) ;
91
96
}
92
97
93
- // TODO: remove this when we have a working tx-pool.
94
- if given_txs. len ( ) != block. transactions . len ( ) {
95
- return Err ( IncomingBlockError :: UnknownTransactions (
96
- block_hash,
97
- ( 0 ..usize_to_u64 ( block. transactions . len ( ) ) ) . collect ( ) ,
98
- ) ) ;
99
- }
98
+ let TxpoolReadResponse :: TxsForBlock { mut txs, missing } = txpool_read_handle
99
+ . ready ( )
100
+ . await
101
+ . expect ( PANIC_CRITICAL_SERVICE_ERROR )
102
+ . call ( TxpoolReadRequest :: TxsForBlock ( block. transactions . clone ( ) ) )
103
+ . await
104
+ . expect ( PANIC_CRITICAL_SERVICE_ERROR )
105
+ else {
106
+ unreachable ! ( )
107
+ } ;
100
108
101
- // TODO: check we actually got given the right txs.
102
- let prepped_txs = given_txs
103
- . into_par_iter ( )
104
- . map ( |tx| {
105
- let tx = new_tx_verification_data ( tx) ?;
106
- Ok ( ( tx. tx_hash , tx) )
107
- } )
108
- . collect :: < Result < _ , anyhow:: Error > > ( )
109
- . map_err ( IncomingBlockError :: InvalidBlock ) ?;
109
+ if !missing. is_empty ( ) {
110
+ let needed_hashes = missing. iter ( ) . map ( |index| block. transactions [ * index] ) ;
111
+
112
+ for needed_hash in needed_hashes {
113
+ let Some ( tx) = given_txs. remove ( & needed_hash) else {
114
+ // We return back the indexes of all txs missing from our pool, not taking into account the txs
115
+ // that were given with the block, as these txs will be dropped. It is not worth it to try to add
116
+ // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
117
+ // the size limit.
118
+ return Err ( IncomingBlockError :: UnknownTransactions ( block_hash, missing) ) ;
119
+ } ;
120
+
121
+ txs. insert (
122
+ needed_hash,
123
+ new_tx_verification_data ( tx)
124
+ . map_err ( |e| IncomingBlockError :: InvalidBlock ( e. into ( ) ) ) ?,
125
+ ) ;
126
+ }
127
+ }
110
128
111
129
let Some ( incoming_block_tx) = COMMAND_TX . get ( ) else {
112
130
// We could still be starting up the blockchain manager.
@@ -119,28 +137,37 @@ pub async fn handle_incoming_block(
119
137
return Ok ( IncomingBlockOk :: AlreadyHave ) ;
120
138
}
121
139
122
- // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
140
+ // We must remove the block hash from `BLOCKS_BEING_HANDLED`.
141
+ let _guard = {
142
+ struct RemoveFromBlocksBeingHandled {
143
+ block_hash : [ u8 ; 32 ] ,
144
+ }
145
+ impl Drop for RemoveFromBlocksBeingHandled {
146
+ fn drop ( & mut self ) {
147
+ BLOCKS_BEING_HANDLED
148
+ . lock ( )
149
+ . unwrap ( )
150
+ . remove ( & self . block_hash ) ;
151
+ }
152
+ }
153
+ RemoveFromBlocksBeingHandled { block_hash }
154
+ } ;
123
155
124
156
let ( response_tx, response_rx) = oneshot:: channel ( ) ;
125
157
126
158
incoming_block_tx
127
159
. send ( BlockchainManagerCommand :: AddBlock {
128
160
block,
129
- prepped_txs,
161
+ prepped_txs : txs ,
130
162
response_tx,
131
163
} )
132
164
. await
133
165
. expect ( "TODO: don't actually panic here, an err means we are shutting down" ) ;
134
166
135
- let res = response_rx
167
+ response_rx
136
168
. await
137
169
. expect ( "The blockchain manager will always respond" )
138
- . map_err ( IncomingBlockError :: InvalidBlock ) ;
139
-
140
- // Remove the block hash from the blocks being handled.
141
- BLOCKS_BEING_HANDLED . lock ( ) . unwrap ( ) . remove ( & block_hash) ;
142
-
143
- res
170
+ . map_err ( IncomingBlockError :: InvalidBlock )
144
171
}
145
172
146
173
/// Check if we have a block with the given hash.
0 commit comments