Skip to content

Commit 1bd3446

Browse files
authored
fix(sv-consumer): Act on messages if db error unique constrains (#481)
1 parent 3d22e81 commit 1bd3446

File tree

1 file changed

+19
-8
lines changed

1 file changed

+19
-8
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ use super::{
5353
};
5454
use crate::{errors::ConsumerError, metrics::Metrics};
5555

56-
const MAX_CONCURRENT_TASKS: usize = 30;
57-
const BATCH_SIZE: usize = 30;
56+
const MAX_CONCURRENT_TASKS: usize = 32;
57+
const BATCH_SIZE: usize = 100;
5858

5959
#[derive(Debug)]
6060
enum ProcessResult {
@@ -101,14 +101,14 @@ impl BlockExecutor {
101101

102102
while !token.is_cancelled() {
103103
let mut messages = queue.subscribe(BATCH_SIZE).await?;
104+
let mut join_set = JoinSet::new();
104105
while let Some(msg) = messages.next().await {
105-
let mut join_set = JoinSet::new();
106106
let msg = msg?;
107107
self.spawn_processing_tasks(msg, &mut join_set).await?;
108-
// Wait for all spawned tasks to complete before processing next message
109-
while let Some(result) = join_set.join_next().await {
110-
Self::handle_task_result(result, &telemetry).await?;
111-
}
108+
}
109+
// Wait for all spawned tasks to complete before processing next message
110+
while let Some(result) = join_set.join_next().await {
111+
Self::handle_task_result(result, &telemetry).await?;
112112
}
113113
}
114114

@@ -289,7 +289,18 @@ async fn handle_stores(
289289

290290
match result {
291291
Ok(packet_count) => Ok(stats.finish(packet_count)),
292-
Err(e) => Ok(stats.finish_with_error(e)),
292+
Err(e) => {
293+
if let ConsumerError::Sqlx(sqlx::Error::Database(db_error)) = &e {
294+
if db_error.is_unique_violation() {
295+
tracing::info!(
296+
block_height = %msg_payload.block_height(),
297+
"Ignoring unique constraint violation - block already processed"
298+
);
299+
return Ok(stats.finish(packets.len()));
300+
}
301+
}
302+
Ok(stats.finish_with_error(e))
303+
}
293304
}
294305
}
295306

0 commit comments

Comments
 (0)