Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1053857
add `cuprate-types`
hinto-janai Mar 31, 2024
56110d5
remove `cuprate_database::service::{request,response}`
hinto-janai Mar 31, 2024
147be97
use `cuprate_types::service::{request,response}`
hinto-janai Mar 31, 2024
b8dad11
service: fix `Request` `match`'s
hinto-janai Apr 1, 2024
9836254
service: create `ReadRequest` function mappings
hinto-janai Apr 1, 2024
f81d0ce
service: create `WriteRequest` function mappings
hinto-janai Apr 1, 2024
cc3b80d
service: add rough `WriteRequest` retry loop
hinto-janai Apr 1, 2024
4c5c747
service: handle `RuntimeError::ResizeNeeded` in writer
hinto-janai Apr 1, 2024
da25eeb
add `{R,r}o` exception to typos
hinto-janai Apr 1, 2024
396dde5
docs
hinto-janai Apr 1, 2024
a9379a9
Merge branch 'main' into db
hinto-janai Apr 11, 2024
ad2ec3f
env: make `resize_map()` return new memory map byte size
hinto-janai Apr 12, 2024
71377f5
write: proactively handle resizes
hinto-janai Apr 12, 2024
684ad0d
read: use type aliases
hinto-janai Apr 12, 2024
c7961ec
docs
hinto-janai Apr 12, 2024
6275644
fix import
hinto-janai Apr 12, 2024
d8cd2fc
write: handle resizes reactively
hinto-janai Apr 14, 2024
dbcf35f
service: panic if response can't be sent back
hinto-janai Apr 14, 2024
0fe535d
write: add loop unreachable asserts
hinto-janai Apr 14, 2024
640b54e
service: print and drop error instead of panic
hinto-janai Apr 14, 2024
37a4317
write: fix retry loop off-by-1
hinto-janai Apr 14, 2024
9107cb1
write: fix docs
hinto-janai Apr 14, 2024
c9b46bc
review changes
hinto-janai Apr 14, 2024
6106013
update readme
hinto-janai Apr 14, 2024
7f7f1d4
remove `BlockBatchInRange` request/response
hinto-janai Apr 14, 2024
da08b25
Update database/README.md
hinto-janai Apr 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cfg-if = { workspace = true }
# We only need the `thread` feature if `service` is enabled.
# Figure out how to enable features of an already pulled in dependency conditionally.
cuprate-helper = { path = "../helper", features = ["fs", "thread"] }
cuprate-types = { path = "../types", features = ["service"] }
paste = { workspace = true }
page_size = { version = "0.6.0" } # Needed for database resizes, they must be a multiple of the OS page size.
thiserror = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
//! config::Config,
//! ConcreteEnv,
//! Env, Key, TxRo, TxRw,
//! };
//! use cuprate_types::{
//! service::{ReadRequest, WriteRequest, Response},
//! };
//!
Expand Down
20 changes: 11 additions & 9 deletions database/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! along with the reader/writer thread-pool system.
//!
//! The thread-pool allows outside crates to communicate with it by
//! sending database [`Request`](ReadRequest)s and receiving [`Response`]s `async`hronously -
//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously -
//! without having to actually worry and handle the database themselves.
//!
//! The system is managed by this crate, and only requires [`init`] by the user.
Expand All @@ -17,9 +17,9 @@
//! - [`DatabaseReadHandle`]
//! - [`DatabaseWriteHandle`]
//!
//! The 1st allows any caller to send [`ReadRequest`]s.
//! The 1st allows any caller to send [`ReadRequest`][req_r]s.
//!
//! The 2nd allows any caller to send [`WriteRequest`]s.
//! The 2nd allows any caller to send [`WriteRequest`][req_w]s.
//!
//! The `DatabaseReadHandle` can be shared as it is cheaply [`Clone`]able, however,
//! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that
Expand Down Expand Up @@ -49,6 +49,14 @@
//! An `async`hronous channel will be returned from the call.
//! This channel can be `.await`ed upon to (eventually) receive
//! the corresponding `Response` to your `Request`.
//!
//!
//!
//! [req_r]: cuprate_types::service::ReadRequest
//!
//! [req_w]: cuprate_types::service::WriteRequest
//!
//! [resp]: cuprate_types::service::Response

mod read;
pub use read::DatabaseReadHandle;
Expand All @@ -59,11 +67,5 @@ pub use write::DatabaseWriteHandle;
mod free;
pub use free::init;

mod request;
pub use request::{ReadRequest, WriteRequest};

mod response;
pub use response::Response;

#[cfg(test)]
mod tests;
97 changes: 74 additions & 23 deletions database/src/service/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//---------------------------------------------------------------------------------------------------- Import
use std::{
collections::{HashMap, HashSet},
ops::Range,
sync::Arc,
task::{Context, Poll},
};
Expand All @@ -14,13 +16,9 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;

use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_types::service::{ReadRequest, Response};

use crate::{
config::ReaderThreads,
error::RuntimeError,
service::{request::ReadRequest, response::Response},
ConcreteEnv,
};
use crate::{config::ReaderThreads, error::RuntimeError, ConcreteEnv};

