Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,022 changes: 772 additions & 250 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ cryo_cli = { version = "0.3.2", path = "./crates/cli" }
cryo_freeze = { version = "0.3.2", path = "./crates/freeze" }
cryo_to_df = { version = "0.3.2", path = "./crates/to_df" }

alloy = { version = "0.6.4", features = [
alloy = { version = "1.0.9", features = [
"full",
"rpc-types-trace",
"provider-ws",
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ fn get_after_str() -> String {
<white><bold>cryo help</bold></white>"#
);
let post_subcommands = " <DATASET(S)> display info about a dataset";
format!("{}{}{}", header, subcommands, post_subcommands)
format!("{header}{subcommands}{post_subcommands}")
}

fn get_datatype_help() -> &'static str {
Expand Down
29 changes: 16 additions & 13 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn parse_blocks(
for path in files {
let column = if path.contains(':') {
path.split(':')
.last()
.next_back()
.ok_or(ParseError::ParseError("could not parse txs path column".to_string()))?
} else {
"block_number"
Expand Down Expand Up @@ -395,8 +395,8 @@ mod tests {
use std::path::PathBuf;

use alloy::{
providers::{IpcConnect, ProviderBuilder},
transports::ipc::MockIpcServer,
providers::{Provider, ProviderBuilder},
transports::ipc::{IpcConnect, MockIpcServer},
};

use super::*;
Expand All @@ -412,7 +412,7 @@ mod tests {
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let provider = ProviderBuilder::new().connect_ipc(ipc).await.unwrap().erased();
let source = Source {
provider,
semaphore: Arc::new(None),
Expand Down Expand Up @@ -478,7 +478,7 @@ mod tests {
mock_ipc_path: PathBuf,
) {
let ipc = IpcConnect::new(mock_ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await.unwrap().boxed();
let provider = ProviderBuilder::new().connect_ipc(ipc).await.unwrap().erased();
let source = Arc::new(Source {
provider,
chain_id: 1,
Expand All @@ -498,11 +498,11 @@ mod tests {
);
}
BlockInputTest::WithoutMock((inputs, expected)) => {
println!("RES {:?}", res);
println!("inputs {:?}", inputs);
println!("EXPECTED {:?}", expected);
println!("RES {res:?}");
println!("inputs {inputs:?}");
println!("EXPECTED {expected:?}");
let actual = block_input_test_executor(inputs, expected, source.clone()).await;
println!("ACTUAL {:?}", actual);
println!("ACTUAL {actual:?}");
assert_eq!(actual, res);
}
}
Expand All @@ -518,8 +518,8 @@ mod tests {
assert_eq!(block_chunks.len(), expected.len());
for (i, block_chunk) in block_chunks.iter().enumerate() {
let expected_chunk = &expected[i];
println!("BLOCK_CHUNK {:?}", block_chunk);
println!("EXCPECTED_CHUNK {:?}", expected_chunk);
println!("BLOCK_CHUNK {block_chunk:?}");
println!("EXCPECTED_CHUNK {expected_chunk:?}");
match expected_chunk {
BlockChunk::Numbers(expected_block_numbers) => {
assert!(matches!(block_chunk, BlockChunk::Numbers { .. }));
Expand Down Expand Up @@ -554,8 +554,11 @@ mod tests {
tests: Vec<(BlockNumberTest<'_>, bool)>,
mock_ipc_path: PathBuf,
) {
let provider =
ProviderBuilder::new().on_ipc(IpcConnect::new(mock_ipc_path)).await.unwrap().boxed();
let provider = ProviderBuilder::new()
.connect_ipc(IpcConnect::new(mock_ipc_path))
.await
.unwrap()
.erased();
let source = Source {
provider,
semaphore: Arc::new(None),
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/parse/file_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub(crate) fn parse_file_output(args: &Args, source: &Source) -> Result<FileOutp
})?;
match fs::create_dir_all(&output_dir) {
Ok(_) => {}
Err(e) => return Err(ParseError::ParseError(format!("Error creating directory: {}", e))),
Err(e) => return Err(ParseError::ParseError(format!("Error creating directory: {e}"))),
};

let label = &args.label;
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/parse/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub(crate) async fn parse_partitions(
};
let mut partitions = chunk
.partition_with_labels(labels, partition_by.clone())
.map_err(|e| ParseError::ParseError(format!("could not partition labels ({})", e)))?;
.map_err(|e| ParseError::ParseError(format!("could not partition labels ({e})")))?;

match args.chunk_order.as_deref() {
None => {}
Expand Down
11 changes: 4 additions & 7 deletions crates/cli/src/parse/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ pub(crate) fn parse_schemas(
.map(|schema| (*datatype, schema))
.map_err(|e| {
ParseError::ParseError(format!(
"Failed to get schema for datatype: {:?}, {:?}",
datatype, e
"Failed to get schema for datatype: {datatype:?}, {e:?}"
))
})
})
Expand Down Expand Up @@ -100,7 +99,7 @@ fn parse_u256_types(args: &Args) -> Result<Vec<U256Type>, ParseError> {
"u32" | "uint32" => Ok(U256Type::U32),
"u64" | "uint64" => Ok(U256Type::U64),
"decimal128" | "d128" => Ok(U256Type::Decimal128),
_ => Err(ParseError::ParseError(format!("invalid u256 type: {}", raw))),
_ => Err(ParseError::ParseError(format!("invalid u256 type: {raw}"))),
}
})
.collect()
Expand Down Expand Up @@ -129,8 +128,7 @@ fn ensure_included_columns(
}
if !unknown_columns.is_empty() {
return Err(ParseError::ParseError(format!(
"datatypes do not support these columns: {:?}",
unknown_columns
"datatypes do not support these columns: {unknown_columns:?}"
)))
}
Ok(())
Expand All @@ -157,8 +155,7 @@ fn ensure_excluded_columns(
}
if !unknown_columns.is_empty() {
return Err(ParseError::ParseError(format!(
"datatypes do not support these columns: {:?}",
unknown_columns
"datatypes do not support these columns: {unknown_columns:?}"
)))
}
Ok(())
Expand Down
21 changes: 9 additions & 12 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::env;
use std::{env, sync::Arc};

use crate::args::Args;
use alloy::{
providers::{Provider, ProviderBuilder, RootProvider},
rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient},
transports::{layers::RetryBackoffLayer, BoxTransport},
providers::{Provider, ProviderBuilder},
rpc::client::ClientBuilder,
transports::layers::RetryBackoffLayer,
};
use cryo_freeze::{ParseError, Source, SourceLabels};
use governor::{Quota, RateLimiter};
use polars::prelude::*;
use std::num::NonZeroU32;

pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
Expand All @@ -19,14 +18,12 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
args.initial_backoff,
args.compute_units_per_second,
);
let connect: BuiltInConnectionString = rpc_url.parse().map_err(ParseError::ProviderError)?;
let client: RpcClient<BoxTransport> = ClientBuilder::default()
let client = ClientBuilder::default()
.layer(retry_layer)
.connect_boxed(connect)
.connect(&rpc_url)
.await
.map_err(ParseError::ProviderError)?
.boxed();
let provider: RootProvider<BoxTransport> = ProviderBuilder::default().on_client(client);
.map_err(ParseError::ProviderError)?;
let provider = ProviderBuilder::default().connect_client(client).erased();
let chain_id = provider.get_chain_id().await.map_err(ParseError::ProviderError)?;
let rate_limiter = match args.requests_per_second {
Some(rate_limit) => match (NonZeroU32::new(1), NonZeroU32::new(rate_limit)) {
Expand Down Expand Up @@ -79,7 +76,7 @@ pub(crate) fn parse_rpc_url(args: &Args) -> Result<String, ParseError> {
match endpoint {
Ok(endpoint) => endpoint.map(|endpoint| endpoint.url),
Err(e) => {
eprintln!("Could not load MESC data: {}", e);
eprintln!("Could not load MESC data: {e}");
None
}
}
Expand Down
19 changes: 8 additions & 11 deletions crates/cli/src/parse/timestamps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn parse_timestamps(
for path in files {
let column = if path.contains(':') {
path.split(':')
.last()
.next_back()
.ok_or(ParseError::ParseError("could not parse txs path column".to_string()))?
} else {
"timestamp"
Expand Down Expand Up @@ -325,9 +325,9 @@ mod tests {
use std::num::NonZeroU32;

use alloy::{
providers::ProviderBuilder,
rpc::client::{BuiltInConnectionString, ClientBuilder, RpcClient},
transports::{layers::RetryBackoffLayer, BoxTransport},
providers::{Provider, ProviderBuilder},
rpc::client::ClientBuilder,
transports::layers::RetryBackoffLayer,
};
use governor::{Quota, RateLimiter};

Expand All @@ -345,16 +345,13 @@ mod tests {
let max_concurrent_requests = 100;
let retry_layer =
RetryBackoffLayer::new(max_retry, initial_backoff, compute_units_per_second);
let connect: BuiltInConnectionString =
rpc_url.parse().map_err(ParseError::ProviderError).unwrap();
let client: RpcClient<BoxTransport> = ClientBuilder::default()
let client = ClientBuilder::default()
.layer(retry_layer)
.connect_boxed(connect)
.connect(&rpc_url)
.await
.map_err(ParseError::ProviderError)
.unwrap()
.boxed();
let provider = ProviderBuilder::default().on_client(client);
.unwrap();
let provider = ProviderBuilder::default().connect_client(client).erased();
let quota = Quota::per_second(NonZeroU32::new(15).unwrap())
.allow_burst(NonZeroU32::new(1).unwrap());
let rate_limiter = Some(RateLimiter::direct(quota));
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn print_syntax_help() {
(default column name is <white><bold>transaction_hash</bold></white>)
- can use multiple parquet files <white><bold>--txs ./path/to/ethereum__logs*.parquet</bold></white>"#
);
println!("{}", content);
println!("{content}");
}

/// Handle detailed help by parsing schemas and printing dataset information.
Expand All @@ -129,7 +129,7 @@ fn handle_detailed_help(args: args::Args) -> Result<(), CollectError> {
if let Some(schema) = schemas.get(&datatype) {
cryo_freeze::print_dataset_info(datatype, schema);
} else {
return Err(err(format!("missing schema for datatype: {:?}", datatype).as_str()));
return Err(err(format!("missing schema for datatype: {datatype:?}").as_str()));
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/freeze/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ fn main() {
let git_description =
get_git_description().unwrap_or_else(|_| env!("CARGO_PKG_VERSION").to_string());

println!("cargo:rustc-env=GIT_DESCRIPTION={}", git_description);
println!("cargo:rustc-env=GIT_DESCRIPTION={git_description}");
}

fn get_git_description() -> Result<String, std::io::Error> {
Expand All @@ -18,6 +18,6 @@ fn get_git_description() -> Result<String, std::io::Error> {

Ok(description)
} else {
Err(std::io::Error::new(std::io::ErrorKind::Other, "Git command failed"))
Err(std::io::Error::other("Git command failed"))
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/address_appearances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl CollectByTransaction for AddressAppearances {
fn name(log: &Log) -> Option<&'static str> {
let event = log.topic0().unwrap();
if event == *ERC20::Transfer::SIGNATURE_HASH {
if log.data().data.len() > 0 {
if !log.data().data.is_empty() {
Some("erc20_transfer")
} else if log.topics().len() == 4 {
Some("erc721_transfer")
Expand Down
4 changes: 2 additions & 2 deletions crates/freeze/src/datasets/erc721_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl CollectByBlock for Erc721Transfers {
let filter = Filter { topics, ..request.ethers_log_filter()? };
let logs = source.get_logs(&filter).await?;

Ok(logs.into_iter().filter(|x| x.topics().len() == 4 && x.data().data.len() == 0).collect())
Ok(logs.into_iter().filter(|x| x.topics().len() == 4 && x.data().data.is_empty()).collect())
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand All @@ -97,7 +97,7 @@ impl CollectByTransaction for Erc721Transfers {

fn is_erc721_transfer(log: &Log) -> bool {
log.topics().len() == 4 &&
log.data().data.len() == 0 &&
log.data().data.is_empty() &&
log.topics()[0] == ERC721::Transfer::SIGNATURE_HASH
}

Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn process_logs(logs: Vec<Log>, columns: &mut Logs, schema: &Table) -> R<()> {
if let (Some(decoder), Some(indexed_keys), Some(body_keys)) =
(&schema.log_decoder, &indexed_keys, &body_keys)
{
match decoder.event.decode_log(&log.inner.data, true) {
match decoder.event.decode_log(&log.inner.data) {
Ok(log) => {
// for param in log.indexed {
// if decode_keys.contains(param.name.as_str()) {
Expand Down
14 changes: 7 additions & 7 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl CollectByBlock for Transactions {
// filter by from_address
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(from_address) = &request.from_address {
Box::new(move |tx| tx.from == Address::from_slice(from_address))
Box::new(move |tx| tx.inner.signer() == Address::from_slice(from_address))
} else {
Box::new(|_| true)
};
Expand Down Expand Up @@ -228,7 +228,7 @@ pub(crate) fn process_transaction(
store!(schema, columns, block_number, tx.block_number.map(|x| x as u32));
store!(schema, columns, transaction_index, tx.transaction_index);
store!(schema, columns, transaction_hash, tx.inner.tx_hash().to_vec());
store!(schema, columns, from_address, tx.from.to_vec());
store!(schema, columns, from_address, tx.inner.signer().to_vec());
store!(
schema,
columns,
Expand All @@ -255,7 +255,7 @@ pub(crate) fn process_transaction(
}
// in alloy eip2718_encoded_length is rlp_encoded_length
store!(schema, columns, n_rlp_bytes, tx.inner.eip2718_encoded_length() as u32);
store!(schema, columns, gas_used, receipt.as_ref().map(|r| r.gas_used as u64));
store!(schema, columns, gas_used, receipt.as_ref().map(|r| r.gas_used));
// store!(schema, columns, gas_price, Some(receipt.unwrap().effective_gas_price as u64));
store!(schema, columns, gas_price, gas_price);
store!(schema, columns, transaction_type, tx.inner.tx_type() as u32);
Expand All @@ -277,15 +277,15 @@ pub(crate) fn process_transaction(
}

fn get_max_fee_per_gas(tx: &Transaction) -> Option<u64> {
match &tx.inner {
match tx.inner.as_ref() {
alloy::consensus::TxEnvelope::Legacy(_) => None,
alloy::consensus::TxEnvelope::Eip2930(_) => None,
_ => Some(tx.inner.max_fee_per_gas() as u64),
}
}

pub(crate) fn get_gas_price(block: &Block, tx: &Transaction) -> Option<u64> {
match &tx.inner {
match tx.inner.as_ref() {
alloy::consensus::TxEnvelope::Legacy(_) => tx.gas_price().map(|gas_price| gas_price as u64),
alloy::consensus::TxEnvelope::Eip2930(_) => {
tx.gas_price().map(|gas_price| gas_price as u64)
Expand All @@ -310,9 +310,9 @@ fn tx_success(tx: &Transaction, receipt: &Option<TransactionReceipt>) -> R<bool>
if let Some(r) = receipt {
Ok(r.gas_used == 0)
} else {
return Err(err("could not determine status of transaction"))
Err(err("could not determine status of transaction"))
}
} else {
return Err(err("could not determine status of transaction"))
Err(err("could not determine status of transaction"))
}
}
2 changes: 1 addition & 1 deletion crates/freeze/src/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn freeze_partitions(
completed.push(partition)
}
Ok((partition, Err(e))) => errored.push((Some(partition), e)),
Err(e) => errored.push((None, err(format!("error joining chunks: {:?}", e).as_str()))),
Err(e) => errored.push((None, err(format!("error joining chunks: {e:?}").as_str()))),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/types/chunks/number_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl ChunkData for NumberChunk {
type Inner = u64;

fn format_item(value: Self::Inner) -> Result<String, ChunkError> {
Ok(format!("{:0>8}", value))
Ok(format!("{value:0>8}"))
}

fn size(&self) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/types/datatypes/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ impl MultiDatatype {

/// name
pub fn name(&self) -> String {
format!("{}", heck::AsSnakeCase(format!("{:?}", self)))
format!("{}", heck::AsSnakeCase(format!("{self:?}")))
}
}
Loading