Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1053857
add `cuprate-types`
hinto-janai Mar 31, 2024
56110d5
remove `cuprate_database::service::{request,response}`
hinto-janai Mar 31, 2024
147be97
use `cuprate_types::service::{request,response}`
hinto-janai Mar 31, 2024
b8dad11
service: fix `Request` `match`'s
hinto-janai Apr 1, 2024
9836254
service: create `ReadRequest` function mappings
hinto-janai Apr 1, 2024
f81d0ce
service: create `WriteRequest` function mappings
hinto-janai Apr 1, 2024
cc3b80d
service: add rough `WriteRequest` retry loop
hinto-janai Apr 1, 2024
4c5c747
service: handle `RuntimeError::ResizeNeeded` in writer
hinto-janai Apr 1, 2024
da25eeb
add `{R,r}o` exception to typos
hinto-janai Apr 1, 2024
396dde5
docs
hinto-janai Apr 1, 2024
a9379a9
Merge branch 'main' into db
hinto-janai Apr 11, 2024
ad2ec3f
env: make `resize_map()` return new memory map byte size
hinto-janai Apr 12, 2024
71377f5
write: proactively handle resizes
hinto-janai Apr 12, 2024
684ad0d
read: use type aliases
hinto-janai Apr 12, 2024
c7961ec
docs
hinto-janai Apr 12, 2024
6275644
fix import
hinto-janai Apr 12, 2024
d8cd2fc
write: handle resizes reactively
hinto-janai Apr 14, 2024
dbcf35f
service: panic if response can't be sent back
hinto-janai Apr 14, 2024
0fe535d
write: add loop unreachable asserts
hinto-janai Apr 14, 2024
640b54e
service: print and drop error instead of panic
hinto-janai Apr 14, 2024
37a4317
write: fix retry loop off-by-1
hinto-janai Apr 14, 2024
9107cb1
write: fix docs
hinto-janai Apr 14, 2024
c9b46bc
review changes
hinto-janai Apr 14, 2024
6106013
update readme
hinto-janai Apr 14, 2024
7f7f1d4
remove `BlockBatchInRange` request/response
hinto-janai Apr 14, 2024
da08b25
Update database/README.md
hinto-janai Apr 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cfg-if = { workspace = true }
# We only need the `thread` feature if `service` is enabled.
# Figure out how to enable features of an already pulled in dependency conditionally.
cuprate-helper = { path = "../helper", features = ["fs", "thread"] }
cuprate-types = { path = "../types", features = ["service"] }
paste = { workspace = true }
page_size = { version = "0.6.0" } # Needed for database resizes, they must be a multiple of the OS page size.
thiserror = { workspace = true }
Expand Down
13 changes: 10 additions & 3 deletions database/src/backend/heed/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{
cell::RefCell,
fmt::Debug,
num::NonZeroUsize,
ops::Deref,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
};
Expand Down Expand Up @@ -249,11 +250,11 @@ impl Env for ConcreteEnv {
Ok(self.env.read().unwrap().force_sync()?)
}

fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) {
fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) -> NonZeroUsize {
let resize_algorithm = resize_algorithm.unwrap_or_else(|| self.config().resize_algorithm);

let current_size_bytes = self.current_map_size();
let new_size_bytes = resize_algorithm.resize(current_size_bytes).get();
let new_size_bytes = resize_algorithm.resize(current_size_bytes);

// SAFETY:
// Resizing requires that we have
Expand All @@ -264,8 +265,14 @@ impl Env for ConcreteEnv {
// <http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5>
unsafe {
// INVARIANT: `resize()` returns a valid `usize` to resize to.
self.env.write().unwrap().resize(new_size_bytes).unwrap();
self.env
.write()
.unwrap()
.resize(new_size_bytes.get())
.unwrap();
}

new_size_bytes
}

#[inline]
Expand Down
4 changes: 2 additions & 2 deletions database/src/env.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Abstracted database environment; `trait Env`.

//---------------------------------------------------------------------------------------------------- Import
use std::{fmt::Debug, ops::Deref};
use std::{fmt::Debug, num::NonZeroUsize, ops::Deref};

use crate::{
config::Config,
Expand Down Expand Up @@ -113,7 +113,7 @@ pub trait Env: Sized {
/// This function _must_ be re-implemented if [`Env::MANUAL_RESIZE`] is `true`.
///
/// Otherwise, this function will panic with `unreachable!()`.
fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) {
fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) -> NonZeroUsize {
unreachable!()
}

Expand Down
2 changes: 2 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
//! config::Config,
//! ConcreteEnv,
//! Env, Key, TxRo, TxRw,
//! };
//! use cuprate_types::{
//! service::{ReadRequest, WriteRequest, Response},
//! };
//!
Expand Down
20 changes: 11 additions & 9 deletions database/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! along with the reader/writer thread-pool system.
//!
//! The thread-pool allows outside crates to communicate with it by
//! sending database [`Request`](ReadRequest)s and receiving [`Response`]s `async`hronously -
//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously -
//! without having to actually worry and handle the database themselves.
//!
//! The system is managed by this crate, and only requires [`init`] by the user.
Expand All @@ -17,9 +17,9 @@
//! - [`DatabaseReadHandle`]
//! - [`DatabaseWriteHandle`]
//!
//! The 1st allows any caller to send [`ReadRequest`]s.
//! The 1st allows any caller to send [`ReadRequest`][req_r]s.
//!
//! The 2nd allows any caller to send [`WriteRequest`]s.
//! The 2nd allows any caller to send [`WriteRequest`][req_w]s.
//!
//! The `DatabaseReadHandle` can be shared as it is cheaply [`Clone`]able, however,
//! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that
Expand Down Expand Up @@ -49,6 +49,14 @@
//! An `async`hronous channel will be returned from the call.
//! This channel can be `.await`ed upon to (eventually) receive
//! the corresponding `Response` to your `Request`.
//!
//!
//!
//! [req_r]: cuprate_types::service::ReadRequest
//!
//! [req_w]: cuprate_types::service::WriteRequest
//!
//! [resp]: cuprate_types::service::Response

mod read;
pub use read::DatabaseReadHandle;
Expand All @@ -59,11 +67,5 @@ pub use write::DatabaseWriteHandle;
mod free;
pub use free::init;

mod request;
pub use request::{ReadRequest, WriteRequest};

mod response;
pub use response::Response;

#[cfg(test)]
mod tests;
106 changes: 80 additions & 26 deletions database/src/service/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//---------------------------------------------------------------------------------------------------- Import
use std::{
collections::{HashMap, HashSet},
ops::Range,
sync::Arc,
task::{Context, Poll},
};
Expand All @@ -14,11 +16,12 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;

use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_types::service::{ReadRequest, Response};

use crate::{
config::ReaderThreads,
error::RuntimeError,
service::{request::ReadRequest, response::Response},
types::{Amount, AmountIndex, BlockHeight, KeyImage},
ConcreteEnv,
};

Expand Down Expand Up @@ -155,10 +158,8 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
}

// Acquire a permit before returning `Ready`.
let Some(permit) = ready!(self.semaphore.poll_acquire(cx)) else {
// `self` itself owns the backing semaphore, so it can't be closed.
unreachable!();
};
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("`self` itself owns the backing semaphore, so it can't be closed.");

self.permit = Some(permit);
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -199,30 +200,46 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
///
/// This is the main entrance into all `Request` handler functions.
/// The basic structure is:
///
/// 1. `Request` is mapped to a handler function
/// 2. Handler function is called
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request
env: Arc<ConcreteEnv>, // Access to the database
request: ReadRequest, // The request we must fulfill
_permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function
env: Arc<ConcreteEnv>, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */

match request {
ReadRequest::Example1 => example_handler_1(env, response_sender),
ReadRequest::Example2(x) => example_handler_2(env, response_sender, x),
ReadRequest::Example3(x) => example_handler_3(env, response_sender, x),
}
use ReadRequest as R;

let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
R::BlockHash(block) => block_hash(&env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
R::ChainHeight => chain_height(&env),
R::GeneratedCoins => generated_coins(&env),
R::Outputs(map) => outputs(&env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
R::BlockBatchInRange(range) => block_batch_in_range(&env, range),
};

response_sender
.send(response)
.expect("database reader thread failed to send response back to requester");

/* TODO: post-request handling, run some code for each request? */
}

//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`Request`].
//
// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
//
// Each function will return the [`Response`] that we
// should send back to the caller in [`map_request()`].
//
// INVARIANT:
// These functions are called above in `tower::Service::call()`
// using a custom threadpool which means any call to `par_*()` functions
Expand All @@ -231,26 +248,63 @@ fn map_request(
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.

/// TODO
/// [`ReadRequest::BlockExtendedHeader`].
#[inline]
fn block_extended_header(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockHash`].
#[inline]
fn block_hash(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockExtendedHeaderInRange`].
#[inline]
fn block_extended_header_in_range(
env: &Arc<ConcreteEnv>,
range: std::ops::Range<BlockHeight>,
) -> ResponseResult {
todo!()
}

/// [`ReadRequest::ChainHeight`].
#[inline]
fn chain_height(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::Outputs`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_1(env: Arc<ConcreteEnv>, response_sender: ResponseSender) {
let db_result = Ok(Response::Example1);
response_sender.send(db_result).unwrap();
fn outputs(env: &Arc<ConcreteEnv>, map: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::NumberOutputsWithAmount`].
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_2(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: usize) {
let db_result = Ok(Response::Example2(x));
response_sender.send(db_result).unwrap();
fn number_outputs_with_amount(env: &Arc<ConcreteEnv>, vec: Vec<Amount>) -> ResponseResult {
todo!()
}

/// TODO
/// [`ReadRequest::CheckKIsNotSpent`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_3(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: String) {
let db_result = Ok(Response::Example3(x));
response_sender.send(db_result).unwrap();
fn check_k_is_not_spent(env: &Arc<ConcreteEnv>, set: HashSet<KeyImage>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockBatchInRange`].
#[inline]
fn block_batch_in_range(env: &Arc<ConcreteEnv>, range: Range<BlockHeight>) -> ResponseResult {
todo!()
}
41 changes: 0 additions & 41 deletions database/src/service/request.rs

This file was deleted.

38 changes: 0 additions & 38 deletions database/src/service/response.rs

This file was deleted.

Loading