Skip to content

Commit 8a6fcd8

Browse files
committed
fix(sv-consumer): Remove semaphore to avoid blocking process
1 parent 6f0c77c commit 8a6fcd8

File tree

1 file changed

+10
-17
lines changed

1 file changed

+10
-17
lines changed

services/consumer/src/executor/block_executor.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use fuel_web_utils::{
4242
telemetry::Telemetry,
4343
};
4444
use futures::{future::try_join_all, StreamExt};
45-
use tokio::{sync::Semaphore, task::JoinError};
45+
use tokio::task::{JoinError, JoinSet};
4646
use tokio_util::sync::CancellationToken;
4747

4848
use super::{
@@ -60,7 +60,6 @@ pub struct BlockExecutor {
6060
db: Arc<Db>,
6161
message_broker: Arc<NatsMessageBroker>,
6262
fuel_streams: Arc<FuelStreams>,
63-
semaphore: Arc<Semaphore>,
6463
telemetry: Arc<Telemetry<Metrics>>,
6564
concurrent_tasks: usize,
6665
}
@@ -73,10 +72,8 @@ impl BlockExecutor {
7372
telemetry: Arc<Telemetry<Metrics>>,
7473
concurrent_tasks: usize,
7574
) -> Self {
76-
let semaphore = Arc::new(Semaphore::new(concurrent_tasks));
7775
Self {
7876
db,
79-
semaphore,
8077
message_broker: message_broker.clone(),
8178
fuel_streams: fuel_streams.clone(),
8279
telemetry,
@@ -96,14 +93,13 @@ impl BlockExecutor {
9693

9794
while !token.is_cancelled() {
9895
let mut messages = queue.subscribe(self.concurrent_tasks).await?;
96+
let mut join_set = JoinSet::new();
9997
while let Some(msg) = messages.next().await {
10098
let msg = msg?;
101-
let semaphore = self.semaphore.clone();
102-
let permit = match semaphore.clone().acquire_owned().await {
103-
Ok(p) => p,
104-
Err(_) => continue,
105-
};
106-
self.spawn_processing_tasks(msg, permit).await?;
99+
self.spawn_processing_tasks(msg, &mut join_set).await?;
100+
}
101+
while let Some(result) = join_set.join_next().await {
102+
result??;
107103
}
108104
}
109105

@@ -116,7 +112,7 @@ impl BlockExecutor {
116112
async fn spawn_processing_tasks(
117113
&self,
118114
msg: Box<dyn NatsMessage>,
119-
permit: tokio::sync::OwnedSemaphorePermit,
115+
join_set: &mut JoinSet<Result<(), ConsumerError>>,
120116
) -> Result<(), ConsumerError> {
121117
let db = self.db.clone();
122118
let fuel_streams = self.fuel_streams.clone();
@@ -125,7 +121,7 @@ impl BlockExecutor {
125121
let packets = Self::build_packets(&msg_payload);
126122
let telemetry = self.telemetry.clone();
127123

128-
tokio::spawn({
124+
join_set.spawn({
129125
let packets: Arc<Vec<RecordPacket>> = packets.clone();
130126
let msg_payload = msg_payload.clone();
131127
let telemetry = telemetry.clone();
@@ -148,15 +144,12 @@ impl BlockExecutor {
148144
"[#{}] Message acknowledged",
149145
msg_payload.block_height()
150146
);
151-
drop(permit);
152-
return;
147+
return Ok(());
153148
}
154149
let _ =
155150
handle_streams_task(&fuel_streams, &packets, &msg_payload)
156151
.await;
157152
let result = handle_stores(&db, &packets, &msg_payload).await;
158-
// Drop semaphore as soon as store is completed
159-
drop(permit);
160153
let result = match result {
161154
Ok(stats) => {
162155
if stats.error.is_none() {
@@ -182,9 +175,9 @@ impl BlockExecutor {
182175
Ok(Ok::<_, ConsumerError>(ProcessResult::Store(result))),
183176
&telemetry,
184177
);
178+
Ok(())
185179
}
186180
});
187-
188181
Ok(())
189182
}
190183

0 commit comments

Comments
 (0)