Skip to content

Commit 8a791f8

Browse files
committed
fix(sv-consumer): Ensure block is process before act on it and get next block
1 parent b53253a commit 8a791f8

File tree

1 file changed

+16
-23
lines changed

1 file changed

+16
-23
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ use super::{
5353
};
5454
use crate::{errors::ConsumerError, metrics::Metrics};
5555

56-
const MAX_CONCURRENT_TASKS: usize = 32;
57-
const BATCH_SIZE: usize = 100;
56+
const MAX_CONCURRENT_TASKS: usize = 30;
57+
const BATCH_SIZE: usize = 30;
5858

5959
#[derive(Debug)]
6060
enum ProcessResult {
@@ -91,7 +91,6 @@ impl BlockExecutor {
9191
&self,
9292
token: &CancellationToken,
9393
) -> Result<(), ConsumerError> {
94-
let mut join_set = JoinSet::new();
9594
tracing::info!(
9695
"Starting consumer with max concurrent tasks: {}",
9796
MAX_CONCURRENT_TASKS
@@ -101,26 +100,18 @@ impl BlockExecutor {
101100
let queue = NatsQueue::BlockImporter(self.message_broker.clone());
102101

103102
while !token.is_cancelled() {
104-
tokio::select! {
105-
msg_result = queue.subscribe(BATCH_SIZE) => {
106-
let mut messages = msg_result?;
107-
while let Some(msg) = messages.next().await {
108-
let msg = msg?;
109-
self.spawn_processing_tasks(msg, &mut join_set,)
110-
.await?;
111-
}
112-
}
113-
Some(result) = join_set.join_next() => {
103+
let mut messages = queue.subscribe(BATCH_SIZE).await?;
104+
while let Some(msg) = messages.next().await {
105+
let mut join_set = JoinSet::new();
106+
let msg = msg?;
107+
self.spawn_processing_tasks(msg, &mut join_set).await?;
108+
// Wait for all spawned tasks to complete before processing next message
109+
while let Some(result) = join_set.join_next().await {
114110
Self::handle_task_result(result, &telemetry).await?;
115111
}
116112
}
117113
}
118114

119-
// Wait for all tasks to finish
120-
while let Some(result) = join_set.join_next().await {
121-
Self::handle_task_result(result, &telemetry).await?;
122-
}
123-
124115
tracing::info!("Stopping broker ...");
125116
shutdown_broker_with_timeout(&self.message_broker).await;
126117
tracing::info!("Broker stopped successfully!");
@@ -138,13 +129,20 @@ impl BlockExecutor {
138129
let payload = msg.payload();
139130
let msg_payload = MsgPayload::decode_json(&payload)?.arc();
140131
let packets = Self::build_packets(&msg_payload);
132+
141133
join_set.spawn({
142134
let semaphore = semaphore.clone();
143135
let packets = packets.clone();
144136
let msg_payload = msg_payload.clone();
145137
async move {
146138
let _permit = semaphore.acquire().await?;
147139
let result = handle_stores(&db, &packets, &msg_payload).await;
140+
if result.is_ok() {
141+
msg.ack().await.map_err(|e| {
142+
tracing::error!("Failed to ack message: {:?}", e);
143+
ConsumerError::MessageBrokerClient(e)
144+
})?;
145+
}
148146
Ok::<_, ConsumerError>(ProcessResult::Store(result))
149147
}
150148
});
@@ -162,11 +160,6 @@ impl BlockExecutor {
162160
}
163161
});
164162

165-
msg.ack().await.map_err(|e| {
166-
tracing::error!("Failed to ack message: {:?}", e);
167-
ConsumerError::MessageBrokerClient(e)
168-
})?;
169-
170163
Ok(())
171164
}
172165

0 commit comments

Comments
 (0)