Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ mod stats;

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::config::{parse_args, parse_config};
use crate::diag::run_diag_read_thread;
Expand Down Expand Up @@ -43,8 +42,8 @@ use rayhunter::diag_device::DiagDevice;
use stats::get_log;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::{RwLock, oneshot};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
Expand Down Expand Up @@ -126,12 +125,9 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
// Start a thread that'll track when user hits ctrl+c. When that happens,
// trigger various cleanup tasks, including sending signals to other threads to
// shutdown
#[allow(clippy::too_many_arguments)]
fn run_shutdown_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
daemon_restart_rx: oneshot::Receiver<()>,
should_restart_flag: Arc<AtomicBool>,
shutdown_token: CancellationToken,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_tx: Sender<AnalysisCtrlMessage>,
Expand All @@ -144,17 +140,9 @@ fn run_shutdown_thread(
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {err}");
}

should_restart_flag.store(false, Ordering::Relaxed);
}
res = daemon_restart_rx => {
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {err}");
}

should_restart_flag.store(true, Ordering::Relaxed);
}
};
_ = shutdown_token.cancelled() => {}
}

let mut qmdl_store = qmdl_store_lock.write().await;
if qmdl_store.current_entry.is_some() {
Expand Down Expand Up @@ -209,7 +197,8 @@ async fn run_with_config(
let (diag_tx, diag_rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
let (ui_update_tx, ui_update_rx) = mpsc::channel::<display::DisplayState>(1);
let (analysis_tx, analysis_rx) = mpsc::channel::<AnalysisCtrlMessage>(5);
let shutdown_token = CancellationToken::new();
let restart_token = CancellationToken::new();
let shutdown_token = restart_token.child_token();

let notification_service = NotificationService::new(config.ntfy_url.clone());

Expand Down Expand Up @@ -255,7 +244,6 @@ async fn run_with_config(
);
}

let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>();
let analysis_status_lock = Arc::new(RwLock::new(analysis_status));
run_analysis_thread(
&task_tracker,
Expand All @@ -264,13 +252,10 @@ async fn run_with_config(
analysis_status_lock.clone(),
config.analyzers.clone(),
);
let should_restart_flag = Arc::new(AtomicBool::new(false));

run_shutdown_thread(
&task_tracker,
diag_tx.clone(),
daemon_restart_rx,
should_restart_flag.clone(),
shutdown_token.clone(),
qmdl_store_lock.clone(),
analysis_tx.clone(),
Expand All @@ -283,7 +268,7 @@ async fn run_with_config(
diag_device_ctrl_sender: diag_tx,
analysis_status_lock,
analysis_sender: analysis_tx,
daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))),
daemon_restart_token: restart_token.clone(),
ui_update_sender: Some(ui_update_tx),
});
run_server(&task_tracker, state, shutdown_token.clone()).await;
Expand All @@ -292,7 +277,7 @@ async fn run_with_config(
task_tracker.wait().await;

info!("see you space cowboy...");
Ok(should_restart_flag.load(Ordering::Relaxed))
Ok(restart_token.is_cancelled())
}

#[cfg(test)]
Expand Down
30 changes: 9 additions & 21 deletions daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use log::{error, warn};
use std::sync::Arc;
use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex};
use tokio::sync::RwLock;
use tokio::sync::mpsc::Sender;
use tokio::sync::{RwLock, oneshot};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::io::ReaderStream;
use tokio_util::sync::CancellationToken;

use crate::DiagDeviceCtrlMessage;
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
Expand All @@ -32,7 +33,7 @@ pub struct ServerState {
pub diag_device_ctrl_sender: Sender<DiagDeviceCtrlMessage>,
pub analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
pub analysis_sender: Sender<AnalysisCtrlMessage>,
pub daemon_restart_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
pub daemon_restart_token: CancellationToken,
pub ui_update_sender: Option<Sender<DisplayState>>,
}

Expand Down Expand Up @@ -128,24 +129,11 @@ pub async fn set_config(
})?;

// Trigger daemon restart after writing config
let mut restart_tx = state.daemon_restart_tx.write().await;
if let Some(sender) = restart_tx.take() {
sender.send(()).map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"couldn't send restart signal".to_string(),
)
})?;
Ok((
StatusCode::ACCEPTED,
"wrote config and triggered restart".to_string(),
))
} else {
Ok((
StatusCode::ACCEPTED,
"wrote config but restart already triggered".to_string(),
))
}
state.daemon_restart_token.cancel();
Ok((
StatusCode::ACCEPTED,
"wrote config and triggered restart".to_string(),
))
}

pub async fn get_zip(
Expand Down Expand Up @@ -326,7 +314,7 @@ mod tests {
diag_device_ctrl_sender: tx,
analysis_status_lock: Arc::new(RwLock::new(analysis_status)),
analysis_sender: analysis_tx,
daemon_restart_tx: Arc::new(RwLock::new(None)),
daemon_restart_token: CancellationToken::new(),
ui_update_sender: None,
})
}
Expand Down
Loading