Skip to content

Commit b2b518b

Browse files
committed
fix(sv-consumer): Ensure active set of tasks is always following configuration
1 parent 8a6fcd8 commit b2b518b

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,24 @@ impl BlockExecutor {
9090
self.concurrent_tasks
9191
);
9292
let queue = NatsQueue::BlockImporter(self.message_broker.clone());
93+
let mut active_tasks = 0;
94+
let mut join_set = JoinSet::new();
9395

9496
while !token.is_cancelled() {
95-
let mut messages = queue.subscribe(self.concurrent_tasks).await?;
96-
let mut join_set = JoinSet::new();
97-
while let Some(msg) = messages.next().await {
98-
let msg = msg?;
99-
self.spawn_processing_tasks(msg, &mut join_set).await?;
100-
}
101-
while let Some(result) = join_set.join_next().await {
102-
result??;
97+
tracing::info!("Active tasks: {}", active_tasks);
98+
let query_tasks = (self.concurrent_tasks - active_tasks).min(1);
99+
tokio::select! {
100+
msg_result = queue.subscribe(query_tasks) => {
101+
let mut messages = msg_result?;
102+
while let Some(msg) = messages.next().await {
103+
active_tasks += 1;
104+
self.spawn_processing_tasks(msg?, &mut join_set)
105+
.await?;
106+
}
107+
}
108+
Some(_) = join_set.join_next() => {
109+
active_tasks -= 1;
110+
}
103111
}
104112
}
105113

0 commit comments

Comments
 (0)