diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 3427ef60..563add62 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -1,7 +1,7 @@ use crate::config::{Config, PredicatesApi}; use crate::scan::common::get_block_heights_to_scan; use crate::service::{ - open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, + connect_to_redis_with_retry, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, }; use chainhook_sdk::bitcoincore_rpc::RpcApi; @@ -71,7 +71,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let mut predicates_db_conn = match config.http_api { PredicatesApi::On(ref api_config) => { - Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx)) + Some(connect_to_redis_with_retry(&api_config.database_uri)) } PredicatesApi::Off => None, }; diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 5c62fdef..c075254e 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -10,8 +10,7 @@ use crate::{ config::{Config, PredicatesApi}, scan::common::get_block_heights_to_scan, service::{ - open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, - set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, + connect_to_redis_with_retry, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData }, storage::{ get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, @@ -223,7 +222,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( let mut predicates_db_conn = match config.http_api { PredicatesApi::On(ref api_config) => { - Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx)) + Some(connect_to_redis_with_retry(&api_config.database_uri)) } PredicatesApi::Off => None, }; diff --git a/components/chainhook-cli/src/service/http_api.rs b/components/chainhook-cli/src/service/http_api.rs index 7aaeb329..af672b78 100644 --- a/components/chainhook-cli/src/service/http_api.rs +++ b/components/chainhook-cli/src/service/http_api.rs @@ -22,7 +22,7 @@ use std::error::Error; use crate::config::PredicatesApiConfig; -use super::{open_readwrite_predicates_db_conn, PredicateStatus}; +use super::{connect_to_redis_with_retry, PredicateStatus}; pub async fn start_predicate_api_server( api_config: PredicatesApiConfig, @@ -87,10 +87,9 @@ fn handle_get_predicates( ctx: &State, ) -> Json { ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks")); - match open_readwrite_predicates_db_conn(api_config) { - Ok(mut predicates_db_conn) => { - let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, ctx) { - Ok(predicates) => predicates, + let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri); + let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, ctx) { + Ok(predicates) => predicates, Err(e) => { ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}")); return Json(json!({ @@ -107,14 +106,8 @@ fn handle_get_predicates( Json(json!({ "status": 200, - "result": serialized_predicates - })) - } - Err(e) => Json(json!({ - "status": 500, - "message": e, - })), - } + "result": serialized_predicates + })) } #[openapi(tag = "Managing Predicates")] @@ -147,17 +140,16 @@ fn handle_create_predicate( let predicate_uuid = predicate.get_uuid().to_string(); - if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { - if let Ok(Some(_)) = get_entry_from_predicates_db( - &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), - &mut predicates_db_conn, - ctx, - ) { - return Json(json!({ - "status": 409, - "error": "Predicate uuid already in use", - })) - } + let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri); + if let Ok(Some(_)) = get_entry_from_predicates_db( + &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), + &mut predicates_db_conn, + ctx, + ) { + return Json(json!({ + "status": 409, + "error": "Predicate uuid already in use", + })) } let background_job_tx = background_job_tx.inner(); @@ -186,31 +178,24 @@ fn handle_get_predicate( ) }); - match open_readwrite_predicates_db_conn(api_config) { - Ok(mut predicates_db_conn) => { - let (predicate, status) = match get_entry_from_predicates_db( - &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), - &mut predicates_db_conn, - ctx, - ) { - Ok(Some(predicate_with_status)) => predicate_with_status, - _ => { - return Json(json!({ - "status": 404, - })) - } - }; - let result = serialized_predicate_with_status(&predicate, &status); - Json(json!({ - "status": 200, - "result": result + let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri); + let (predicate, status) = match get_entry_from_predicates_db( + &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), + &mut predicates_db_conn, + ctx, + ) { + Ok(Some(predicate_with_status)) => predicate_with_status, + _ => { + return Json(json!({ + "status": 404, })) } - Err(e) => Json(json!({ - "status": 500, - "message": e, - })), - } + }; + let result = serialized_predicate_with_status(&predicate, &status); + Json(json!({ + "status": 200, + "result": result + })) } #[openapi(tag = "Managing Predicates")] @@ -334,16 +319,10 @@ pub fn get_entries_from_predicates_db( } pub fn load_predicates_from_redis( - config: &crate::config::Config, + predicate_db_conn: &mut Connection, ctx: &Context, ) -> Result, String> { - let redis_uri: &str = config.expected_api_database_uri(); - let client = redis::Client::open(redis_uri) - .map_err(|e| format!("unable to connect to redis: {}", e))?; - let mut predicate_db_conn = client - .get_connection() - .map_err(|e| format!("unable to connect to redis: {}", e))?; - get_entries_from_predicates_db(&mut predicate_db_conn, ctx) + get_entries_from_predicates_db(predicate_db_conn, ctx) } pub fn document_predicate_api_server() -> Result { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index a23e1bbf..81cf0c1a 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -1,7 +1,7 @@ pub(crate) mod http_api; mod runloops; -use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; +use crate::config::{Config, PredicatesApi}; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::signers::{initialize_signers_db, store_signer_db_messages}; @@ -14,6 +14,7 @@ use crate::storage::{ use chainhook_sdk::chainhooks::types::{ChainhookSpecificationNetworkMap, ChainhookStore}; use chainhook_sdk::chainhooks::types::ChainhookInstance; +use chainhook_sdk::monitoring::PrometheusMonitoring; use chainhook_sdk::observer::{ start_event_observer, HookExpirationData, ObserverCommand, ObserverEvent, PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData, @@ -46,64 +47,73 @@ impl Service { observer_commands_tx_rx: Option<(Sender, Receiver)>, ) -> Result<(), String> { let mut chainhook_store = ChainhookStore::new(); + let mut connection = if let PredicatesApi::On(ref config) = self.config.http_api { + Some(connect_to_redis_with_retry(&config.database_uri)) + } else if self.config.is_http_api_enabled() { + Some(connect_to_redis_with_retry(&self.config.expected_api_database_uri())) + } else { + None + }; // store all predicates from Redis that were in the process of scanning when // chainhook was shutdown - we need to resume where we left off let mut leftover_scans = vec![]; // retrieve predicates from Redis, and register each in memory if self.config.is_http_api_enabled() { - let registered_predicates = match load_predicates_from_redis(&self.config, &self.ctx) { - Ok(predicates) => predicates, - Err(e) => { - error!( - self.ctx.expect_logger(), - "Failed loading predicate from storage: {}", - e.to_string() - ); - vec![] - } - }; - for (predicate, status) in registered_predicates.into_iter() { - let predicate_uuid = predicate.uuid().to_string(); - match status { - PredicateStatus::Scanning(scanning_data) => { - leftover_scans.push((predicate.clone(), Some(scanning_data))); - } - PredicateStatus::New => { - leftover_scans.push((predicate.clone(), None)); - } - // predicates that were previously in a streaming state probably - // need to catch up on blocks - PredicateStatus::Streaming(streaming_data) => { - let scanning_data = ScanningData { - number_of_blocks_to_scan: 0, // this is the only data we don't know when converting from streaming => scanning - number_of_blocks_evaluated: streaming_data.number_of_blocks_evaluated, - number_of_times_triggered: streaming_data.number_of_times_triggered, - last_occurrence: streaming_data.last_occurrence, - last_evaluated_block_height: streaming_data.last_evaluated_block_height, - }; - leftover_scans.push((predicate.clone(), Some(scanning_data))); - } - PredicateStatus::UnconfirmedExpiration(_) => {} - PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { - // Confirmed and Interrupted predicates don't need to be reregistered. - continue; - } - } - match chainhook_store.register_instance(predicate) { - Ok(_) => { - debug!( - self.ctx.expect_logger(), - "Predicate {} retrieved from storage and registered", predicate_uuid, - ); - } + if let Some(predicates_db_conn) = connection.as_mut() { + let registered_predicates = match load_predicates_from_redis(predicates_db_conn, &self.ctx) { + Ok(predicates) => predicates, Err(e) => { - warn!( + error!( self.ctx.expect_logger(), - "Failed to register predicate {} after retrieving from storage: {}", - predicate_uuid, + "Failed loading predicate from storage: {}", e.to_string() ); + vec![] + } + }; + for (predicate, status) in registered_predicates.into_iter() { + let predicate_uuid = predicate.uuid().to_string(); + match status { + PredicateStatus::Scanning(scanning_data) => { + leftover_scans.push((predicate.clone(), Some(scanning_data))); + } + PredicateStatus::New => { + leftover_scans.push((predicate.clone(), None)); + } + // predicates that were previously in a streaming state probably + // need to catch up on blocks + PredicateStatus::Streaming(streaming_data) => { + let scanning_data = ScanningData { + number_of_blocks_to_scan: 0, // this is the only data we don't know when converting from streaming => scanning + number_of_blocks_evaluated: streaming_data.number_of_blocks_evaluated, + number_of_times_triggered: streaming_data.number_of_times_triggered, + last_occurrence: streaming_data.last_occurrence, + last_evaluated_block_height: streaming_data.last_evaluated_block_height, + }; + leftover_scans.push((predicate.clone(), Some(scanning_data))); + } + PredicateStatus::UnconfirmedExpiration(_) => {} + PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { + // Confirmed and Interrupted predicates don't need to be reregistered. + continue; + } + } + match chainhook_store.register_instance(predicate) { + Ok(_) => { + debug!( + self.ctx.expect_logger(), + "Predicate {} retrieved from storage and registered", predicate_uuid, + ); + } + Err(e) => { + warn!( + self.ctx.expect_logger(), + "Failed to register predicate {} after retrieving from storage: {}", + predicate_uuid, + e.to_string() + ); + } } } } @@ -112,12 +122,12 @@ impl Service { let mut newly_registered_predicates = vec![]; // For each predicate found, register in memory. for predicate in predicates_from_startup.into_iter() { - if let PredicatesApi::On(api_config) = &self.config.http_api { - if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { + if let PredicatesApi::On(_) = &self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { let uuid = predicate.get_uuid(); if let Ok(Some(_)) = get_entry_from_predicates_db( &ChainhookInstance::either_stx_or_btc_key(uuid), - &mut predicates_db_conn, + predicates_db_conn, &self.ctx, ) { warn!( @@ -126,7 +136,7 @@ impl Service { ); continue; } - }; + } } match chainhook_store.register_instance_from_network_map( ( @@ -287,13 +297,6 @@ impl Service { ); let ctx = self.ctx.clone(); - match self.config.http_api { - PredicatesApi::On(ref api_config) => { - // Test redis connection - open_readwrite_predicates_db_conn(api_config)?; - } - PredicatesApi::Off => {} - }; for predicate_with_last_scanned_block in leftover_scans { match predicate_with_last_scanned_block { @@ -333,24 +336,21 @@ impl Service { // If start block specified, use it. // If no start block specified, depending on the nature the hook, we'd like to retrieve: // - contract-id - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - update_predicate_status( - &spec.key(), - PredicateStatus::New, - &mut predicates_db_conn, - &self.ctx, - ); + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + update_predicate_spec( + &spec.key(), + &spec, + predicates_db_conn, + &self.ctx, + ); + update_predicate_status( + &spec.key(), + PredicateStatus::New, + predicates_db_conn, + &self.ctx, + ); + } } match spec { ChainhookInstance::Stacks(predicate_spec) => { @@ -368,132 +368,121 @@ impl Service { } } ObserverEvent::PredicateEnabled(spec) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - set_predicate_streaming_status( - StreamingDataType::FinishedScanning, - &spec.key(), - &mut predicates_db_conn, - &ctx, - ); + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + update_predicate_spec( + &spec.key(), + &spec, + predicates_db_conn, + &self.ctx, + ); + set_predicate_streaming_status( + StreamingDataType::FinishedScanning, + &spec.key(), + predicates_db_conn, + &ctx, + ); + } } } ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { predicate_uuid, chain, }) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + match chain { + Chain::Bitcoin => { + let _ = bitcoin_scan_op_tx + .send(BitcoinScanOp::KillScan(predicate_uuid.clone())); + } + Chain::Stacks => { + let _ = stacks_scan_op_tx + .send(StacksScanOp::KillScan(predicate_uuid.clone())); + } + }; - match chain { - Chain::Bitcoin => { - let _ = bitcoin_scan_op_tx - .send(BitcoinScanOp::KillScan(predicate_uuid.clone())); + let predicate_key = + ChainhookInstance::either_stx_or_btc_key(&predicate_uuid); + let res: Result<(), redis::RedisError> = + predicates_db_conn.del(predicate_key.clone()); + if let Err(e) = res { + warn!( + self.ctx.expect_logger(), + "unable to delete predicate {predicate_key}: {}", + e.to_string() + ); } - Chain::Stacks => { - let _ = stacks_scan_op_tx - .send(StacksScanOp::KillScan(predicate_uuid.clone())); - } - }; - - let predicate_key = - ChainhookInstance::either_stx_or_btc_key(&predicate_uuid); - let res: Result<(), redis::RedisError> = - predicates_db_conn.del(predicate_key.clone()); - if let Err(e) = res { - warn!( - self.ctx.expect_logger(), - "unable to delete predicate {predicate_key}: {}", - e.to_string() - ); } } } ObserverEvent::BitcoinChainEvent((chain_update, report)) => { debug!(self.ctx.expect_logger(), "Bitcoin update not stored"); - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - - match chain_update { - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks( - data, - ) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireBitcoinPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + match chain_update { + chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks( + data, + ) => { + for confirmed_block in &data.confirmed_blocks { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Bitcoin, + confirmed_block.block_identifier.index, + predicates_db_conn, + &ctx, + ) + { + for uuid in expired_predicate_uuids.into_iter() { + let _ = observer_command_tx.send( + ObserverCommand::ExpireBitcoinPredicate( + HookExpirationData { + hook_uuid: uuid, + block_height: confirmed_block + .block_identifier + .index, + }, + ), + ); + } } } } - } - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg( - data, - ) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireBitcoinPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); + chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg( + data, + ) => { + for confirmed_block in &data.confirmed_blocks { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Bitcoin, + confirmed_block.block_identifier.index, + predicates_db_conn, + &ctx, + ) + { + for uuid in expired_predicate_uuids.into_iter() { + let _ = observer_command_tx.send( + ObserverCommand::ExpireBitcoinPredicate( + HookExpirationData { + hook_uuid: uuid, + block_height: confirmed_block + .block_identifier + .index, + }, + ), + ); + } } } } } + update_status_from_report( + Chain::Bitcoin, + report, + predicates_db_conn, + &ctx, + ); } - update_status_from_report( - Chain::Bitcoin, - report, - &mut predicates_db_conn, - &ctx, - ); } } ObserverEvent::StacksChainEvent((chain_event, report)) => { @@ -569,94 +558,87 @@ impl Service { } }; - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - - match &chain_event { - StacksChainEvent::ChainUpdatedWithBlocks(data) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireStacksPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + match &chain_event { + StacksChainEvent::ChainUpdatedWithBlocks(data) => { + for confirmed_block in &data.confirmed_blocks { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Stacks, + confirmed_block.block_identifier.index, + predicates_db_conn, + &ctx, + ) + { + for uuid in expired_predicate_uuids.into_iter() { + let _ = observer_command_tx.send( + ObserverCommand::ExpireStacksPredicate( + HookExpirationData { + hook_uuid: uuid, + block_height: confirmed_block + .block_identifier + .index, + }, + ), + ); + } } } } - } - StacksChainEvent::ChainUpdatedWithReorg(data) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireStacksPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); + StacksChainEvent::ChainUpdatedWithReorg(data) => { + for confirmed_block in &data.confirmed_blocks { + if let Some(expired_predicate_uuids) = + expire_predicates_for_block( + &Chain::Stacks, + confirmed_block.block_identifier.index, + predicates_db_conn, + &ctx, + ) + { + for uuid in expired_predicate_uuids.into_iter() { + let _ = observer_command_tx.send( + ObserverCommand::ExpireStacksPredicate( + HookExpirationData { + hook_uuid: uuid, + block_height: confirmed_block + .block_identifier + .index, + }, + ), + ); + } } } } - } - StacksChainEvent::ChainUpdatedWithMicroblocks(_) - | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} - StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => { - // TODO(rafaelcr): Expire signer message predicates when appropriate - } - }; - update_status_from_report( - Chain::Stacks, - report, - &mut predicates_db_conn, - &ctx, - ); + StacksChainEvent::ChainUpdatedWithMicroblocks(_) + | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} + StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => { + // TODO(rafaelcr): Expire signer message predicates when appropriate + } + }; + update_status_from_report( + Chain::Stacks, + report, + predicates_db_conn, + &ctx, + ); + } }; } ObserverEvent::PredicateInterrupted(PredicateInterruptedData { predicate_key, error, }) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - set_predicate_interrupted_status( - error, - &predicate_key, - &mut predicates_db_conn, - &ctx, - ); + if let PredicatesApi::On(_) = self.config.http_api { + if let Some(predicates_db_conn) = connection.as_mut() { + set_predicate_interrupted_status( + error, + &predicate_key, + predicates_db_conn, + &ctx, + ); + } } } ObserverEvent::Terminate => { @@ -1250,33 +1232,32 @@ fn retrieve_predicate_status( } } -pub fn open_readwrite_predicates_db_conn( - config: &PredicatesApiConfig, -) -> Result { - let redis_uri = &config.database_uri; - let client = redis::Client::open(redis_uri.clone()).unwrap(); - client - .get_connection() - .map_err(|e| format!("unable to connect to db: {}", e)) -} - -pub fn open_readwrite_predicates_db_conn_verbose( - config: &PredicatesApiConfig, - ctx: &Context, -) -> Result { - let res = open_readwrite_predicates_db_conn(config); - if let Err(ref e) = res { - error!(ctx.expect_logger(), "{}", e.to_string()); +pub fn connect_to_redis_with_retry(redis_uri: &str) -> Connection { + loop { + let prometheus_monitoring = PrometheusMonitoring::new(); + match redis::Client::open(redis_uri) { + Ok(client) => { + match client.get_connection() { + Ok(connection) => { + prometheus_monitoring.set_is_connected_to_predicates_db(true); + return connection; + } + Err(e) => { + prometheus_monitoring.set_is_connected_to_predicates_db(false); + let error_msg = format!("unable to connect to db: {}", e); + eprintln!("{}", error_msg); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + } + } + Err(e) => { + prometheus_monitoring.set_is_connected_to_predicates_db(false); + let error_msg = format!("unable to open redis client: {}", e); + eprintln!("{}", error_msg); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + } } - res -} - -// todo: evaluate expects -pub fn open_readwrite_predicates_db_conn_or_panic( - config: &PredicatesApiConfig, - ctx: &Context, -) -> Connection { - open_readwrite_predicates_db_conn_verbose(config, ctx).expect("unable to open redis conn") } #[cfg(test)] diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index c177b563..374c0132 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -19,7 +19,7 @@ use crate::{ bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult, stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, }, - service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, storage::StacksDbConnections, + service::{connect_to_redis_with_retry, set_predicate_interrupted_status}, storage::StacksDbConnections, }; use super::ScanningData; @@ -99,10 +99,7 @@ pub fn start_stacks_scan_runloop( let error = format!( "Unable to evaluate predicate on Stacks chainstate: {e}" ); - let mut predicates_db_conn = - open_readwrite_predicates_db_conn_or_panic( - api_config, &moved_ctx, - ); + let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri); set_predicate_interrupted_status( error, &predicate_spec.key(), @@ -183,10 +180,7 @@ pub fn start_bitcoin_scan_runloop( let error = format!( "Unable to evaluate predicate on Bitcoin chainstate: {e}" ); - let mut predicates_db_conn = - open_readwrite_predicates_db_conn_or_panic( - api_config, &moved_ctx, - ); + let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri); set_predicate_interrupted_status( error, &predicate_spec.key(), diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index 2321499a..1105b893 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -1,4 +1,5 @@ use chainhook_sdk::chainhooks::types::ChainhookSpecificationNetworkMap; +use chainhook_sdk::monitoring::PrometheusMonitoring; use chainhook_sdk::types::Chain; use chainhook_sdk::utils::Context; use rocket::serde::json::Value as JsonValue; @@ -407,6 +408,8 @@ async fn test_stacks_predicate_status_is_updated( .map_err(|e| cleanup_err(e, &working_dir, redis_port)) .unwrap(); + let prometheus_monitoring = PrometheusMonitoring::new(); + assert_eq!(prometheus_monitoring.is_connected_to_predicates_db.get(), true as u64); for i in 1..blocks_to_mine + 1 { mine_stacks_block( stacks_ingestion_port, @@ -494,6 +497,10 @@ async fn test_bitcoin_predicate_status_is_updated( .map_err(|e| cleanup_err(e, &working_dir, redis_port)) .unwrap(); } + + let prometheus_monitoring = PrometheusMonitoring::new(); + assert_eq!(prometheus_monitoring.is_connected_to_predicates_db.get(), true as u64); + sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index e06df1d8..3c8a50b5 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -87,6 +87,73 @@ async fn prometheus_endpoint_returns_encoded_metrics() -> Result<(), String> { .await .map_err(|e| cleanup_err(e, &working_dir, redis_port))?; + + // Define expected metric groups with their expected values + let expected_metrics = [ + // Bitcoin metrics + ("chainhook_btc_block_evaluation_lag", "0"), + ("chainhook_btc_canonical_fork_lag", "0"), + ("chainhook_btc_deregistered_predicates", "0"), + ("chainhook_btc_highest_block_appended", "0"), + ("chainhook_btc_highest_block_evaluated", "0"), + ("chainhook_btc_highest_block_received", "0"), + ("chainhook_btc_last_block_ingestion_time", "0"), + ("chainhook_btc_last_reorg_applied_blocks", "0"), + ("chainhook_btc_last_reorg_rolled_back_blocks", "0"), + ("chainhook_btc_last_reorg_timestamp", "0"), + ("chainhook_btc_registered_predicates", "0"), + + // Stacks metrics + ("chainhook_stx_block_evaluation_lag", "0"), + ("chainhook_stx_canonical_fork_lag", "0"), + ("chainhook_stx_deregistered_predicates", "0"), + ("chainhook_stx_highest_block_appended", "1"), + ("chainhook_stx_highest_block_evaluated", "1"), + ("chainhook_stx_highest_block_received", "1"), + ("chainhook_stx_last_reorg_applied_blocks", "0"), + ("chainhook_stx_last_reorg_rolled_back_blocks", "0"), + ("chainhook_stx_last_reorg_timestamp", "0"), + ("chainhook_stx_registered_predicates", "1"), + ]; + + // Verify each metric exists with the expected value + for (metric_name, expected_value) in expected_metrics { + // Check the gauge line exists with the correct value + let gauge_line = format!("{} {}", metric_name, expected_value); + assert!( + metrics.contains(&gauge_line), + "Metric '{}' with value '{}' not found in response", + metric_name, + expected_value + ); + + // Check that the TYPE line exists + let type_line = format!("# TYPE {} gauge", metric_name); + assert!( + metrics.contains(&type_line), + "Type declaration for '{}' not found", + metric_name + ); + + // Check that the HELP line exists + assert!( + metrics.contains(&format!("# HELP {}", metric_name)), + "Help text for '{}' not found", + metric_name + ); + } + + // Special case for timestamp which will vary + assert!( + metrics.contains("# TYPE chainhook_stx_last_block_ingestion_time gauge") && + metrics.contains("# HELP chainhook_stx_last_block_ingestion_time") && + metrics.contains("chainhook_stx_last_block_ingestion_time "), + "Last block ingestion time metric is missing or incomplete" + ); + + + println!("metrics: {}", metrics); + const EXPECTED: &str = "# HELP chainhook_stx_registered_predicates The number of Stacks predicates that have been registered by the Chainhook node.\n# TYPE chainhook_stx_registered_predicates gauge\nchainhook_stx_registered_predicates 1\n"; assert!(metrics.contains(EXPECTED)); diff --git a/components/chainhook-sdk/src/monitoring.rs b/components/chainhook-sdk/src/monitoring.rs index dfb33ded..740d3373 100644 --- a/components/chainhook-sdk/src/monitoring.rs +++ b/components/chainhook-sdk/src/monitoring.rs @@ -13,9 +13,15 @@ use prometheus::{ }; use rocket::serde::json::{json, Value as JsonValue}; use std::time::{SystemTime, UNIX_EPOCH}; +use lazy_static::lazy_static; type UInt64Gauge = GenericGauge; +// Create a global Registry that can be accessed from anywhere +lazy_static! { + pub static ref GLOBAL_REGISTRY: Registry = Registry::new(); +} + #[derive(Debug, Clone)] pub struct PrometheusMonitoring { pub stx_highest_block_appended: UInt64Gauge, @@ -41,7 +47,7 @@ pub struct PrometheusMonitoring { pub btc_last_block_ingestion_time: UInt64Gauge, pub btc_registered_predicates: UInt64Gauge, pub btc_deregistered_predicates: UInt64Gauge, - pub registry: Registry, + pub is_connected_to_predicates_db: UInt64Gauge, } impl Default for PrometheusMonitoring { @@ -52,123 +58,130 @@ impl Default for PrometheusMonitoring { impl PrometheusMonitoring { pub fn new() -> PrometheusMonitoring { - let registry = Registry::new(); + // Use the global registry instead of creating a new one + let registry = &GLOBAL_REGISTRY; + // stacks metrics let stx_highest_block_appended = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_highest_block_appended", "The highest Stacks block successfully appended to a Chainhook node fork.", ); let stx_highest_block_received = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_highest_block_received", "The highest Stacks block received by the Chainhook node from the Stacks node.", ); let stx_highest_block_evaluated = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_highest_block_evaluated", "The highest Stacks block successfully evaluated against predicates.", ); let stx_canonical_fork_lag = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_canonical_fork_lag", "The difference between the highest Stacks block received and the highest Stacks block appended.", ); let stx_block_evaluation_lag = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_block_evaluation_lag", "The difference between the highest Stacks block appended and the highest Stacks block evaluated.", ); let stx_last_reorg_timestamp = PrometheusMonitoring::create_and_register_int_gauge( - ®istry, + registry, "chainhook_stx_last_reorg_timestamp", "The timestamp of the latest Stacks reorg ingested by the Chainhook node.", ); let stx_last_reorg_applied_blocks = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_last_reorg_applied_blocks", "The number of blocks applied to the Stacks chain as part of the latest Stacks reorg.", ); let stx_last_reorg_rolled_back_blocks = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_last_reorg_rolled_back_blocks", "The number of blocks rolled back from the Stacks chain as part of the latest Stacks reorg.", ); let stx_last_block_ingestion_time = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_last_block_ingestion_time", "The time that the Chainhook node last ingested a Stacks block.", ); let stx_registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_registered_predicates", "The number of Stacks predicates that have been registered by the Chainhook node.", ); let stx_deregistered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_stx_deregistered_predicates", "The number of Stacks predicates that have been deregistered by the Chainhook node.", ); // bitcoin metrics let btc_highest_block_appended = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_highest_block_appended", "The highest Bitcoin block successfully appended to a Chainhook node fork.", ); let btc_highest_block_received = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_highest_block_received", "The highest Bitcoin block received by the Chainhook node from the Bitcoin node.", ); let btc_highest_block_evaluated = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_highest_block_evaluated", "The highest Bitcoin block successfully evaluated against predicates.", ); let btc_canonical_fork_lag = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_canonical_fork_lag", "The difference between the highest Bitcoin block received and the highest Bitcoin block appended.", ); let btc_block_evaluation_lag = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_block_evaluation_lag", "The difference between the highest Bitcoin block appended and the highest Bitcoin block evaluated.", ); let btc_last_reorg_timestamp = PrometheusMonitoring::create_and_register_int_gauge( - ®istry, + registry, "chainhook_btc_last_reorg_timestamp", "The timestamp of the latest Bitcoin reorg ingested by the Chainhook node.", ); let btc_last_reorg_applied_blocks = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_last_reorg_applied_blocks", "The number of blocks applied to the Bitcoin chain as part of the latest Bitcoin reorg.", ); let btc_last_reorg_rolled_back_blocks = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_last_reorg_rolled_back_blocks", "The number of blocks rolled back from the Bitcoin chain as part of the latest Bitcoin reorg.", ); let btc_last_block_ingestion_time = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_last_block_ingestion_time", "The time that the Chainhook node last ingested a Bitcoin block.", ); let btc_registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_registered_predicates", "The number of Bitcoin predicates that have been registered by the Chainhook node.", ); let btc_deregistered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge( - ®istry, + registry, "chainhook_btc_deregistered_predicates", "The number of Bitcoin predicates that have been deregistered by the Chainhook node.", ); + let is_connected_to_predicates_db = PrometheusMonitoring::create_and_register_uint64_gauge( + registry, + "chainhook_is_connected_to_predicates_db", + "Whether the Chainhook node is connected to the predicates database.", + ); PrometheusMonitoring { stx_highest_block_appended, @@ -194,7 +207,7 @@ impl PrometheusMonitoring { btc_last_block_ingestion_time, btc_registered_predicates, btc_deregistered_predicates, - registry, + is_connected_to_predicates_db, } } // setup helpers @@ -204,13 +217,26 @@ impl PrometheusMonitoring { help: &str, ) -> UInt64Gauge { let g = UInt64Gauge::new(name, help).unwrap(); - registry.register(Box::new(g.clone())).unwrap(); + + // Instead of directly registering which can fail if already registered + registry.register(Box::new(g.clone())).unwrap_or_else(|e| { + // Only log if it's not already a "duplicate metrics" error + if !e.to_string().contains("already registered") { + eprintln!("Failed to register metric: {}", e); + } + }); g } pub fn create_and_register_int_gauge(registry: &Registry, name: &str, help: &str) -> IntGauge { let g = IntGauge::new(name, help).unwrap(); - registry.register(Box::new(g.clone())).unwrap(); + // Instead of directly registering which can fail if already registered + registry.register(Box::new(g.clone())).unwrap_or_else(|e| { + // Only log if it's not already a "duplicate metrics" error + if !e.to_string().contains("already registered") { + eprintln!("Failed to register metric: {}", e); + } + }); g } @@ -365,6 +391,10 @@ impl PrometheusMonitoring { } } + pub fn set_is_connected_to_predicates_db(&self, is_connected: bool) { + self.is_connected_to_predicates_db.set(is_connected as u64); + } + pub fn get_metrics(&self) -> JsonValue { json!({ "bitcoin": { @@ -396,14 +426,14 @@ impl PrometheusMonitoring { }, "registered_predicates": self.stx_registered_predicates.get(), "deregistered_predicates": self.stx_deregistered_predicates.get(), - } + }, + "is_connected_to_predicates_db": self.is_connected_to_predicates_db.get(), }) } } async fn serve_req( req: Request, - registry: Registry, ctx: Context, ) -> Result, hyper::Error> { match (req.method(), req.uri().path()) { @@ -416,7 +446,8 @@ async fn serve_req( }); let encoder = TextEncoder::new(); - let metric_families = registry.gather(); + // Use the global registry instead of a passed registry + let metric_families = GLOBAL_REGISTRY.gather(); let mut buffer = vec![]; let response = match encoder.encode(&metric_families, &mut buffer) { Ok(_) => Response::builder() @@ -453,15 +484,14 @@ async fn serve_req( } } -pub async fn start_serving_prometheus_metrics(port: u16, registry: Registry, ctx: Context) { +pub async fn start_serving_prometheus_metrics(port: u16, ctx: Context) { let addr = ([0, 0, 0, 0], port).into(); let ctx_clone = ctx.clone(); let make_svc = make_service_fn(|_| { - let registry = registry.clone(); let ctx_clone = ctx_clone.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |r| { - serve_req(r, registry.clone(), ctx_clone.clone()) + serve_req(r, ctx_clone.clone()) })) } }); diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 1abe47ae..b57bd581 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -911,12 +911,11 @@ pub async fn start_bitcoin_event_observer( ); if let Some(port) = config.prometheus_monitoring_port { - let registry_moved = prometheus_monitoring.registry.clone(); + // let registry_moved = prometheus_monitoring.registry.clone(); let ctx_cloned = ctx.clone(); let _ = std::thread::spawn(move || { hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics( port, - registry_moved, ctx_cloned, )); }); @@ -982,12 +981,11 @@ pub async fn start_stacks_event_observer( ); if let Some(port) = config.prometheus_monitoring_port { - let registry_moved = prometheus_monitoring.registry.clone(); + // let registry_moved = prometheus_monitoring.registry.clone(); let ctx_cloned = ctx.clone(); let _ = std::thread::spawn(move || { hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics( port, - registry_moved, ctx_cloned, )); });