Skip to content

Commit 39acd32

Browse files
committed
added retry mechanism
1 parent 50593f8 commit 39acd32

File tree

8 files changed

+128
-36
lines changed

8 files changed

+128
-36
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ lru = "0.12.5"
2626
bimap = { version = "0.6.3", features = ["serde"] }
2727
prometheus_exporter = "0.8.5"
2828
subtle-encoding = "0.5.1"
29+
thiserror = "1.0.56"
30+
tokio-retry2 = { version = "0.5", features = ["jitter", "tracing"] }
2931

3032
[build-dependencies]
3133
vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] }

src/checks/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ use block::BlockHeigtCheck;
66
use epoch::EpochCheck;
77
use total_supply_native::TotalSupplyNativeCheck;
88

9-
109
pub enum Checks {
1110
BlockHeightCheck(BlockHeigtCheck),
1211
EpochCheck(EpochCheck),
13-
TotalSupplyNative(TotalSupplyNativeCheck)
12+
TotalSupplyNative(TotalSupplyNativeCheck),
1413
}
1514

1615
pub fn all_checks() -> Vec<Checks> {

src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub struct AppConfig {
1717
#[clap(long, env, default_value_t = 9184)]
1818
pub prometheus_port: u64,
1919

20+
#[clap(long, env, default_value_t = 1)]
21+
pub initial_block_height: u64,
22+
2023
#[clap(long, env, default_value_t = 5)]
2124
pub sleep_for: u64,
2225

src/error.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use tokio_retry2::RetryError;
2+
3+
pub trait AsRetryError<T> {
4+
fn into_retry_error(self) -> Result<T, RetryError<std::io::Error>>;
5+
}
6+
7+
impl<T> AsRetryError<T> for anyhow::Result<T> {
8+
#[inline]
9+
fn into_retry_error(self) -> Result<T, RetryError<std::io::Error>> {
10+
self.map_err(|reason| {
11+
tracing::error!(?reason, "RPC error");
12+
let err = std::io::Error::new(std::io::ErrorKind::Other, reason.to_string());
13+
RetryError::Transient {
14+
err,
15+
retry_after: None,
16+
}
17+
})
18+
}
19+
}

src/main.rs

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,30 @@
11
pub mod apprise;
22
pub mod checks;
33
pub mod config;
4+
pub mod error;
45
pub mod log;
56
pub mod rpc;
67
pub mod shared;
78
pub mod state;
89

9-
use std::net::SocketAddr;
10+
use std::{net::SocketAddr, sync::Arc};
1011

1112
use anyhow::Context;
1213
use apprise::AppRise;
1314
use checks::all_checks;
1415
use clap::Parser;
1516
use config::AppConfig;
17+
use error::AsRetryError;
1618
use prometheus_exporter::prometheus::Registry;
1719
use rpc::Rpc;
1820
use shared::checksums::Checksums;
1921
use state::State;
22+
use tokio::sync::RwLock;
23+
use tokio_retry2::{strategy::ExponentialBackoff, Retry};
24+
25+
fn notify(err: &std::io::Error, duration: std::time::Duration) {
26+
tracing::info!("Error {err:?} occurred at {duration:?}");
27+
}
2028

2129
#[tokio::main]
2230
async fn main() -> anyhow::Result<()> {
@@ -36,37 +44,74 @@ async fn main() -> anyhow::Result<()> {
3644
checksums.add(code_path, code);
3745
}
3846

39-
let mut state = State::new(checksums);
40-
let registry = state.prometheus_registry();
47+
let state = Arc::new(RwLock::new(State::new(
48+
checksums,
49+
config.initial_block_height,
50+
)));
51+
let unlocked_state = state.read().await;
52+
let registry = unlocked_state.prometheus_registry();
53+
drop(unlocked_state);
4154

4255
start_prometheus_exporter(registry, config.prometheus_port)?;
4356

57+
let retry_strategy = retry_strategy();
58+
4459
loop {
45-
let pre_state = state.clone();
46-
47-
let native_token = rpc.query_native_token().await?;
48-
let block_height = state.next_block_height();
49-
let epoch = rpc.query_current_epoch(block_height).await?.unwrap_or(0);
50-
let block = rpc
51-
.query_block(block_height, &state.checksums, epoch)
52-
.await?;
53-
let total_supply_native = rpc.query_total_supply(&native_token).await?;
54-
55-
state.update(block, total_supply_native);
56-
57-
for check_kind in all_checks() {
58-
let check_res = match check_kind {
59-
checks::Checks::BlockHeightCheck(check) => check.run(&pre_state, &state).await,
60-
checks::Checks::EpochCheck(check) => check.run(&pre_state, &state).await,
61-
checks::Checks::TotalSupplyNative(check) => check.run(&pre_state, &state).await,
62-
};
63-
if let Err(error) = check_res {
64-
tracing::error!("Error: {}", error.to_string());
65-
apprise.send_to_slack(error.to_string()).await?;
66-
}
67-
}
68-
69-
tracing::info!("Done block {}", block_height);
60+
Retry::spawn_notify(
61+
retry_strategy.clone(),
62+
|| async {
63+
let pre_state_lock = state.read().await;
64+
let block_height = pre_state_lock.next_block_height();
65+
let checksums = pre_state_lock.checksums.clone();
66+
let pre_state = pre_state_lock.clone();
67+
drop(pre_state_lock);
68+
69+
let native_token = rpc.query_native_token().await.into_retry_error()?;
70+
let epoch = rpc
71+
.query_current_epoch(block_height)
72+
.await
73+
.into_retry_error()?
74+
.unwrap_or(0);
75+
let block = rpc
76+
.query_block(block_height, &checksums, epoch)
77+
.await
78+
.into_retry_error()?;
79+
let total_supply_native = rpc
80+
.query_total_supply(&native_token)
81+
.await
82+
.into_retry_error()?;
83+
84+
let mut post_state_lock = state.write().await;
85+
post_state_lock.update(block, total_supply_native);
86+
let post_state = post_state_lock.clone();
87+
drop(post_state_lock);
88+
89+
for check_kind in all_checks() {
90+
let check_res = match check_kind {
91+
checks::Checks::BlockHeightCheck(check) => {
92+
check.run(&pre_state, &post_state).await
93+
}
94+
checks::Checks::EpochCheck(check) => {
95+
check.run(&pre_state, &post_state).await
96+
}
97+
checks::Checks::TotalSupplyNative(check) => {
98+
check.run(&pre_state, &post_state).await
99+
}
100+
};
101+
if let Err(error) = check_res {
102+
tracing::error!("Error: {}", error.to_string());
103+
apprise
104+
.send_to_slack(error.to_string())
105+
.await
106+
.into_retry_error()?;
107+
}
108+
}
109+
110+
Ok(())
111+
},
112+
notify,
113+
)
114+
.await?;
70115
}
71116
}
72117

@@ -80,3 +125,9 @@ fn start_prometheus_exporter(registry: Registry, port: u64) -> anyhow::Result<()
80125

81126
Ok(())
82127
}
128+
129+
fn retry_strategy() -> ExponentialBackoff {
130+
ExponentialBackoff::from_millis(1000)
131+
.factor(1)
132+
.max_delay_millis(10000)
133+
}

src/rpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::str::FromStr;
33
use anyhow::Context;
44
use futures::FutureExt;
55
use namada_sdk::{
6-
address::Address as NamadaAddress, hash::Hash, ibc::context::client, io::Client, rpc,
6+
address::Address as NamadaAddress, hash::Hash, io::Client, rpc,
77
state::Key,
88
};
99
use tendermint_rpc::{HttpClient, Url};

src/state.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ impl PrometheusMetrics {
5656
.register(Box::new(block_height_counter.clone()))
5757
.unwrap();
5858
registry.register(Box::new(epoch_counter.clone())).unwrap();
59-
registry.register(Box::new(total_supply_native_token.clone())).unwrap();
59+
registry
60+
.register(Box::new(total_supply_native_token.clone()))
61+
.unwrap();
6062

6163
Self {
6264
block_height_counter,
@@ -76,9 +78,9 @@ impl PrometheusMetrics {
7678
}
7779

7880
impl State {
79-
pub fn new(checksums: Checksums) -> Self {
81+
pub fn new(checksums: Checksums, block_height: u64) -> Self {
8082
Self {
81-
latest_block_height: None,
83+
latest_block_height: Some(block_height),
8284
latest_epoch: None,
8385
latest_total_supply_native: None,
8486
checksums,
@@ -115,9 +117,11 @@ impl State {
115117
if let Some(total_supply) = self.latest_total_supply_native {
116118
self.metrics
117119
.total_supply_native_token
118-
.inc_by(total_supply - total_supply);
120+
.inc_by(total_supply_native - total_supply);
119121
} else {
120-
self.metrics.total_supply_native_token.inc_by(total_supply_native);
122+
self.metrics
123+
.total_supply_native_token
124+
.inc_by(total_supply_native);
121125
}
122126
self.latest_total_supply_native = Some(total_supply_native);
123127

0 commit comments

Comments
 (0)