From 00e3491fc6dd753bbe5cbb97a75e5343f9039f25 Mon Sep 17 00:00:00 2001 From: Simon Fondrie-Teitler Date: Wed, 17 Sep 2025 18:44:04 -0400 Subject: [PATCH] Use a cancellation token for restart logic as well --- daemon/src/main.rs | 29 +++++++---------------------- daemon/src/server.rs | 30 +++++++++--------------------- 2 files changed, 16 insertions(+), 43 deletions(-) diff --git a/daemon/src/main.rs b/daemon/src/main.rs index a121d47c..5195062d 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -13,7 +13,6 @@ mod stats; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use crate::config::{parse_args, parse_config}; use crate::diag::run_diag_read_thread; @@ -43,8 +42,8 @@ use rayhunter::diag_device::DiagDevice; use stats::get_log; use tokio::net::TcpListener; use tokio::select; +use tokio::sync::RwLock; use tokio::sync::mpsc::{self, Sender}; -use tokio::sync::{RwLock, oneshot}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; @@ -126,12 +125,9 @@ async fn init_qmdl_store(config: &config::Config) -> Result, - daemon_restart_rx: oneshot::Receiver<()>, - should_restart_flag: Arc, shutdown_token: CancellationToken, qmdl_store_lock: Arc>, analysis_tx: Sender, @@ -144,17 +140,9 @@ fn run_shutdown_thread( if let Err(err) = res { error!("Unable to listen for shutdown signal: {err}"); } - - should_restart_flag.store(false, Ordering::Relaxed); - } - res = daemon_restart_rx => { - if let Err(err) = res { - error!("Unable to listen for shutdown signal: {err}"); - } - - should_restart_flag.store(true, Ordering::Relaxed); } - }; + _ = shutdown_token.cancelled() => {} + } let mut qmdl_store = qmdl_store_lock.write().await; if qmdl_store.current_entry.is_some() { @@ -209,7 +197,8 @@ async fn run_with_config( let (diag_tx, diag_rx) = mpsc::channel::(1); let (ui_update_tx, ui_update_rx) = mpsc::channel::(1); let (analysis_tx, analysis_rx) = mpsc::channel::(5); - let shutdown_token = CancellationToken::new(); + let restart_token = CancellationToken::new(); + let shutdown_token = restart_token.child_token(); let notification_service = NotificationService::new(config.ntfy_url.clone()); @@ -255,7 +244,6 @@ async fn run_with_config( ); } - let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>(); let analysis_status_lock = Arc::new(RwLock::new(analysis_status)); run_analysis_thread( &task_tracker, @@ -264,13 +252,10 @@ async fn run_with_config( analysis_status_lock.clone(), config.analyzers.clone(), ); - let should_restart_flag = Arc::new(AtomicBool::new(false)); run_shutdown_thread( &task_tracker, diag_tx.clone(), - daemon_restart_rx, - should_restart_flag.clone(), shutdown_token.clone(), qmdl_store_lock.clone(), analysis_tx.clone(), @@ -283,7 +268,7 @@ async fn run_with_config( diag_device_ctrl_sender: diag_tx, analysis_status_lock, analysis_sender: analysis_tx, - daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))), + daemon_restart_token: restart_token.clone(), ui_update_sender: Some(ui_update_tx), }); run_server(&task_tracker, state, shutdown_token.clone()).await; @@ -292,7 +277,7 @@ async fn run_with_config( task_tracker.wait().await; info!("see you space cowboy..."); - Ok(should_restart_flag.load(Ordering::Relaxed)) + Ok(restart_token.is_cancelled()) } #[cfg(test)] diff --git a/daemon/src/server.rs b/daemon/src/server.rs index 64b4d8b6..b9ba5bb7 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -13,10 +13,11 @@ use log::{error, warn}; use std::sync::Arc; use tokio::fs::write; use tokio::io::{AsyncReadExt, copy, duplex}; +use tokio::sync::RwLock; use tokio::sync::mpsc::Sender; -use tokio::sync::{RwLock, oneshot}; use tokio_util::compat::FuturesAsyncWriteCompatExt; use tokio_util::io::ReaderStream; +use tokio_util::sync::CancellationToken; use crate::DiagDeviceCtrlMessage; use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus}; @@ -32,7 +33,7 @@ pub struct ServerState { pub diag_device_ctrl_sender: Sender, pub analysis_status_lock: Arc>, pub analysis_sender: Sender, - pub daemon_restart_tx: Arc>>>, + pub daemon_restart_token: CancellationToken, pub ui_update_sender: Option>, } @@ -128,24 +129,11 @@ pub async fn set_config( })?; // Trigger daemon restart after writing config - let mut restart_tx = state.daemon_restart_tx.write().await; - if let Some(sender) = restart_tx.take() { - sender.send(()).map_err(|_| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - "couldn't send restart signal".to_string(), - ) - })?; - Ok(( - StatusCode::ACCEPTED, - "wrote config and triggered restart".to_string(), - )) - } else { - Ok(( - StatusCode::ACCEPTED, - "wrote config but restart already triggered".to_string(), - )) - } + state.daemon_restart_token.cancel(); + Ok(( + StatusCode::ACCEPTED, + "wrote config and triggered restart".to_string(), + )) } pub async fn get_zip( @@ -326,7 +314,7 @@ mod tests { diag_device_ctrl_sender: tx, analysis_status_lock: Arc::new(RwLock::new(analysis_status)), analysis_sender: analysis_tx, - daemon_restart_tx: Arc::new(RwLock::new(None)), + daemon_restart_token: CancellationToken::new(), ui_update_sender: None, }) }