Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ thiserror = "1.0.52"
libc = "0.2.150"
log = "0.4.20"
env_logger = { version = "0.11", default-features = false }
tokio-util = { version = "0.7.10", features = ["rt", "io"] }
tokio-util = { version = "0.7.10", features = ["rt", "io", "compat"] }
futures-macro = "0.3.30"
include_dir = "0.7.3"
mime_guess = "2.0.4"
Expand All @@ -40,3 +40,5 @@ serde_json = "1.0.114"
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
tempfile = "3.10.1"
simple_logger = "5.0.0"
async_zip = { version = "0.0.17", features = ["tokio"] }
anyhow = "1.0.98"
3 changes: 2 additions & 1 deletion bin/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::diag::run_diag_read_thread;
use crate::error::RayhunterError;
use crate::pcap::get_pcap;
use crate::qmdl_store::RecordingStore;
use crate::server::{get_qmdl, serve_static, ServerState};
use crate::server::{get_qmdl, get_zip, serve_static, ServerState};
use crate::stats::get_system_stats;

use analysis::{
Expand Down Expand Up @@ -46,6 +46,7 @@ fn get_router() -> AppRouter {
Router::new()
.route("/api/pcap/{name}", get(get_pcap))
.route("/api/qmdl/{name}", get(get_qmdl))
.route("/api/zip/{name}", get(get_zip))
.route("/api/system-stats", get(get_system_stats))
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
.route("/api/start-recording", post(start_recording))
Expand Down
70 changes: 40 additions & 30 deletions bin/src/pcap.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::ServerState;

use anyhow::Error;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use futures::TryStreamExt;
use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser;
use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::QmdlReader;
use std::sync::Arc;
use std::{future, pin::pin};
use tokio::io::duplex;
use tokio::io::{duplex, AsyncRead, AsyncWrite};
use tokio_util::io::ReaderStream;

// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
Expand Down Expand Up @@ -42,39 +41,50 @@ pub async fn get_pcap(
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
let (reader, writer) = duplex(1024);
let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap();
pcap_writer.write_iface_header().await.unwrap();

tokio::spawn(async move {
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
let mut messages_stream = pin!(reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));

while let Some(container) = messages_stream
.try_next()
.await
.expect("failed getting QMDL container")
{
for maybe_msg in container.into_messages() {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg =
gsmtap_parser::parse(msg).expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer
.write_gsmtap_message(gsmtap_msg, timestamp)
.await
.expect("error writing pcap packet");
}
}
Err(e) => error!("error parsing message: {:?}", e),
}
}
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await {
error!("failed to generate PCAP: {:?}", e);
}
});

let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")];
let body = Body::from_stream(ReaderStream::new(reader));
Ok((headers, body).into_response())
}

pub async fn generate_pcap_data<R, W>(
writer: W,
qmdl_file: R,
qmdl_size_bytes: usize,
) -> Result<(), Error>
where
W: AsyncWrite + Unpin + Send,
R: AsyncRead + Unpin,
{
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?;

let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
while let Some(container) = reader.get_next_messages_container().await? {
if container.data_type != DataType::UserSpace {
continue;
}

for maybe_msg in container.into_messages() {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer
.write_gsmtap_message(gsmtap_msg, timestamp)
.await?;
}
}
Err(e) => error!("error parsing message: {:?}", e),
}
}
}

Ok(())
}
203 changes: 202 additions & 1 deletion bin/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use anyhow::Error;
use async_zip::tokio::write::ZipFileWriter;
use async_zip::Compression;
use async_zip::ZipEntryBuilder;
use axum::body::Body;
use axum::extract::Path;
use axum::extract::State;
use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use include_dir::{include_dir, Dir};
use log::error;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::io::{copy, duplex, AsyncReadExt};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::io::ReaderStream;

use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
use crate::pcap::generate_pcap_data;
use crate::qmdl_store::RecordingStore;
use crate::{display, DiagDeviceCtrlMessage};

