Skip to content

Commit f487ceb

Browse files
committed
fix(repo): Dune integration
1 parent 325f3e5 commit f487ceb

File tree

12 files changed

+662
-133
lines changed

12 files changed

+662
-133
lines changed

crates/domains/migrations/20250108203444_create_transactions_table.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ CREATE INDEX IF NOT EXISTS idx_transactions_created_at ON transactions (created_
106106
-- Composite indexes
107107
CREATE INDEX IF NOT EXISTS idx_transactions_status_block_height ON transactions (status, block_height);
108108
CREATE INDEX IF NOT EXISTS idx_transactions_type_block_height ON transactions (type, block_height);
109+
CREATE INDEX IF NOT EXISTS idx_transactions_tx_id_block_height ON transactions (tx_id, block_height);
109110
CREATE INDEX IF NOT EXISTS idx_transactions_ordering ON transactions (block_height, tx_index);
110111

111112
-- ------------------------------------------------------------------------------

crates/domains/src/blocks/repository.rs

Lines changed: 213 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use std::sync::Arc;
2-
31
use async_trait::async_trait;
42
use fuel_data_parser::DataEncoder;
53
use fuel_streams_types::{BlockHeight, BlockTimestamp};
6-
use sqlx::{Acquire, PgExecutor, Postgres};
4+
use sqlx::{Acquire, PgExecutor, Postgres, Row};
75

86
use super::{Block, BlockDbItem, BlocksQuery};
97
use crate::{
@@ -12,7 +10,7 @@ use crate::{
1210
repository::{Repository, RepositoryResult},
1311
QueryOptions,
1412
},
15-
transactions::{Transaction, TransactionsQuery},
13+
transactions::{Transaction, TransactionDbItem},
1614
};
1715

1816
#[async_trait]
@@ -167,16 +165,19 @@ impl Block {
167165
Ok(record.map(|(height,)| height.into()).unwrap_or_default())
168166
}
169167

170-
pub async fn find_in_height_range(
171-
db: &Db,
168+
pub async fn find_in_height_range<'c, E>(
169+
executor: E,
172170
start_height: BlockHeight,
173171
end_height: BlockHeight,
174172
options: &QueryOptions,
175-
) -> RepositoryResult<Vec<BlockDbItem>> {
176-
let select = "SELECT * FROM blocks".to_string();
173+
) -> RepositoryResult<Vec<BlockDbItem>>
174+
where
175+
E: PgExecutor<'c> + Acquire<'c, Database = Postgres>,
176+
{
177+
let select = "SELECT *".to_string();
177178
let mut query_builder = sqlx::QueryBuilder::new(select);
178179
query_builder
179-
.push(" WHERE block_height >= ")
180+
.push(" FROM blocks WHERE block_height >= ")
180181
.push_bind(start_height.into_inner() as i64)
181182
.push(" AND block_height <= ")
182183
.push_bind(end_height.into_inner() as i64);
@@ -189,34 +190,215 @@ impl Block {
189190

190191
query_builder.push(" ORDER BY block_height ASC");
191192
let query = query_builder.build_query_as::<BlockDbItem>();
192-
let records = query.fetch_all(&db.pool).await?;
193+
let records = query.fetch_all(executor).await?;
193194

194195
Ok(records)
195196
}
196197

197-
pub async fn transactions_from_db(
198+
pub async fn transactions_from_block<'c, E>(
198199
&self,
199-
db: &Arc<Db>,
200-
) -> RepositoryResult<Vec<Transaction>> {
201-
let mut all_transactions =
202-
Vec::with_capacity(self.transaction_ids.len());
203-
for chunk in self.transaction_ids.chunks(50) {
204-
let mut db_tx = db.pool_ref().begin().await?;
205-
for tx_id in chunk {
206-
let txs_query = TransactionsQuery {
207-
block_height: Some(self.height),
208-
tx_id: Some(tx_id.to_owned()),
209-
..Default::default()
210-
};
211-
let db_item =
212-
Transaction::find_one_with_db_tx(&mut db_tx, &txs_query)
213-
.await?;
214-
let transaction = Transaction::decode_json(&db_item.value)?;
215-
all_transactions.push(transaction);
200+
executor: E,
201+
) -> RepositoryResult<Vec<Transaction>>
202+
where
203+
E: PgExecutor<'c> + Acquire<'c, Database = Postgres>,
204+
{
205+
let db_items = sqlx::query_as::<_, TransactionDbItem>(
206+
r#"
207+
SELECT *
208+
FROM transactions
209+
WHERE block_height = $1
210+
ORDER BY tx_index ASC
211+
"#,
212+
)
213+
.bind(self.height.into_inner() as i64)
214+
.fetch_all(executor)
215+
.await?;
216+
217+
let mut transactions = Vec::with_capacity(db_items.len());
218+
for db_item in db_items {
219+
let transaction = Transaction::decode_json(&db_item.value)?;
220+
transactions.push(transaction);
221+
}
222+
223+
Ok(transactions)
224+
}
225+
226+
pub async fn find_blocks_with_transactions<'c, E>(
227+
executor: E,
228+
start_height: BlockHeight,
229+
end_height: BlockHeight,
230+
options: &QueryOptions,
231+
) -> RepositoryResult<Vec<(BlockDbItem, Vec<TransactionDbItem>)>>
232+
where
233+
E: PgExecutor<'c> + Acquire<'c, Database = Postgres>,
234+
{
235+
let mut query = String::from(
236+
r#"
237+
SELECT
238+
b.value as b_value, b.subject as b_subject, b.block_height as b_block_height,
239+
b.block_da_height as b_block_da_height, b.version as b_version, b.producer_address as b_producer_address,
240+
b.header_application_hash as b_header_application_hash,
241+
b.header_consensus_parameters_version as b_header_consensus_parameters_version,
242+
b.header_da_height as b_header_da_height, b.header_event_inbox_root as b_header_event_inbox_root,
243+
b.header_message_outbox_root as b_header_message_outbox_root,
244+
b.header_message_receipt_count as b_header_message_receipt_count,
245+
b.header_prev_root as b_header_prev_root,
246+
b.header_state_transition_bytecode_version as b_header_state_transition_bytecode_version,
247+
b.header_time as b_header_time, b.header_transactions_count as b_header_transactions_count,
248+
b.header_transactions_root as b_header_transactions_root, b.header_version as b_header_version,
249+
b.consensus_chain_config_hash as b_consensus_chain_config_hash,
250+
b.consensus_coins_root as b_consensus_coins_root, b.consensus_type as b_consensus_type,
251+
b.consensus_contracts_root as b_consensus_contracts_root,
252+
b.consensus_messages_root as b_consensus_messages_root,
253+
b.consensus_signature as b_consensus_signature,
254+
b.consensus_transactions_root as b_consensus_transactions_root,
255+
b.block_time as b_block_time, b.created_at as b_created_at,
256+
b.block_propagation_ms as b_block_propagation_ms,
257+
t.value as t_value, t.subject as t_subject, t.block_height as t_block_height,
258+
t.tx_id as t_tx_id, t.tx_index as t_tx_index, t.type as t_type,
259+
t.status as t_status, t.created_at as t_created_at,
260+
t.script_gas_limit as script_gas_limit, t.mint_amount as mint_amount,
261+
t.mint_asset_id as mint_asset_id, t.mint_gas_price as mint_gas_price,
262+
t.receipts_root as receipts_root, t.script as script, t.script_data as script_data,
263+
t.salt as salt, t.bytecode_witness_index as bytecode_witness_index,
264+
t.bytecode_root as bytecode_root, t.subsection_index as subsection_index,
265+
t.subsections_number as subsections_number, t.upgrade_purpose as upgrade_purpose,
266+
t.blob_id as blob_id, t.is_blob as is_blob, t.is_create as is_create,
267+
t.is_mint as is_mint, t.is_script as is_script, t.is_upgrade as is_upgrade,
268+
t.is_upload as is_upload, t.raw_payload as raw_payload, t.tx_pointer as tx_pointer,
269+
t.maturity as maturity, t.script_length as script_length, t.script_data_length as script_data_length,
270+
t.storage_slots_count as storage_slots_count, t.proof_set_count as proof_set_count,
271+
t.witnesses_count as witnesses_count, t.inputs_count as inputs_count,
272+
t.outputs_count as outputs_count, t.block_time as t_block_time
273+
FROM blocks b
274+
LEFT JOIN transactions t ON b.block_height = t.block_height
275+
WHERE b.block_height >= $1 AND b.block_height <= $2
276+
"#,
277+
);
278+
279+
if let Some(ns) = options.namespace.as_ref() {
280+
query.push_str(&format!(" AND b.subject LIKE '{}%'", ns));
281+
}
282+
283+
query.push_str(" ORDER BY b.block_height ASC, t.tx_index ASC");
284+
285+
let rows = sqlx::query(&query)
286+
.bind(start_height.into_inner() as i64)
287+
.bind(end_height.into_inner() as i64)
288+
.fetch_all(executor)
289+
.await?;
290+
291+
// Process rows into blocks with transactions
292+
let mut result = Vec::new();
293+
let mut current_block: Option<BlockDbItem> = None;
294+
let mut current_txs: Vec<TransactionDbItem> = Vec::new();
295+
296+
for row in rows {
297+
let block_height: i64 = row.get("b_block_height");
298+
if current_block.as_ref().is_none_or(|b| {
299+
b.block_height.into_inner() as i64 != block_height
300+
}) {
301+
if let Some(block) = current_block {
302+
result.push((block, current_txs));
303+
}
304+
current_block = Some(BlockDbItem {
305+
value: row.get("b_value"),
306+
subject: row.get("b_subject"),
307+
block_height: (row.get::<i64, _>("b_block_height") as u64)
308+
.into(),
309+
block_da_height: row.get("b_block_da_height"),
310+
version: row.get("b_version"),
311+
producer_address: row.get("b_producer_address"),
312+
header_application_hash: row
313+
.get("b_header_application_hash"),
314+
header_consensus_parameters_version: row
315+
.get("b_header_consensus_parameters_version"),
316+
header_da_height: row.get("b_header_da_height"),
317+
header_event_inbox_root: row
318+
.get("b_header_event_inbox_root"),
319+
header_message_outbox_root: row
320+
.get("b_header_message_outbox_root"),
321+
header_message_receipt_count: row
322+
.get("b_header_message_receipt_count"),
323+
header_prev_root: row.get("b_header_prev_root"),
324+
header_state_transition_bytecode_version: row
325+
.get("b_header_state_transition_bytecode_version"),
326+
header_time: row.get("b_header_time"),
327+
header_transactions_count: row
328+
.get("b_header_transactions_count"),
329+
header_transactions_root: row
330+
.get("b_header_transactions_root"),
331+
header_version: row.get("b_header_version"),
332+
consensus_chain_config_hash: row
333+
.get("b_consensus_chain_config_hash"),
334+
consensus_coins_root: row.get("b_consensus_coins_root"),
335+
consensus_type: row.get("b_consensus_type"),
336+
consensus_contracts_root: row
337+
.get("b_consensus_contracts_root"),
338+
consensus_messages_root: row
339+
.get("b_consensus_messages_root"),
340+
consensus_signature: row.get("b_consensus_signature"),
341+
consensus_transactions_root: row
342+
.get("b_consensus_transactions_root"),
343+
block_time: row.get("b_block_time"),
344+
created_at: row.get("b_created_at"),
345+
block_propagation_ms: row.get("b_block_propagation_ms"),
346+
});
347+
current_txs = Vec::new();
348+
}
349+
350+
let tx_value: Option<Vec<u8>> = row.get("t_value");
351+
if tx_value.is_some() {
352+
current_txs.push(TransactionDbItem {
353+
value: row.get("t_value"),
354+
subject: row.get("t_subject"),
355+
block_height: (row.get::<i64, _>("t_block_height") as u64)
356+
.into(),
357+
tx_id: row.get("t_tx_id"),
358+
tx_index: row.get("t_tx_index"),
359+
r#type: row.get("t_type"),
360+
status: row.get("t_status"),
361+
created_at: row.get("t_created_at"),
362+
script_gas_limit: row.get("script_gas_limit"),
363+
mint_amount: row.get("mint_amount"),
364+
mint_asset_id: row.get("mint_asset_id"),
365+
mint_gas_price: row.get("mint_gas_price"),
366+
receipts_root: row.get("receipts_root"),
367+
script: row.get("script"),
368+
script_data: row.get("script_data"),
369+
salt: row.get("salt"),
370+
bytecode_witness_index: row.get("bytecode_witness_index"),
371+
bytecode_root: row.get("bytecode_root"),
372+
subsection_index: row.get("subsection_index"),
373+
subsections_number: row.get("subsections_number"),
374+
upgrade_purpose: row.get("upgrade_purpose"),
375+
blob_id: row.get("blob_id"),
376+
is_blob: row.get("is_blob"),
377+
is_create: row.get("is_create"),
378+
is_mint: row.get("is_mint"),
379+
is_script: row.get("is_script"),
380+
is_upgrade: row.get("is_upgrade"),
381+
is_upload: row.get("is_upload"),
382+
raw_payload: row.get("raw_payload"),
383+
tx_pointer: row.get("tx_pointer"),
384+
maturity: row.get("maturity"),
385+
script_length: row.get("script_length"),
386+
script_data_length: row.get("script_data_length"),
387+
storage_slots_count: row.get("storage_slots_count"),
388+
proof_set_count: row.get("proof_set_count"),
389+
witnesses_count: row.get("witnesses_count"),
390+
inputs_count: row.get("inputs_count"),
391+
outputs_count: row.get("outputs_count"),
392+
block_time: row.get("t_block_time"),
393+
});
216394
}
217-
db_tx.commit().await?;
218395
}
219-
Ok(all_transactions)
396+
397+
if let Some(block) = current_block {
398+
result.push((block, current_txs));
399+
}
400+
401+
Ok(result)
220402
}
221403
}
222404

@@ -564,7 +746,7 @@ pub mod tests {
564746
options.with_namespace(Some(namespace));
565747

566748
let results = Block::find_in_height_range(
567-
&db,
749+
db.pool_ref(),
568750
start_height,
569751
end_height,
570752
&options,

crates/domains/src/messages/repository.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ mod tests {
122122
assert_eq!(result.amount, expected.amount);
123123
assert_eq!(result.data, expected.data);
124124
assert_eq!(result.da_height, expected.da_height);
125-
assert_eq!(result.block_time, expected.block_time);
126125
}
127126

128127
pub async fn insert_message(

0 commit comments

Comments
 (0)