Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
b4b73ec
add cuprated skeleton
Boog900 Aug 16, 2024
39d48fe
fmt and add deny exception
Boog900 Aug 16, 2024
a018469
add main chain batch handler
Boog900 Aug 20, 2024
f909c26
add blockchain init
Boog900 Aug 21, 2024
f25588d
very rough block manager
Boog900 Aug 23, 2024
1c93ea1
misc changes
Boog900 Aug 29, 2024
05d0cf2
move more config values
Boog900 Aug 29, 2024
d648871
add new tables & types
Boog900 Aug 29, 2024
e1ae848
add function to fully add an alt block
Boog900 Aug 30, 2024
ed887a7
resolve current todo!s
Boog900 Aug 30, 2024
bc619b6
add new requests
Boog900 Aug 31, 2024
029f439
WIP: starting re-orgs
Boog900 Sep 1, 2024
6927b05
add last service request
Boog900 Sep 5, 2024
21e4b3a
commit Cargo.lock
Boog900 Sep 5, 2024
a9d8eee
Merge branch 'main' into storage-alt-blocks
Boog900 Sep 5, 2024
123aedd
add test
Boog900 Sep 6, 2024
ba5c5ac
more docs + cleanup + alt blocks request
Boog900 Sep 7, 2024
f92375f
clippy + fmt
Boog900 Sep 7, 2024
a864f93
document types
Boog900 Sep 7, 2024
6119972
move tx_fee to helper
Boog900 Sep 8, 2024
b211210
more doc updates
Boog900 Sep 8, 2024
68807e7
fmt
Boog900 Sep 8, 2024
c03065b
fix imports
Boog900 Sep 8, 2024
1831fa6
remove config files
Boog900 Sep 9, 2024
20033ee
Merge branch 'main' into cuprated-blockchain
Boog900 Sep 9, 2024
da78cbd
fix merge errors
Boog900 Sep 9, 2024
b5f8475
Merge branch 'storage-alt-blocks' into cuprated-blockchain
Boog900 Sep 9, 2024
d4e0e30
fix generated coins
Boog900 Sep 9, 2024
915633f
handle more p2p requests + alt blocks
Boog900 Sep 12, 2024
01a3065
clean up handler code
Boog900 Sep 12, 2024
6ec5bc3
add function for incoming blocks
Boog900 Sep 12, 2024
90beed1
add docs to handler functions
Boog900 Sep 13, 2024
a16381e
broadcast new blocks + add commands
Boog900 Sep 14, 2024
d2ab8e2
add fluffy block handler
Boog900 Sep 15, 2024
291ffe3
fix new block handling
Boog900 Sep 15, 2024
c0a3f7a
small cleanup
Boog900 Sep 15, 2024
fa54df2
increase outbound peer count
Boog900 Sep 16, 2024
a7553c2
Merge branch 'main' into cuprated-blockchain
Boog900 Oct 1, 2024
69f9d84
fix merge
Boog900 Oct 3, 2024
caaeced
clean up the blockchain manger
Boog900 Oct 3, 2024
8cff481
add more docs + cleanup imports
Boog900 Oct 3, 2024
4af0f89
fix typo
Boog900 Oct 3, 2024
6702dbe
fix doc
Boog900 Oct 3, 2024
403964b
remove unrelated changes
Boog900 Oct 3, 2024
783f426
improve interface globals
Boog900 Oct 5, 2024
375a1e1
manger -> manager
Boog900 Oct 5, 2024
f50d921
enums instead of bools
Boog900 Oct 5, 2024
27a8acd
move chain service to separate file
Boog900 Oct 5, 2024
a21e489
more review fixes
Boog900 Oct 5, 2024
c87edf2
add link to issue
Boog900 Oct 6, 2024
e7aaf3c
fix syncer + update comment
Boog900 Oct 8, 2024
173f4f7
fmt
Boog900 Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binaries/cuprated/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use cuprate_types::{

use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;

mod chain_service;
pub mod interface;
mod manager;
mod syncer;
Expand Down
72 changes: 72 additions & 0 deletions binaries/cuprated/src/blockchain/chain_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::task::{Context, Poll};

use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::Service;

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};

/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
/// what blocks we need.
///
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
#[derive(Clone)]
pub struct ChainService(pub BlockchainReadHandle);

impl Service<ChainSvcRequest> for ChainService {
type Response = ChainSvcResponse;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
let map_res = |res: BlockchainResponse| match res {
BlockchainResponse::CompactChainHistory {
block_ids,
cumulative_difficulty,
} => ChainSvcResponse::CompactHistory {
block_ids,
cumulative_difficulty,
},
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res),
_ => unreachable!(),
};

match req {
ChainSvcRequest::CompactHistory => self
.0
.call(BlockchainReadRequest::CompactChainHistory)
.map_ok(map_res)
.map_err(Into::into)
.boxed(),
ChainSvcRequest::FindFirstUnknown(req) => self
.0
.call(BlockchainReadRequest::FindFirstUnknown(req))
.map_ok(map_res)
.map_err(Into::into)
.boxed(),
ChainSvcRequest::CumulativeDifficulty => self
.0
.call(BlockchainReadRequest::CompactChainHistory)
.map_ok(|res| {
// TODO create a custom request instead of hijacking this one.
// TODO: use the context cache.
let BlockchainResponse::CompactChainHistory {
cumulative_difficulty,
..
} = res
else {
unreachable!()
};

ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty)
})
.map_err(Into::into)
.boxed(),
}
}
}
63 changes: 28 additions & 35 deletions binaries/cuprated/src/blockchain/interface.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The blockchain manger interface.
//! The blockchain manager interface.
//!
//! This module contains all the functions to mutate the blockchain's state in any way, through the
//! blockchain manger.
//! blockchain manager.
use std::{
collections::{HashMap, HashSet},
sync::{Mutex, OnceLock},
sync::{LazyLock, Mutex, OnceLock},
};

