Skip to content

Commit e4f93bc

Browse files
authored
feat(sv-publisher): Remove historical gaps check on service start (#474)
1 parent cb52162 commit e4f93bc

File tree

3 files changed

+201
-172
lines changed

3 files changed

+201
-172
lines changed

services/publisher/src/history.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use std::sync::Arc;
2+
3+
use fuel_message_broker::NatsMessageBroker;
4+
use fuel_streams_core::types::*;
5+
use fuel_streams_domains::infra::Db;
6+
use fuel_web_utils::{shutdown::ShutdownController, telemetry::Telemetry};
7+
use tokio::{sync::Semaphore, task::JoinSet};
8+
use tokio_util::sync::CancellationToken;
9+
10+
use crate::{
11+
error::PublishError,
12+
gaps::{find_next_block_to_save, BlockHeightGap},
13+
metrics::Metrics,
14+
publish::publish_block,
15+
};
16+
17+
pub async fn process_historical_gaps(
18+
from_block: BlockHeight,
19+
db: &Arc<Db>,
20+
message_broker: &Arc<NatsMessageBroker>,
21+
fuel_core: &Arc<dyn FuelCoreLike>,
22+
last_block_height: &Arc<BlockHeight>,
23+
shutdown: &Arc<ShutdownController>,
24+
telemetry: &Arc<Telemetry<Metrics>>,
25+
) -> Result<
26+
impl std::future::Future<Output = Result<(), PublishError>>,
27+
anyhow::Error,
28+
> {
29+
let gaps = Arc::new(
30+
find_next_block_to_save(&db, *last_block_height.clone()).await?,
31+
);
32+
tracing::info!("Found {} block gaps to process", gaps.len());
33+
for gap in gaps.iter() {
34+
tracing::info!("Gap: {} to {}", gap.start, gap.end);
35+
}
36+
Ok(process_historical_blocks(
37+
from_block.into(),
38+
&message_broker,
39+
&fuel_core,
40+
&last_block_height,
41+
&gaps,
42+
shutdown.token().clone(),
43+
&telemetry,
44+
))
45+
}
46+
47+
fn process_historical_blocks(
48+
from_block: BlockHeight,
49+
message_broker: &Arc<NatsMessageBroker>,
50+
fuel_core: &Arc<dyn FuelCoreLike>,
51+
last_block_height: &Arc<BlockHeight>,
52+
gaps: &Arc<Vec<BlockHeightGap>>,
53+
token: CancellationToken,
54+
telemetry: &Arc<Telemetry<Metrics>>,
55+
) -> impl std::future::Future<Output = Result<(), PublishError>> {
56+
let message_broker = message_broker.clone();
57+
let fuel_core = fuel_core.clone();
58+
let gaps = gaps.to_vec();
59+
let last_block_height = *last_block_height.clone();
60+
let telemetry = telemetry.clone();
61+
62+
async move {
63+
if token.is_cancelled() {
64+
tracing::info!("Historical block processor received shutdown signal before starting");
65+
return Ok(());
66+
}
67+
68+
let Some(processed_gaps) =
69+
get_historical_block_range(from_block, &gaps, last_block_height)
70+
else {
71+
return Ok(());
72+
};
73+
74+
process_blocks_with_join_set(
75+
processed_gaps,
76+
message_broker,
77+
fuel_core,
78+
telemetry,
79+
token,
80+
)
81+
.await;
82+
83+
Ok(())
84+
}
85+
}
86+
87+
async fn process_blocks_with_join_set(
88+
processed_gaps: Vec<BlockHeightGap>,
89+
message_broker: Arc<NatsMessageBroker>,
90+
fuel_core: Arc<dyn FuelCoreLike>,
91+
telemetry: Arc<Telemetry<Metrics>>,
92+
token: CancellationToken,
93+
) {
94+
let semaphore = Arc::new(Semaphore::new(32));
95+
let mut join_set = JoinSet::new();
96+
97+
// Spawn tasks for each block height
98+
'outer: for gap in processed_gaps {
99+
for height in *gap.start..=*gap.end {
100+
if token.is_cancelled() {
101+
break 'outer;
102+
}
103+
104+
let message_broker = message_broker.clone();
105+
let fuel_core = fuel_core.clone();
106+
let telemetry = telemetry.clone();
107+
let semaphore = semaphore.clone();
108+
let token = token.clone();
109+
110+
join_set.spawn(async move {
111+
if token.is_cancelled() {
112+
return Ok(());
113+
}
114+
115+
let _permit = semaphore.acquire().await.unwrap();
116+
let height = height.into();
117+
let sealed_block = fuel_core.get_sealed_block(height)?;
118+
let sealed_block = Arc::new(sealed_block);
119+
120+
publish_block(
121+
&message_broker,
122+
&fuel_core,
123+
&sealed_block,
124+
&telemetry,
125+
None,
126+
)
127+
.await
128+
});
129+
}
130+
}
131+
132+
tracing::info!("Waiting for remaining tasks to complete...");
133+
134+
while let Some(result) = join_set.join_next().await {
135+
if token.is_cancelled() {
136+
tracing::info!(
137+
"Shutdown signal received, aborting remaining tasks..."
138+
);
139+
join_set.abort_all();
140+
break;
141+
}
142+
143+
match result {
144+
Ok(Ok(_)) => {
145+
tracing::debug!("Block processed successfully")
146+
}
147+
Ok(Err(e)) => {
148+
tracing::error!("Error processing block: {:?}", e)
149+
}
150+
Err(e) => {
151+
tracing::error!("Task error: {:?}", e);
152+
}
153+
}
154+
}
155+
156+
if token.is_cancelled() {
157+
tracing::info!("Waiting for aborted tasks to complete...");
158+
while let Some(result) = join_set.join_next().await {
159+
if let Err(e) = result {
160+
tracing::debug!("Aborted task error: {:?}", e);
161+
}
162+
}
163+
}
164+
}
165+
166+
fn get_historical_block_range(
167+
from_block: BlockHeight,
168+
gaps: &[BlockHeightGap],
169+
last_block_height: BlockHeight,
170+
) -> Option<Vec<BlockHeightGap>> {
171+
if gaps.is_empty() {
172+
return None;
173+
}
174+
175+
let mut processed_gaps = Vec::new();
176+
for gap in gaps {
177+
let start = std::cmp::max(from_block, gap.start);
178+
let end = std::cmp::min(gap.end, last_block_height);
179+
180+
if start <= end {
181+
processed_gaps.push(BlockHeightGap { start, end });
182+
}
183+
}
184+
185+
if processed_gaps.is_empty() {
186+
tracing::info!("No historical blocks to process");
187+
return None;
188+
}
189+
190+
let total_blocks: u32 = processed_gaps
191+
.iter()
192+
.map(|gap| *gap.end - *gap.start + 1)
193+
.sum();
194+
195+
tracing::info!(
196+
"Processing {total_blocks} historical blocks from {} gaps",
197+
processed_gaps.len()
198+
);
199+
Some(processed_gaps)
200+
}

services/publisher/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod cli;
22
pub mod error;
33
pub mod gaps;
4+
pub mod history;
45
pub mod metrics;
56
pub mod publish;
67
pub mod recover;

0 commit comments

Comments
 (0)