-
Notifications
You must be signed in to change notification settings - Fork 162
add empty chunk handling, importing python crate dependencies, tx filtering by address, support for new datatypes in python crate #147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
9e4d9c4
3a2a533
7add8df
cc7b4e3
5facb1f
e0bdbb4
afebb53
16a7322
79775f9
22781c3
501a1c8
2fccee4
1938448
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -331,4 +331,4 @@ fn process_appearances( | |
} | ||
|
||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,4 +98,4 @@ pub(crate) fn process_contracts( | |
} | ||
} | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,4 +120,4 @@ fn process_erc721_transfers( | |
} | ||
} | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,4 +149,4 @@ fn process_logs(logs: Vec<Log>, columns: &mut Logs, schema: &Table) -> R<()> { | |
} | ||
|
||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,4 +106,4 @@ pub(crate) fn process_native_transfers( | |
} | ||
} | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,14 @@ | ||
use crate::*; | ||
use ethers::prelude::*; | ||
use polars::prelude::*; | ||
use std::collections::HashSet; | ||
use std::sync::Mutex; | ||
use lazy_static::lazy_static; | ||
|
||
lazy_static! { | ||
static ref TRANSACTIONS_SET: Mutex<HashSet<Vec<u8>>> = Mutex::new(HashSet::new()); | ||
} | ||
|
||
|
||
/// columns for transactions | ||
#[cryo_to_df::to_df(Datatype::Transactions)] | ||
|
@@ -11,6 +19,7 @@ pub struct Transactions { | |
transaction_index: Vec<Option<u64>>, | ||
transaction_hash: Vec<Vec<u8>>, | ||
nonce: Vec<u64>, | ||
address: Vec<Vec<u8>>, | ||
from_address: Vec<Vec<u8>>, | ||
to_address: Vec<Option<Vec<u8>>>, | ||
value: Vec<U256>, | ||
|
@@ -55,7 +64,8 @@ impl Dataset for Transactions { | |
} | ||
|
||
fn optional_parameters() -> Vec<Dim> { | ||
vec![Dim::FromAddress, Dim::ToAddress] | ||
vec![Dim::Address,Dim::FromAddress, Dim::ToAddress] | ||
//vec![Dim::FromAddress, Dim::ToAddress] | ||
} | ||
} | ||
|
||
|
@@ -65,8 +75,10 @@ pub type TransactionAndReceipt = (Transaction, Option<TransactionReceipt>); | |
#[async_trait::async_trait] | ||
impl CollectByBlock for Transactions { | ||
type Response = (Block<Transaction>, Vec<TransactionAndReceipt>, bool); | ||
//println!("impl"); | ||
|
||
async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> { | ||
|
||
let block = source | ||
.fetcher | ||
.get_block_with_txs(request.block_number()?) | ||
|
@@ -77,8 +89,10 @@ impl CollectByBlock for Transactions { | |
// 1. collect transactions and filter them if optional parameters are supplied | ||
// filter by from_address | ||
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> = | ||
if let Some(from_address) = &request.from_address { | ||
Box::new(move |tx| tx.from.as_bytes() == from_address) | ||
if let Some(from_address) = &request.from_address { | ||
Box::new(move |tx| { | ||
from_address == tx.from.as_bytes() | ||
}) | ||
} else { | ||
Box::new(|_| true) | ||
}; | ||
|
@@ -89,16 +103,37 @@ impl CollectByBlock for Transactions { | |
} else { | ||
Box::new(|_| true) | ||
}; | ||
|
||
// filter by address | ||
let addr_filter: Box<dyn Fn(&Transaction) -> bool + Send> = | ||
if let Some(address) = &request.address { | ||
Box::new(move |tx| { | ||
let mut transactions_set = TRANSACTIONS_SET.lock().unwrap(); | ||
|
||
if transactions_set.contains(&tx.hash.as_bytes().to_vec()) { | ||
false | ||
} else { | ||
let condition = tx.to.as_ref().map_or(false, |x| x.as_bytes() == address) || | ||
tx.from.as_bytes() == address; | ||
if condition { | ||
transactions_set.insert(tx.hash.as_bytes().to_vec()); | ||
} | ||
condition | ||
} | ||
}) | ||
} else { | ||
Box::new(|_| true) | ||
}; | ||
|
||
let transactions = | ||
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect(); | ||
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).filter(addr_filter).collect(); | ||
|
||
// 2. collect receipts if necessary | ||
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts | ||
// in block | ||
let receipts: Vec<Option<_>> = | ||
if schema.has_column("gas_used") | schema.has_column("success") { | ||
// receipts required | ||
let receipts = if request.from_address.is_some() || request.to_address.is_some() { | ||
let receipts = if request.from_address.is_some() || request.to_address.is_some() || request.address.is_some() { | ||
source.get_tx_receipts(&transactions).await? | ||
} else { | ||
source.get_tx_receipts_in_block(&block).await? | ||
|
@@ -108,8 +143,8 @@ impl CollectByBlock for Transactions { | |
vec![None; block.transactions.len()] | ||
}; | ||
|
||
let transactions_with_receips = transactions.into_iter().zip(receipts).collect(); | ||
Ok((block, transactions_with_receips, query.exclude_failed)) | ||
let transactions_with_receipts = transactions.into_iter().zip(receipts).collect(); | ||
Ok((block, transactions_with_receipts, query.exclude_failed)) | ||
} | ||
|
||
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> { | ||
|
davidthegardens marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.