-
Notifications
You must be signed in to change notification settings - Fork 49
cuprated: Blockchain Manager #274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
b4b73ec
39d48fe
a018469
f909c26
f25588d
1c93ea1
05d0cf2
d648871
e1ae848
ed887a7
bc619b6
029f439
6927b05
21e4b3a
a9d8eee
123aedd
ba5c5ac
f92375f
a864f93
6119972
b211210
68807e7
c03065b
1831fa6
20033ee
da78cbd
b5f8475
d4e0e30
915633f
01a3065
6ec5bc3
90beed1
a16381e
d2ab8e2
291ffe3
c0a3f7a
fa54df2
a7553c2
69f9d84
caaeced
8cff481
4af0f89
6702dbe
403964b
783f426
375a1e1
f50d921
27a8acd
a21e489
c87edf2
e7aaf3c
173f4f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
use std::task::{Context, Poll}; | ||
|
||
use futures::{future::BoxFuture, FutureExt, TryFutureExt}; | ||
use tower::Service; | ||
|
||
use cuprate_blockchain::service::BlockchainReadHandle; | ||
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; | ||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; | ||
|
||
/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out | ||
/// what blocks we need. | ||
/// | ||
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. | ||
#[derive(Clone)] | ||
pub struct ChainService(pub BlockchainReadHandle); | ||
|
||
impl Service<ChainSvcRequest> for ChainService { | ||
type Response = ChainSvcResponse; | ||
type Error = tower::BoxError; | ||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; | ||
|
||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
self.0.poll_ready(cx).map_err(Into::into) | ||
} | ||
|
||
fn call(&mut self, req: ChainSvcRequest) -> Self::Future { | ||
let map_res = |res: BlockchainResponse| match res { | ||
BlockchainResponse::CompactChainHistory { | ||
block_ids, | ||
cumulative_difficulty, | ||
} => ChainSvcResponse::CompactHistory { | ||
block_ids, | ||
cumulative_difficulty, | ||
}, | ||
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res), | ||
_ => unreachable!(), | ||
}; | ||
|
||
match req { | ||
ChainSvcRequest::CompactHistory => self | ||
.0 | ||
.call(BlockchainReadRequest::CompactChainHistory) | ||
.map_ok(map_res) | ||
.map_err(Into::into) | ||
.boxed(), | ||
ChainSvcRequest::FindFirstUnknown(req) => self | ||
.0 | ||
.call(BlockchainReadRequest::FindFirstUnknown(req)) | ||
.map_ok(map_res) | ||
.map_err(Into::into) | ||
.boxed(), | ||
ChainSvcRequest::CumulativeDifficulty => self | ||
.0 | ||
.call(BlockchainReadRequest::CompactChainHistory) | ||
.map_ok(|res| { | ||
// TODO create a custom request instead of hijacking this one. | ||
// TODO: use the context cache. | ||
let BlockchainResponse::CompactChainHistory { | ||
cumulative_difficulty, | ||
.. | ||
} = res | ||
else { | ||
unreachable!() | ||
}; | ||
|
||
ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty) | ||
}) | ||
.map_err(Into::into) | ||
.boxed(), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,10 @@ | ||
//! The blockchain manger interface. | ||
//! The blockchain manager interface. | ||
//! | ||
//! This module contains all the functions to mutate the blockchain's state in any way, through the | ||
//! blockchain manger. | ||
//! blockchain manager. | ||
use std::{ | ||
collections::{HashMap, HashSet}, | ||
sync::{Mutex, OnceLock}, | ||
sync::{LazyLock, Mutex, OnceLock}, | ||
}; | ||
|
||
use monero_serai::{block::Block, transaction::Transaction}; | ||
|
@@ -21,18 +21,15 @@ use cuprate_types::{ | |
}; | ||
|
||
use crate::{ | ||
blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR, | ||
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, | ||
constants::PANIC_CRITICAL_SERVICE_ERROR, | ||
}; | ||
|
||
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger. | ||
pub static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new(); | ||
|
||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling. | ||
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. | ||
/// | ||
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of | ||
/// time for new blocks so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` | ||
/// which are also more expensive than `Mutex`s. | ||
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new(); | ||
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions | ||
/// in this file document what happens if this is not initialized when they are called. | ||
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new(); | ||
|
||
/// An error that can be returned from [`handle_incoming_block`]. | ||
#[derive(Debug, thiserror::Error)] | ||
|
@@ -52,10 +49,7 @@ pub enum IncomingBlockError { | |
|
||
/// Try to add a new block to the blockchain. | ||
/// | ||
/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain | ||
/// ([`false`]). | ||
/// | ||
/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned. | ||
/// On success returns [`IncomingBlockOk`]. | ||
/// | ||
/// # Errors | ||
/// | ||
|
@@ -67,7 +61,17 @@ pub async fn handle_incoming_block( | |
block: Block, | ||
given_txs: Vec<Transaction>, | ||
blockchain_read_handle: &mut BlockchainReadHandle, | ||
) -> Result<bool, IncomingBlockError> { | ||
) -> Result<IncomingBlockOk, IncomingBlockError> { | ||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling. | ||
/// | ||
/// This lock prevents sending the same block to the blockchain manager from multiple connections | ||
/// before one of them actually gets added to the chain, allowing peers to do other things. | ||
/// | ||
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of | ||
/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` | ||
/// which are also more expensive than `Mutex`s. | ||
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> = | ||
LazyLock::new(|| Mutex::new(HashSet::new())); | ||
Comment on lines
+73
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could/should this be a As it is now, if Although IDK really, the lock isn't held for long so this probably doesn't matter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although std Mutex is blocking we hold the lock for a tiny amount of time + an async Mutex is a lot more expensive: https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use |
||
// FIXME: we should look in the tx-pool for txs when that is ready. | ||
Boog900 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if !block_exists(block.header.previous, blockchain_read_handle) | ||
|
@@ -83,7 +87,7 @@ pub async fn handle_incoming_block( | |
.await | ||
.expect(PANIC_CRITICAL_SERVICE_ERROR) | ||
{ | ||
return Ok(false); | ||
return Ok(IncomingBlockOk::AlreadyHave); | ||
} | ||
|
||
// TODO: remove this when we have a working tx-pool. | ||
|
@@ -105,20 +109,14 @@ pub async fn handle_incoming_block( | |
.map_err(IncomingBlockError::InvalidBlock)?; | ||
|
||
let Some(incoming_block_tx) = COMMAND_TX.get() else { | ||
// We could still be starting up the blockchain manger, so just return this as there is nothing | ||
// else we can do. | ||
return Ok(false); | ||
// We could still be starting up the blockchain manager. | ||
return Ok(IncomingBlockOk::NotReady); | ||
}; | ||
|
||
// Add the blocks hash to the blocks being handled. | ||
if !BLOCKS_BEING_HANDLED | ||
.get_or_init(|| Mutex::new(HashSet::new())) | ||
.lock() | ||
.unwrap() | ||
.insert(block_hash) | ||
{ | ||
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) { | ||
// If another place is already adding this block then we can stop. | ||
return Ok(false); | ||
return Ok(IncomingBlockOk::AlreadyHave); | ||
} | ||
|
||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`. | ||
|
@@ -140,12 +138,7 @@ pub async fn handle_incoming_block( | |
.map_err(IncomingBlockError::InvalidBlock); | ||
|
||
// Remove the block hash from the blocks being handled. | ||
BLOCKS_BEING_HANDLED | ||
.get() | ||
.unwrap() | ||
.lock() | ||
.unwrap() | ||
.remove(&block_hash); | ||
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash); | ||
|
||
res | ||
} | ||
|
@@ -161,7 +154,7 @@ async fn block_exists( | |
.call(BlockchainReadRequest::FindBlock(block_hash)) | ||
.await? | ||
else { | ||
panic!("Invalid blockchain response!"); | ||
unreachable!(); | ||
}; | ||
|
||
Ok(chain.is_some()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
(super)
is fine but I would have preferred if it were moved tomanager.rs
and kept private - it is still writeable frominterface.rs
since it's defined there, even though it only needs a&mpsc::Sender
.Doesn't matter too much I guess but then again things being scoped is nice.