use monero_serai::{block::Block, transaction::Transaction};
Expand All @@ -21,18 +21,15 @@ use cuprate_types::{
};

use crate::{
blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR,
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};

/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger.
pub static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();

/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
///
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
/// time for new blocks so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
/// which are also more expensive than `Mutex`s.
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new();
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
/// in this file document what happens if this is not initialized when they are called.
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (super) is fine but I would have preferred if it were moved to manager.rs and kept private - it is still writeable from interface.rs since it's defined there, even though it only needs a &mpsc::Sender.

Doesn't matter too much I guess but then again things being scoped is nice.


/// An error that can be returned from [`handle_incoming_block`].
#[derive(Debug, thiserror::Error)]
Expand All @@ -52,10 +49,7 @@ pub enum IncomingBlockError {

/// Try to add a new block to the blockchain.
///
/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain
/// ([`false`]).
///
/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned.
/// On success returns [`IncomingBlockOk`].
///
/// # Errors
///
Expand All @@ -67,7 +61,17 @@ pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, IncomingBlockError> {
) -> Result<IncomingBlockOk, IncomingBlockError> {
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
///
/// This lock prevents sending the same block to the blockchain manager from multiple connections
/// before one of them actually gets added to the chain, allowing peers to do other things.
///
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
/// which are also more expensive than `Mutex`s.
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
LazyLock::new(|| Mutex::new(HashSet::new()));
Comment on lines +73 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could/should this be a tokio::sync::Mutex?

As it is now, if handle_incoming_block is asynchronously called from many many places, the std::sync::Mutex may block the executor more than tokio::sync::Mutex, right?

Although IDK really, the lock isn't held for long so this probably doesn't matter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although std Mutex is blocking we hold the lock for a tiny amount of time + an async Mutex is a lot more expensive: https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use

// FIXME: we should look in the tx-pool for txs when that is ready.

if !block_exists(block.header.previous, blockchain_read_handle)
Expand All @@ -83,7 +87,7 @@ pub async fn handle_incoming_block(
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
return Ok(false);
return Ok(IncomingBlockOk::AlreadyHave);
}

// TODO: remove this when we have a working tx-pool.
Expand All @@ -105,20 +109,14 @@ pub async fn handle_incoming_block(
.map_err(IncomingBlockError::InvalidBlock)?;

let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manger, so just return this as there is nothing
// else we can do.
return Ok(false);
// We could still be starting up the blockchain manager.
return Ok(IncomingBlockOk::NotReady);
};

// Add the blocks hash to the blocks being handled.
if !BLOCKS_BEING_HANDLED
.get_or_init(|| Mutex::new(HashSet::new()))
.lock()
.unwrap()
.insert(block_hash)
{
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
// If another place is already adding this block then we can stop.
return Ok(false);
return Ok(IncomingBlockOk::AlreadyHave);
}

// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
Expand All @@ -140,12 +138,7 @@ pub async fn handle_incoming_block(
.map_err(IncomingBlockError::InvalidBlock);

// Remove the block hash from the blocks being handled.
BLOCKS_BEING_HANDLED
.get()
.unwrap()
.lock()
.unwrap()
.remove(&block_hash);
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);

res
}
Expand All @@ -161,7 +154,7 @@ async fn block_exists(
.call(BlockchainReadRequest::FindBlock(block_hash))
.await?
else {
panic!("Invalid blockchain response!");
unreachable!();
};

Ok(chain.is_some())
Expand Down
14 changes: 7 additions & 7 deletions binaries/cuprated/src/blockchain/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use cuprate_types::{

use crate::{
blockchain::{
chain_service::ChainService,
interface::COMMAND_TX,
syncer,
types::ChainService,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
Expand All @@ -35,13 +35,13 @@ use crate::{
mod commands;
mod handler;

pub use commands::BlockchainManagerCommand;
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};

/// Initialize the blockchain manger.
/// Initialize the blockchain manager.
///
/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
/// can be called.
pub async fn init_blockchain_manger(
pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
Expand Down Expand Up @@ -73,10 +73,10 @@ pub async fn init_blockchain_manger(
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
panic!("Blockchain context service returned wrong response!");
unreachable!()
};

let manger = BlockchainManager {
let manager = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
blockchain_context_service,
Expand All @@ -86,7 +86,7 @@ pub async fn init_blockchain_manger(
broadcast_svc: clearnet_interface.broadcast_svc(),
};

tokio::spawn(manger.run(batch_rx, command_rx));
tokio::spawn(manager.run(batch_rx, command_rx));
}

/// The blockchain manager.
Expand Down
16 changes: 14 additions & 2 deletions binaries/cuprated/src/blockchain/manager/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This module contains the commands for th blockchain manager.
//! This module contains the commands for the blockchain manager.
use std::collections::HashMap;

use monero_serai::block::Block;
Expand All @@ -15,6 +15,18 @@ pub enum BlockchainManagerCommand {
/// All the transactions defined in [`Block::transactions`].
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
/// The channel to send the response down.
response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
response_tx: oneshot::Sender<Result<IncomingBlockOk, anyhow::Error>>,
},
}

/// The [`Ok`] response for an incoming block.
pub enum IncomingBlockOk {
/// The block was added to the main-chain.
AddedToMainChain,
/// The blockchain manager is not ready yet.
NotReady,
/// The block was added to an alt-chain.
AddedToAltChain,
/// We already have the block.
AlreadyHave,
}
Loading
Loading