Skip to content

Commit 6f0c77c

Browse files
authored
feat(sv-consumer): Ack on msg if block is already processed (#489)
1 parent 317b5cb commit 6f0c77c

File tree

1 file changed

+102
-65
lines changed

1 file changed

+102
-65
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 102 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use fuel_streams_core::{
2121
FuelStreams,
2222
};
2323
use fuel_streams_domains::{
24-
blocks::BlockDbItem,
24+
blocks::{BlockDbItem, BlocksQuery},
2525
infra::{
2626
db::{Db, DbTransaction},
2727
record::{PacketBuilder, RecordEntity, RecordPacket},
@@ -130,10 +130,33 @@ impl BlockExecutor {
130130
let msg_payload = msg_payload.clone();
131131
let telemetry = telemetry.clone();
132132
async move {
133+
let query = BlocksQuery {
134+
height: Some(msg_payload.block_height()),
135+
..Default::default()
136+
};
137+
let block = Block::find_one(db.pool_ref(), &query).await;
138+
if block.is_ok() {
139+
tracing::info!(
140+
"[#{}] Block already processed",
141+
msg_payload.block_height()
142+
);
143+
let _ = msg.ack().await.map_err(|e| {
144+
tracing::error!("Failed to ack message: {:?}", e);
145+
ConsumerError::MessageBrokerClient(e)
146+
});
147+
tracing::info!(
148+
"[#{}] Message acknowledged",
149+
msg_payload.block_height()
150+
);
151+
drop(permit);
152+
return;
153+
}
133154
let _ =
134155
handle_streams_task(&fuel_streams, &packets, &msg_payload)
135156
.await;
136157
let result = handle_stores(&db, &packets, &msg_payload).await;
158+
// Drop semaphore as soon as store is completed
159+
drop(permit);
137160
let result = match result {
138161
Ok(stats) => {
139162
if stats.error.is_none() {
@@ -159,7 +182,6 @@ impl BlockExecutor {
159182
Ok(Ok::<_, ConsumerError>(ProcessResult::Store(result))),
160183
&telemetry,
161184
);
162-
drop(permit);
163185
}
164186
});
165187

@@ -201,6 +223,45 @@ impl BlockExecutor {
201223
}
202224
}
203225

226+
async fn handle_insertions(
227+
tx: &mut DbTransaction,
228+
packets: &Arc<Vec<RecordPacket>>,
229+
) -> Result<(), ConsumerError> {
230+
// First insert blocks
231+
for packet in packets.iter() {
232+
let subject_id = packet.subject_id();
233+
let entity = RecordEntity::from_subject_id(&subject_id)?;
234+
match entity {
235+
RecordEntity::Block => {
236+
let db_item = BlockDbItem::try_from(packet)?;
237+
Block::insert_with_transaction(tx, &db_item).await?;
238+
}
239+
RecordEntity::Message => {
240+
let db_item = MessageDbItem::try_from(packet)?;
241+
Message::insert_with_transaction(tx, &db_item).await?;
242+
}
243+
RecordEntity::Transaction => {
244+
let db_item = TransactionDbItem::try_from(packet)?;
245+
Transaction::insert_with_transaction(tx, &db_item).await?;
246+
}
247+
RecordEntity::Input => {
248+
let db_item = InputDbItem::try_from(packet)?;
249+
Input::insert_with_transaction(tx, &db_item).await?;
250+
}
251+
RecordEntity::Output => {
252+
let db_item = OutputDbItem::try_from(packet)?;
253+
Output::insert_with_transaction(tx, &db_item).await?;
254+
}
255+
RecordEntity::Receipt => {
256+
let db_item = ReceiptDbItem::try_from(packet)?;
257+
Receipt::insert_with_transaction(tx, &db_item).await?;
258+
}
259+
_ => {}
260+
}
261+
}
262+
Ok(())
263+
}
264+
204265
async fn handle_stores(
205266
db: &Arc<Db>,
206267
packets: &Arc<Vec<RecordPacket>>,
@@ -212,74 +273,50 @@ async fn handle_stores(
212273
let result = retry_service
213274
.with_retry("store_insertions", || async {
214275
let mut tx = db.pool.begin().await?;
276+
// Insert blocks, messages, transactions, inputs, outputs, and receipts
277+
match handle_insertions(&mut tx, packets).await {
278+
Ok(_) => {
279+
let block_propagation_ms =
280+
stats.calculate_block_propagation_ms();
281+
update_block_propagation_ms(
282+
&mut tx,
283+
block_height,
284+
block_propagation_ms,
285+
)
286+
.await?;
287+
tx.commit().await?;
288+
// Then, insert separately predicates and UTXOs
289+
for packet in packets.iter() {
290+
let subject_id = packet.subject_id();
291+
let entity =
292+
RecordEntity::from_subject_id(&subject_id)?;
215293

216-
// First insert blocks
217-
for packet in packets.iter() {
218-
let subject_id = packet.subject_id();
219-
let entity = RecordEntity::from_subject_id(&subject_id)?;
220-
221-
match entity {
222-
RecordEntity::Block => {
223-
let db_item = BlockDbItem::try_from(packet)?;
224-
Block::insert_with_transaction(&mut tx, &db_item)
225-
.await?;
226-
}
227-
RecordEntity::Message => {
228-
let db_item = MessageDbItem::try_from(packet)?;
229-
Message::insert_with_transaction(&mut tx, &db_item)
230-
.await?;
231-
}
232-
RecordEntity::Transaction => {
233-
let db_item = TransactionDbItem::try_from(packet)?;
234-
Transaction::insert_with_transaction(&mut tx, &db_item)
235-
.await?;
236-
}
237-
RecordEntity::Input => {
238-
let db_item = InputDbItem::try_from(packet)?;
239-
Input::insert_with_transaction(&mut tx, &db_item)
240-
.await?;
241-
}
242-
RecordEntity::Output => {
243-
let db_item = OutputDbItem::try_from(packet)?;
244-
Output::insert_with_transaction(&mut tx, &db_item)
245-
.await?;
246-
}
247-
RecordEntity::Receipt => {
248-
let db_item = ReceiptDbItem::try_from(packet)?;
249-
Receipt::insert_with_transaction(&mut tx, &db_item)
250-
.await?;
294+
match entity {
295+
RecordEntity::Predicate => {
296+
let mut db_item =
297+
PredicateDbItem::try_from(packet)?;
298+
Predicate::upsert_as_relation(db, &mut db_item)
299+
.await?;
300+
}
301+
RecordEntity::Utxo => {
302+
let db_item = UtxoDbItem::try_from(packet)?;
303+
Utxo::insert(db.pool_ref(), &db_item).await?;
304+
}
305+
_ => {}
306+
}
251307
}
252-
_ => {}
308+
Ok(packets.len())
253309
}
254-
}
255-
let block_propagation_ms = stats.calculate_block_propagation_ms();
256-
update_block_propagation_ms(
257-
&mut tx,
258-
block_height,
259-
block_propagation_ms,
260-
)
261-
.await?;
262-
tx.commit().await?;
263-
264-
// Then, insert separately predicates and UTXOs
265-
for packet in packets.iter() {
266-
let subject_id = packet.subject_id();
267-
let entity = RecordEntity::from_subject_id(&subject_id)?;
268-
269-
match entity {
270-
RecordEntity::Predicate => {
271-
let mut db_item = PredicateDbItem::try_from(packet)?;
272-
Predicate::upsert_as_relation(db, &mut db_item).await?;
273-
}
274-
RecordEntity::Utxo => {
275-
let db_item = UtxoDbItem::try_from(packet)?;
276-
Utxo::insert(db.pool_ref(), &db_item).await?;
277-
}
278-
_ => {}
310+
Err(e) => {
311+
tracing::error!(
312+
"[#{}] Failed to insert packets: {:?}",
313+
block_height,
314+
e
315+
);
316+
tx.rollback().await?;
317+
Err(e)
279318
}
280319
}
281-
282-
Ok(packets.len())
283320
})
284321
.await;
285322

0 commit comments

Comments
 (0)