Expand Down Expand Up @@ -76,3 +83,197 @@ pub async fn serve_static(
.unwrap(),
}
}

pub async fn get_zip(
State(state): State<Arc<ServerState>>,
Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, qmdl_size_bytes) = {
let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND,
format!("couldn't find entry with name {}", qmdl_idx),
))?;

if entry.qmdl_size_bytes == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string(),
));
}

(entry_index, entry.qmdl_size_bytes)
};

let qmdl_store_lock = state.qmdl_store_lock.clone();

let (reader, writer) = duplex(8192);

tokio::spawn(async move {
let result: Result<(), Error> = async {
let mut zip = ZipFileWriter::with_tokio(writer);

// Add QMDL file
{
let entry =
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored);
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();

let mut qmdl_file = {
let qmdl_store = qmdl_store_lock.read().await;
qmdl_store
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};

copy(&mut qmdl_file, &mut entry_writer).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh copy seems handy

entry_writer.into_inner().close().await?;
}

// Add PCAP file
{
let entry =
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();

let qmdl_file_for_pcap = {
let qmdl_store = qmdl_store_lock.read().await;
qmdl_store
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};

if let Err(e) =
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
{
// if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL.
error!("Failed to generate PCAP: {:?}", e);
}

entry_writer.into_inner().close().await?;
}

zip.close().await?;
Ok(())
}
.await;

if let Err(e) = result {
error!("Error generating ZIP file: {:?}", e);
}
});

let headers = [(CONTENT_TYPE, "application/zip")];
let body = Body::from_stream(ReaderStream::new(reader));
Ok((headers, body).into_response())
}

#[cfg(test)]
mod tests {
use super::*;
use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State};
use std::io::Cursor;
use tempfile::TempDir;

async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
let temp_dir = TempDir::new().unwrap();
let store_path = temp_dir.path().to_path_buf();
let store = crate::qmdl_store::RecordingStore::create(&store_path)
.await
.unwrap();
(temp_dir, Arc::new(RwLock::new(store)))
}

async fn create_test_entry_with_data(
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
test_data: &[u8],
) -> String {
let entry_name = {
let mut store = store_lock.write().await;
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap();

if !test_data.is_empty() {
use tokio::io::AsyncWriteExt;
qmdl_file.write_all(test_data).await.unwrap();
qmdl_file.flush().await.unwrap();
}

let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone();

store
.update_entry_qmdl_size(current_entry, test_data.len())
.await
.unwrap();
entry_name
};

let mut store = store_lock.write().await;
store.close_current_entry().await.unwrap();
entry_name
}

fn create_test_server_state(
store_lock: Arc<RwLock<crate::qmdl_store::RecordingStore>>,
) -> Arc<ServerState> {
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (ui_tx, _ui_rx) = tokio::sync::mpsc::channel(1);
let (analysis_tx, _analysis_rx) = tokio::sync::mpsc::channel(1);

let analysis_status = {
let store = store_lock.try_read().unwrap();
crate::analysis::AnalysisStatus::new(&*store)
};

Arc::new(ServerState {
qmdl_store_lock: store_lock,
diag_device_ctrl_sender: tx,
ui_update_sender: ui_tx,
analysis_status_lock: Arc::new(RwLock::new(analysis_status)),
analysis_sender: analysis_tx,
debug_mode: true,
})
}

#[tokio::test]
async fn test_get_zip_success() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E];
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock);

let result = get_zip(State(state), Path(entry_name.clone())).await;

assert!(result.is_ok());
let response = result.unwrap();

let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip");

let body = response.into_body();
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();

let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();

let filenames = zip_reader
.file()
.entries()
.iter()
.map(|entry| entry.filename().as_str().unwrap().to_owned())
.collect::<Vec<String>>();

assert_eq!(
filenames,
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
);
}
}
Loading
Loading