Skip to content

Commit 4ec7f70

Browse files
rymncxgreenx
andauthored
fix(relayer): ensure local state is updated after logs downloaded (#2829)
## Linked Issues/PRs <!-- List of related issues/PRs --> ## Description <!-- List of detailed changes --> delayed block production. includes deterministic start of block production ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --------- Co-authored-by: green <xgreenx9999@gmail.com>
1 parent 798d890 commit 4ec7f70

File tree

21 files changed

+617
-184
lines changed

21 files changed

+617
-184
lines changed

.cargo/audit.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
[advisories]
22
ignore = [
3-
"RUSTSEC-2024-0421" # https://github.com/FuelLabs/fuel-core/issues/2488
4-
]
3+
"RUSTSEC-2024-0421", # https://github.com/FuelLabs/fuel-core/issues/2488
4+
"RUSTSEC-2025-0009", # https://github.com/FuelLabs/fuel-core/issues/2814
5+
]

crates/fuel-core/src/service.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
sync::Arc,
44
};
55

6+
use adapters::block_production_trigger::BlockProductionTrigger;
67
pub use config::{
78
Config,
89
DbType,
@@ -74,11 +75,7 @@ pub struct SharedState {
7475
pub network: Option<fuel_core_p2p::service::SharedState>,
7576
#[cfg(feature = "relayer")]
7677
/// The Relayer shared state.
77-
pub relayer: Option<
78-
fuel_core_relayer::SharedState<
79-
Database<crate::database::database_description::relayer::Relayer>,
80-
>,
81-
>,
78+
pub relayer: Option<fuel_core_relayer::SharedState>,
8279
/// The GraphQL shared state.
8380
pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
8481
/// The underlying database.
@@ -138,11 +135,23 @@ impl FuelService {
138135
// initialize sub services
139136
tracing::info!("Initializing sub services");
140137
database.sync_aux_db_heights(shutdown_listener)?;
141-
let (services, shared) = sub_services::init_sub_services(&config, database)?;
138+
139+
let block_production_trigger = BlockProductionTrigger::new();
140+
141+
let (services, shared) = sub_services::init_sub_services(
142+
&config,
143+
database,
144+
block_production_trigger.clone(),
145+
)?;
142146

143147
let sub_services = Arc::new(services);
144148
let task = Task::new(sub_services.clone(), shared.clone())?;
145-
let runner = ServiceRunner::new(task);
149+
let runner = ServiceRunner::new_with_params(
150+
task,
151+
TaskParams {
152+
block_production_trigger,
153+
},
154+
);
146155
let bound_address = runner.shared.graph_ql.bound_address;
147156

148157
Ok(FuelService {
@@ -396,12 +405,17 @@ impl Task {
396405
}
397406
}
398407

408+
#[derive(Default)]
409+
struct TaskParams {
410+
block_production_trigger: BlockProductionTrigger,
411+
}
412+
399413
#[async_trait::async_trait]
400414
impl RunnableService for Task {
401415
const NAME: &'static str = "FuelService";
402416
type SharedData = SharedState;
403417
type Task = Task;
404-
type TaskParams = ();
418+
type TaskParams = TaskParams;
405419

406420
fn shared_data(&self) -> Self::SharedData {
407421
self.shared.clone()
@@ -410,7 +424,7 @@ impl RunnableService for Task {
410424
async fn into_task(
411425
mut self,
412426
watcher: &StateWatcher,
413-
_: Self::TaskParams,
427+
params: Self::TaskParams,
414428
) -> anyhow::Result<Self::Task> {
415429
let mut watcher = watcher.clone();
416430

@@ -424,6 +438,9 @@ impl RunnableService for Task {
424438
}
425439
}
426440
}
441+
442+
params.block_production_trigger.send_trigger();
443+
427444
Ok(self)
428445
}
429446
}

crates/fuel-core/src/service/adapters.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ pub mod shared_sequencer;
7474
pub mod sync;
7575
pub mod txpool;
7676

77+
pub mod block_production_trigger;
78+
7779
#[derive(Debug, Clone)]
7880
pub struct ConsensusParametersProvider {
7981
shared_state: consensus_parameters_provider::SharedState,
@@ -407,7 +409,9 @@ impl ConsensusAdapter {
407409
#[derive(Clone)]
408410
pub struct MaybeRelayerAdapter {
409411
#[cfg(feature = "relayer")]
410-
pub relayer_synced: Option<fuel_core_relayer::SharedState<Database<Relayer>>>,
412+
pub relayer_synced: Option<fuel_core_relayer::SharedState>,
413+
#[cfg(feature = "relayer")]
414+
pub relayer_database: Database<Relayer>,
411415
#[cfg(feature = "relayer")]
412416
pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
413417
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use fuel_core_poa::ports::TriggerBlockProduction;
2+
use std::sync::Arc;
3+
4+
#[derive(Clone)]
5+
pub struct BlockProductionTrigger {
6+
notifier: Arc<tokio::sync::Notify>,
7+
}
8+
9+
impl TriggerBlockProduction for BlockProductionTrigger {
10+
async fn wait_for_trigger(&self) {
11+
self.notifier.notified().await;
12+
}
13+
}
14+
15+
impl BlockProductionTrigger {
16+
pub fn new() -> Self {
17+
Self {
18+
notifier: Arc::new(tokio::sync::Notify::new()),
19+
}
20+
}
21+
22+
pub fn send_trigger(&self) {
23+
self.notifier.notify_one();
24+
}
25+
}
26+
27+
impl Default for BlockProductionTrigger {
28+
fn default() -> Self {
29+
Self::new()
30+
}
31+
}

crates/fuel-core/src/service/adapters/consensus_module.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl RelayerPort for MaybeRelayerAdapter {
6767
) -> anyhow::Result<()> {
6868
#[cfg(feature = "relayer")]
6969
{
70-
if let Some(sync) = self.relayer_synced.as_ref() {
70+
if let Some(sync) = &self.relayer_synced {
7171
let current_height = sync.get_finalized_da_height();
7272
anyhow::ensure!(
7373
da_height.saturating_sub(*current_height) <= **_max_da_lag,

crates/fuel-core/src/service/adapters/producer.rs

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl fuel_core_producer::ports::Relayer for MaybeRelayerAdapter {
132132
) -> anyhow::Result<DaBlockHeight> {
133133
#[cfg(feature = "relayer")]
134134
{
135-
if let Some(sync) = self.relayer_synced.as_ref() {
135+
if let Some(sync) = &self.relayer_synced {
136136
sync.await_at_least_synced(height).await?;
137137
let highest = sync.get_finalized_da_height();
138138
Ok(highest)
@@ -157,14 +157,21 @@ impl fuel_core_producer::ports::Relayer for MaybeRelayerAdapter {
157157
) -> anyhow::Result<RelayerBlockInfo> {
158158
#[cfg(feature = "relayer")]
159159
{
160-
if let Some(sync) = self.relayer_synced.as_ref() {
161-
get_gas_cost_and_transactions_number_for_height(**height, sync)
162-
} else {
163-
Ok(RelayerBlockInfo {
164-
gas_cost: 0,
165-
tx_count: 0,
166-
})
167-
}
160+
let (gas_cost, tx_count) = self
161+
.relayer_database
162+
.get_events(height)?
163+
.iter()
164+
.fold((0u64, 0u64), |(gas_cost, tx_count), event| {
165+
let gas_cost = gas_cost.saturating_add(event.cost());
166+
let tx_count = match event {
167+
fuel_core_types::services::relayer::Event::Message(_) => tx_count,
168+
fuel_core_types::services::relayer::Event::Transaction(_) => {
169+
tx_count.saturating_add(1)
170+
}
171+
};
172+
(gas_cost, tx_count)
173+
});
174+
Ok(RelayerBlockInfo { gas_cost, tx_count })
168175
}
169176
#[cfg(not(feature = "relayer"))]
170177
{
@@ -181,35 +188,6 @@ impl fuel_core_producer::ports::Relayer for MaybeRelayerAdapter {
181188
}
182189
}
183190

184-
#[cfg(feature = "relayer")]
185-
fn get_gas_cost_and_transactions_number_for_height(
186-
height: u64,
187-
sync: &fuel_core_relayer::SharedState<
188-
crate::database::Database<
189-
crate::database::database_description::relayer::Relayer,
190-
>,
191-
>,
192-
) -> anyhow::Result<RelayerBlockInfo> {
193-
let da_height = DaBlockHeight(height);
194-
let (gas_cost, tx_count) = sync
195-
.database()
196-
.storage::<fuel_core_relayer::storage::EventsHistory>()
197-
.get(&da_height)?
198-
.unwrap_or_default()
199-
.iter()
200-
.fold((0u64, 0u64), |(gas_cost, tx_count), event| {
201-
let gas_cost = gas_cost.saturating_add(event.cost());
202-
let tx_count = match event {
203-
fuel_core_types::services::relayer::Event::Message(_) => tx_count,
204-
fuel_core_types::services::relayer::Event::Transaction(_) => {
205-
tx_count.saturating_add(1)
206-
}
207-
};
208-
(gas_cost, tx_count)
209-
});
210-
Ok(RelayerBlockInfo { gas_cost, tx_count })
211-
}
212-
213191
impl fuel_core_producer::ports::BlockProducerDatabase for OnChainIterableKeyValueView {
214192
fn latest_height(&self) -> Option<BlockHeight> {
215193
self.latest_height().ok()

crates/fuel-core/src/service/adapters/relayer.rs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,26 @@ use crate::database::{
33
Database,
44
};
55
use fuel_core_relayer::ports::Transactional;
6-
use fuel_core_storage::transactional::{
7-
HistoricalView,
8-
IntoTransaction,
9-
StorageTransaction,
6+
use fuel_core_storage::{
7+
transactional::{
8+
AtomicView,
9+
HistoricalView,
10+
IntoTransaction,
11+
StorageTransaction,
12+
},
13+
Result as StorageResult,
14+
StorageAsRef,
15+
};
16+
use fuel_core_types::{
17+
blockchain::primitives::DaBlockHeight,
18+
services::relayer::Event,
1019
};
11-
use fuel_core_types::blockchain::primitives::DaBlockHeight;
1220

1321
impl Transactional for Database<Relayer> {
14-
type Transaction<'a> = StorageTransaction<&'a mut Self> where Self: 'a;
22+
type Transaction<'a>
23+
= StorageTransaction<&'a mut Self>
24+
where
25+
Self: 'a;
1526

1627
fn transaction(&mut self) -> Self::Transaction<'_> {
1728
self.into_transaction()
@@ -21,3 +32,16 @@ impl Transactional for Database<Relayer> {
2132
HistoricalView::latest_height(self)
2233
}
2334
}
35+
36+
impl Database<Relayer> {
37+
pub fn get_events(&self, da_height: &DaBlockHeight) -> StorageResult<Vec<Event>> {
38+
let events = self
39+
.latest_view()?
40+
.storage_as_ref::<fuel_core_relayer::storage::EventsHistory>()
41+
.get(da_height)?
42+
.unwrap_or_default()
43+
.into_owned();
44+
45+
Ok(events)
46+
}
47+
}

crates/fuel-core/src/service/sub_services.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
schema::build_schema,
1818
service::{
1919
adapters::{
20+
block_production_trigger::BlockProductionTrigger,
2021
consensus_module::poa::InDirectoryPredefinedBlocks,
2122
consensus_parameters_provider,
2223
fuel_gas_price_provider::FuelGasPriceProvider,
@@ -66,6 +67,7 @@ pub type PoAService = fuel_core_poa::Service<
6667
SignMode,
6768
InDirectoryPredefinedBlocks,
6869
SystemTime,
70+
BlockProductionTrigger,
6971
>;
7072
#[cfg(feature = "p2p")]
7173
pub type P2PService = fuel_core_p2p::service::Service<Database, TxPoolAdapter>;
@@ -86,6 +88,7 @@ pub const DEFAULT_GAS_PRICE_CHANGE_PERCENT: u16 = 10;
8688
pub fn init_sub_services(
8789
config: &Config,
8890
database: CombinedDatabase,
91+
block_production_trigger: BlockProductionTrigger,
8992
) -> anyhow::Result<(SubServices, SharedState)> {
9093
let chain_config = config.snapshot_reader.chain_config();
9194
let chain_id = chain_config.consensus_parameters.chain_id();
@@ -158,6 +161,8 @@ pub fn init_sub_services(
158161
#[cfg(feature = "relayer")]
159162
relayer_synced: relayer_service.as_ref().map(|r| r.shared.clone()),
160163
#[cfg(feature = "relayer")]
164+
relayer_database: database.relayer().clone(),
165+
#[cfg(feature = "relayer")]
161166
da_deploy_height: config.relayer.as_ref().map_or(
162167
DaBlockHeight(RelayerConfig::DEFAULT_DA_DEPLOY_HEIGHT),
163168
|config| config.da_deploy_height,
@@ -295,6 +300,7 @@ pub fn init_sub_services(
295300
signer,
296301
predefined_blocks,
297302
SystemTime,
303+
block_production_trigger,
298304
)
299305
});
300306
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));
@@ -376,10 +382,6 @@ pub fn init_sub_services(
376382
Box::new(consensus_parameters_provider_service),
377383
];
378384

379-
if let Some(poa) = poa {
380-
services.push(Box::new(poa));
381-
}
382-
383385
#[cfg(feature = "relayer")]
384386
if let Some(relayer) = relayer_service {
385387
services.push(Box::new(relayer));
@@ -398,5 +400,10 @@ pub fn init_sub_services(
398400
services.push(Box::new(graph_ql));
399401
services.push(Box::new(graphql_worker));
400402

403+
// always make sure that the block producer is inserted last
404+
if let Some(poa) = poa {
405+
services.push(Box::new(poa));
406+
}
407+
401408
Ok((services, shared))
402409
}

crates/services/consensus_module/poa/src/ports.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,42 @@ impl PredefinedBlocks for InMemoryPredefinedBlocks {
140140
pub trait GetTime: Send + Sync {
141141
fn now(&self) -> Tai64;
142142
}
143+
144+
pub trait TriggerBlockProduction: Send + Sync {
145+
fn wait_for_trigger(&self) -> impl core::future::Future<Output = ()> + Send;
146+
}
147+
148+
pub(crate) struct BlockProductionTrigger<TBP> {
149+
notifier: TBP,
150+
notified: std::sync::Arc<std::sync::atomic::AtomicBool>,
151+
}
152+
153+
impl<TBP> TriggerBlockProduction for BlockProductionTrigger<TBP>
154+
where
155+
TBP: TriggerBlockProduction,
156+
{
157+
/// Cache the notification to avoid waiting for the trigger multiple times.
158+
async fn wait_for_trigger(&self) {
159+
if self.notified.load(std::sync::atomic::Ordering::Acquire) {
160+
return;
161+
}
162+
self.notifier.wait_for_trigger().await;
163+
// this is the first and only time we are notified
164+
tracing::info!("Block Production has been triggered.");
165+
166+
self.notified
167+
.store(true, std::sync::atomic::Ordering::Release);
168+
}
169+
}
170+
171+
impl<TBP> BlockProductionTrigger<TBP>
172+
where
173+
TBP: TriggerBlockProduction,
174+
{
175+
pub(crate) fn new(notifier: TBP) -> Self {
176+
Self {
177+
notifier,
178+
notified: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
179+
}
180+
}
181+
}

0 commit comments

Comments
 (0)