Skip to content

Commit f395620

Browse files
authored
feat(sv-consumer): Introduce concurrent tasks configuration (#483)
1 parent 03ef211 commit f395620

File tree

7 files changed

+116
-77
lines changed

7 files changed

+116
-77
lines changed

crates/domains/src/infra/db/db_impl.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub static DB_POOL_SIZE: LazyLock<usize> = LazyLock::new(|| {
4646
dotenvy::var("DB_POOL_SIZE")
4747
.ok()
4848
.and_then(|val| val.parse().ok())
49-
.unwrap_or(2000)
49+
.unwrap_or(110)
5050
});
5151

5252
pub static DB_ACQUIRE_TIMEOUT: LazyLock<usize> = LazyLock::new(|| {
@@ -105,6 +105,7 @@ impl Db {
105105
.options(Self::connect_opts(opts));
106106

107107
sqlx::postgres::PgPoolOptions::new()
108+
.min_connections(2)
108109
.max_connections(opts.pool_size.unwrap_or_default())
109110
.acquire_timeout(opts.acquire_timeout.unwrap_or_default())
110111
.idle_timeout(opts.idle_timeout)

crates/message-broker/src/nats.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ impl NatsMessageBroker {
5959
Self::new(&opts).await
6060
}
6161

62+
pub async fn setup_with_opts(
63+
opts: &NatsOpts,
64+
) -> Result<Arc<NatsMessageBroker>, MessageBrokerError> {
65+
let broker = Self::new(opts).await?;
66+
broker.setup_queues().await?;
67+
Ok(broker.arc())
68+
}
69+
6270
pub async fn setup(
6371
url: &str,
6472
namespace: Option<&str>,

services/consumer/src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ pub struct Cli {
3737
help = "Enable metrics"
3838
)]
3939
pub use_metrics: bool,
40+
/// Max concurrent tasks
41+
#[arg(
42+
long,
43+
env = "CONCURRENT_TASKS",
44+
default_value = "300",
45+
help = "Number of concurrent tasks, this is the number of total blocks processed in parallel, this will also be reflected in the number of connections to the database"
46+
)]
47+
pub concurrent_tasks: usize,
4048
}

services/consumer/src/executor/block_executor.rs

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,9 @@ use super::{
5454
};
5555
use crate::{errors::ConsumerError, metrics::Metrics};
5656

57-
const MAX_CONCURRENT_TASKS: usize = 32;
58-
const BATCH_SIZE: usize = 100;
59-
6057
#[derive(Debug)]
6158
enum ProcessResult {
6259
Store(Result<BlockStats, ConsumerError>),
63-
Stream(Result<BlockStats, ConsumerError>),
6460
}
6561

6662
pub struct BlockExecutor {
@@ -69,6 +65,7 @@ pub struct BlockExecutor {
6965
fuel_streams: Arc<FuelStreams>,
7066
semaphore: Arc<Semaphore>,
7167
telemetry: Arc<Telemetry<Metrics>>,
68+
concurrent_tasks: usize,
7269
}
7370

7471
impl BlockExecutor {
@@ -77,14 +74,16 @@ impl BlockExecutor {
7774
message_broker: &Arc<NatsMessageBroker>,
7875
fuel_streams: &Arc<FuelStreams>,
7976
telemetry: Arc<Telemetry<Metrics>>,
77+
concurrent_tasks: usize,
8078
) -> Self {
81-
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS));
79+
let semaphore = Arc::new(Semaphore::new(concurrent_tasks));
8280
Self {
8381
db,
8482
semaphore,
8583
message_broker: message_broker.clone(),
8684
fuel_streams: fuel_streams.clone(),
8785
telemetry,
86+
concurrent_tasks,
8887
}
8988
}
9089

@@ -94,22 +93,27 @@ impl BlockExecutor {
9493
) -> Result<(), ConsumerError> {
9594
tracing::info!(
9695
"Starting consumer with max concurrent tasks: {}",
97-
MAX_CONCURRENT_TASKS
96+
self.concurrent_tasks
9897
);
9998

10099
let telemetry = self.telemetry.clone();
101100
let queue = NatsQueue::BlockImporter(self.message_broker.clone());
101+
let mut join_set = JoinSet::new();
102102

103103
while !token.is_cancelled() {
104-
let mut messages = queue.subscribe(BATCH_SIZE).await?;
105-
let mut join_set = JoinSet::new();
106-
while let Some(msg) = messages.next().await {
107-
let msg = msg?;
108-
self.spawn_processing_tasks(msg, &mut join_set).await?;
109-
}
110-
// Wait for all spawned tasks to complete before processing next message
111-
while let Some(result) = join_set.join_next().await {
112-
Self::handle_task_result(result, &telemetry).await?;
104+
tokio::select! {
105+
msg_result = queue.subscribe(self.concurrent_tasks) => {
106+
let mut messages = msg_result?;
107+
while let Some(msg) = messages.next().await {
108+
let _permit = self.semaphore.acquire().await?;
109+
let msg = msg?;
110+
self.spawn_processing_tasks(msg, &mut join_set,)
111+
.await?;
112+
}
113+
}
114+
Some(result) = join_set.join_next() => {
115+
Self::handle_task_result(result, &telemetry).await?;
116+
}
113117
}
114118
}
115119

@@ -125,44 +129,45 @@ impl BlockExecutor {
125129
join_set: &mut JoinSet<Result<ProcessResult, ConsumerError>>,
126130
) -> Result<(), ConsumerError> {
127131
let db = self.db.clone();
128-
let semaphore = self.semaphore.clone();
129132
let fuel_streams = self.fuel_streams.clone();
130133
let payload = msg.payload();
131134
let msg_payload = MsgPayload::decode_json(&payload)?.arc();
132135
let packets = Self::build_packets(&msg_payload);
133136

134137
join_set.spawn({
135-
let semaphore = semaphore.clone();
136138
let packets = packets.clone();
137139
let msg_payload = msg_payload.clone();
138140
async move {
139-
let _permit = semaphore.acquire().await?;
140141
let result = handle_stores(&db, &packets, &msg_payload).await;
141142
if let Ok(stats) = result {
142143
if stats.error.is_none() {
143-
msg.ack().await.map_err(|e| {
144-
tracing::error!("Failed to ack message: {:?}", e);
145-
ConsumerError::MessageBrokerClient(e)
146-
})?;
144+
tokio::spawn(async move {
145+
let _ = msg.ack().await.map_err(|e| {
146+
tracing::error!(
147+
"Failed to ack message: {:?}",
148+
e
149+
);
150+
ConsumerError::MessageBrokerClient(e)
151+
});
152+
tracing::info!(
153+
"[#{}] Message acknowledged",
154+
stats.block_height
155+
);
156+
});
147157
}
148158
return Ok::<_, ConsumerError>(ProcessResult::Store(Ok(
149159
stats,
150160
)));
151161
}
152-
Ok::<_, ConsumerError>(ProcessResult::Store(result))
153-
}
154-
});
155-
156-
join_set.spawn({
157-
let semaphore = semaphore.clone();
158-
let packets = packets.clone();
159-
let msg_payload = msg_payload.clone();
160-
let fuel_streams = fuel_streams.clone();
161-
async move {
162-
let _permit = semaphore.acquire_owned().await?;
163-
let result =
162+
let result_streams =
164163
handle_streams(&fuel_streams, &packets, &msg_payload).await;
165-
Ok(ProcessResult::Stream(result))
164+
if let Ok(stream_stats) = result_streams {
165+
match &stream_stats.error {
166+
Some(error) => stream_stats.log_error(error),
167+
None => stream_stats.log_success(),
168+
}
169+
}
170+
Ok::<_, ConsumerError>(ProcessResult::Store(result))
166171
}
167172
});
168173

@@ -185,13 +190,6 @@ impl BlockExecutor {
185190
None => store_stats.log_success(),
186191
}
187192
}
188-
Ok(Ok(ProcessResult::Stream(stream_result))) => {
189-
let stream_stats = stream_result?;
190-
match &stream_stats.error {
191-
Some(error) => stream_stats.log_error(error),
192-
None => stream_stats.log_success(),
193-
}
194-
}
195193
Ok(Err(e)) => tracing::error!("Task error: {}", e),
196194
Err(e) => tracing::error!("Task panicked: {}", e),
197195
}

services/consumer/src/main.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ async fn main() -> anyhow::Result<()> {
2929
let cli = Cli::parse();
3030
let shutdown = Arc::new(ShutdownController::new());
3131
shutdown.clone().spawn_signal_handler();
32-
3332
// Initialize shared resources
34-
let db = setup_db(&cli.db_url).await?;
35-
let message_broker = NatsMessageBroker::setup(&cli.nats_url, None).await?;
33+
let db = setup_db(&cli.db_url, cli.concurrent_tasks as u32).await?;
34+
// 2 minutes before returning to the message broker
35+
let opts = fuel_message_broker::NatsOpts::new(cli.nats_url.clone())
36+
.with_ack_wait(120);
37+
let message_broker = NatsMessageBroker::setup_with_opts(&opts).await?;
3638
let metrics = Metrics::new(None)?;
3739
let telemetry = Telemetry::new(Some(metrics)).await?;
3840
telemetry.start().await?;
@@ -48,6 +50,7 @@ async fn main() -> anyhow::Result<()> {
4850
&message_broker,
4951
&fuel_streams,
5052
Arc::clone(&telemetry),
53+
cli.concurrent_tasks,
5154
);
5255

5356
tokio::select! {
@@ -70,9 +73,13 @@ async fn main() -> anyhow::Result<()> {
7073
Ok(())
7174
}
7275

73-
async fn setup_db(db_url: &str) -> Result<Arc<Db>, ConsumerError> {
76+
async fn setup_db(
77+
db_url: &str,
78+
concurrent_tasks: u32,
79+
) -> Result<Arc<Db>, ConsumerError> {
7480
let db = Db::new(DbConnectionOpts {
7581
connection_str: db_url.to_string(),
82+
pool_size: Some(concurrent_tasks),
7683
..Default::default()
7784
})
7885
.await?;

services/publisher/src/history.rs

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use fuel_message_broker::NatsMessageBroker;
44
use fuel_streams_core::types::*;
55
use fuel_streams_domains::infra::Db;
66
use fuel_web_utils::{shutdown::ShutdownController, telemetry::Telemetry};
7-
use tokio::time::{interval, Duration};
7+
use tokio::{
8+
task::JoinSet,
9+
time::{interval, Duration},
10+
};
811
use tokio_util::sync::CancellationToken;
912

1013
use crate::{
@@ -141,9 +144,9 @@ async fn process_blocks(
141144
'outer: for gap in processed_gaps {
142145
tracing::info!("Processing gap: {} to {}", gap.start, gap.end);
143146
let heights = (*gap.start..=*gap.end).collect::<Vec<_>>();
144-
let heights_chucks = heights.chunks(50);
147+
let heights_chunks = heights.chunks(100);
145148

146-
for heights in heights_chucks {
149+
for heights in heights_chunks {
147150
tracing::info!(
148151
"Processing chunk: {:?} - {:?}",
149152
heights.first(),
@@ -154,47 +157,56 @@ async fn process_blocks(
154157
break 'outer;
155158
}
156159

160+
let mut join_set: JoinSet<Result<Option<u32>, PublishError>> =
161+
JoinSet::new();
157162
for height in heights {
158163
tracing::info!("Processing block height: {}", height);
159-
160-
if token.is_cancelled() {
161-
break 'outer;
162-
}
163-
164164
let message_broker = message_broker.clone();
165165
let fuel_core = fuel_core.clone();
166166
let telemetry = telemetry.clone();
167+
let block_height = *height;
167168
let sealed_block =
168169
fuel_core.get_sealed_block((*height).into())?;
169170
let sealed_block = Arc::new(sealed_block);
171+
let token = token.clone();
170172

171-
tracing::info!("Publishing block height: {}", height);
172-
let result = publish_block(
173-
&message_broker,
174-
&fuel_core,
175-
&sealed_block,
176-
&telemetry,
177-
None,
178-
)
179-
.await;
180-
181-
match result {
182-
Ok(_) => {
183-
tracing::info!(
184-
"Block {} published successfully",
185-
height
186-
);
173+
join_set.spawn(async move {
174+
if token.is_cancelled() {
175+
return Ok(None);
187176
}
188-
Err(e) => {
177+
tracing::info!("Publishing block height: {}", block_height);
178+
publish_block(
179+
&message_broker,
180+
&fuel_core,
181+
&sealed_block,
182+
&telemetry,
183+
None,
184+
)
185+
.await
186+
.map(|_| Some(block_height))
187+
.map_err(|e| {
189188
tracing::error!(
190189
"Error publishing block {}: {:?}",
191-
height,
190+
block_height,
192191
e
193192
);
194-
return Err(e.into());
195-
}
193+
e
194+
})
195+
});
196+
}
197+
while let Some(result) = join_set.join_next().await {
198+
if let Ok(Ok(Some(block_height))) = result {
199+
tracing::info!(
200+
"Block {} published successfully",
201+
block_height
202+
);
196203
}
197204
}
205+
tracing::info!(
206+
"Finished processing chunk: {:?} - {:?}",
207+
heights.first(),
208+
heights.last()
209+
);
198210
}
199211
}
200212

tests/tests/services/consumer.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,13 @@ async fn test_consumer_inserting_records() -> anyhow::Result<()> {
287287
let message_broker = Arc::clone(&message_broker);
288288
let fuel_streams = Arc::clone(&fuel_streams);
289289
let telemetry = Telemetry::new(None).await?;
290-
let block_executor =
291-
BlockExecutor::new(db, &message_broker, &fuel_streams, telemetry);
290+
let block_executor = BlockExecutor::new(
291+
db,
292+
&message_broker,
293+
&fuel_streams,
294+
telemetry,
295+
100,
296+
);
292297
async move { block_executor.start(shutdown.token()).await }
293298
});
294299

0 commit comments

Comments
 (0)