//---------------------------------------------------------------------------------------------------- Types
/// The actual type of the response.
Expand Down Expand Up @@ -210,19 +208,35 @@ fn map_request(
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */

match request {
ReadRequest::Example1 => example_handler_1(env, response_sender),
ReadRequest::Example2(x) => example_handler_2(env, response_sender, x),
ReadRequest::Example3(x) => example_handler_3(env, response_sender, x),
}
use ReadRequest as R;

let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
R::BlockHash(block) => block_hash(&env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
R::ChainHeight => chain_height(&env),
R::GeneratedCoins => generated_coins(&env),
R::Outputs(map) => outputs(&env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
R::BlockBatchInRange(range) => block_batch_in_range(&env, range),
};

// TODO: what do we do if this errors?
response_sender.send(response).unwrap();

/* TODO: post-request handling, run some code for each request? */
}

//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`Request`].
//
// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
//
// Each function will return the [`Response`] that we
// should send back to the caller in [`map_request()`].
//
// INVARIANT:
// These functions are called above in `tower::Service::call()`
// using a custom threadpool which means any call to `par_*()` functions
Expand All @@ -231,26 +245,63 @@ fn map_request(
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.

/// TODO
/// [`ReadRequest::BlockExtendedHeader`].
#[inline]
fn block_extended_header(env: &Arc<ConcreteEnv>, block: u64) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockHash`].
#[inline]
fn block_hash(env: &Arc<ConcreteEnv>, block: u64) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockExtendedHeaderInRange`].
#[inline]
fn block_extended_header_in_range(
env: &Arc<ConcreteEnv>,
range: std::ops::Range<u64>,
) -> ResponseResult {
todo!()
}

/// [`ReadRequest::ChainHeight`].
#[inline]
fn chain_height(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::Outputs`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_1(env: Arc<ConcreteEnv>, response_sender: ResponseSender) {
let db_result = Ok(Response::Example1);
response_sender.send(db_result).unwrap();
fn outputs(env: &Arc<ConcreteEnv>, map: HashMap<u64, HashSet<u64>>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::NumberOutputsWithAmount`].
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_2(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: usize) {
let db_result = Ok(Response::Example2(x));
response_sender.send(db_result).unwrap();
fn number_outputs_with_amount(env: &Arc<ConcreteEnv>, vec: Vec<u64>) -> ResponseResult {
todo!()
}

/// TODO
/// [`ReadRequest::CheckKIsNotSpent`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_3(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: String) {
let db_result = Ok(Response::Example3(x));
response_sender.send(db_result).unwrap();
fn check_k_is_not_spent(env: &Arc<ConcreteEnv>, set: HashSet<[u8; 32]>) -> ResponseResult {
todo!()
}

/// [`ReadRequest::BlockBatchInRange`].
#[inline]
fn block_batch_in_range(env: &Arc<ConcreteEnv>, range: Range<u64>) -> ResponseResult {
todo!()
}
41 changes: 0 additions & 41 deletions database/src/service/request.rs

This file was deleted.

38 changes: 0 additions & 38 deletions database/src/service/response.rs

This file was deleted.

82 changes: 44 additions & 38 deletions database/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
//---------------------------------------------------------------------------------------------------- Use
use tower::{Service, ServiceExt};

use cuprate_types::service::{ReadRequest, Response, WriteRequest};

use crate::{
config::Config,
service::{init, DatabaseReadHandle, DatabaseWriteHandle, ReadRequest, Response, WriteRequest},
service::{init, DatabaseReadHandle, DatabaseWriteHandle},
};

//---------------------------------------------------------------------------------------------------- Tests
Expand All @@ -34,43 +36,47 @@ fn init_drop() {
let (reader, writer, _tempdir) = init_service();
}

/// Send a read request, and receive a response,
/// asserting the response the expected value.
#[tokio::test]
async fn read_request() {
let (reader, writer, _tempdir) = init_service();
// TODO:
// un-comment and fix these tests when all `{read,write}`
// service functions are implemented.

for (request, expected_response) in [
(ReadRequest::Example1, Response::Example1),
(ReadRequest::Example2(123), Response::Example2(123)),
(
ReadRequest::Example3("hello".into()),
Response::Example3("hello".into()),
),
] {
// This calls `poll_ready()` asserting we have a permit before `call()`.
let response_channel = reader.clone().oneshot(request);
let response = response_channel.await.unwrap();
assert_eq!(response, expected_response);
}
}
// /// Send a read request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn read_request() {
// let (reader, writer, _tempdir) = init_service();

/// Send a write request, and receive a response,
/// asserting the response the expected value.
#[tokio::test]
async fn write_request() {
let (reader, mut writer, _tempdir) = init_service();
// for (request, expected_response) in [
// (ReadRequest::Example1, Response::Example1),
// (ReadRequest::Example2(123), Response::Example2(123)),
// (
// ReadRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// // This calls `poll_ready()` asserting we have a permit before `call()`.
// let response_channel = reader.clone().oneshot(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }

for (request, expected_response) in [
(WriteRequest::Example1, Response::Example1),
(WriteRequest::Example2(123), Response::Example2(123)),
(
WriteRequest::Example3("hello".into()),
Response::Example3("hello".into()),
),
] {
let response_channel = writer.call(request);
let response = response_channel.await.unwrap();
assert_eq!(response, expected_response);
}
}
// /// Send a write request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn write_request() {
// let (reader, mut writer, _tempdir) = init_service();

// for (request, expected_response) in [
// (WriteRequest::Example1, Response::Example1),
// (WriteRequest::Example2(123), Response::Example2(123)),
// (
// WriteRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// let response_channel = writer.call(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }
Loading