
Commit dd9a310: log throughput
1 parent 1600ee4

1 file changed (+35 -14)


iris-mpc-store/src/s3_importer.rs (+35 -14)
```diff
@@ -3,7 +3,15 @@ use async_trait::async_trait;
 use aws_sdk_s3::{primitives::ByteStream, Client};
 use futures::{stream, Stream, StreamExt};
 use iris_mpc_common::{IRIS_CODE_LENGTH, MASK_CODE_LENGTH};
-use std::{mem, pin::Pin, sync::Arc};
+use std::{
+    mem,
+    pin::Pin,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Instant,
+};
 use tokio::io::AsyncReadExt;
 
 const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
@@ -131,24 +139,32 @@ pub async fn fetch_and_parse_chunks(
         .map(|num| format!("{}/{}.bin", prefix_name, num))
         .collect();
     tracing::info!("Generated {} chunk names", chunks.len());
+    let total_bytes = Arc::new(AtomicUsize::new(0));
+    let now = Instant::now();
 
     let result_stream = stream::iter(chunks)
-        .map(move |chunk| async move {
-            let mut object_stream = store.get_object(&chunk).await?.into_async_read();
-            let mut records = Vec::with_capacity(last_snapshot_details.chunk_size as usize);
-            let mut buf = vec![0u8; SINGLE_ELEMENT_SIZE];
-            loop {
-                match object_stream.read_exact(&mut buf).await {
-                    Ok(_) => {
-                        let iris = StoredIris::from_bytes(&buf);
-                        records.push(iris);
+        .map({
+            let total_bytes_clone = total_bytes.clone();
+            move |chunk| {
+                let counter = total_bytes_clone.clone();
+                async move {
+                    let mut object_stream = store.get_object(&chunk).await?.into_async_read();
+                    let mut records = Vec::with_capacity(last_snapshot_details.chunk_size as usize);
+                    let mut buf = vec![0u8; SINGLE_ELEMENT_SIZE];
+                    loop {
+                        match object_stream.read_exact(&mut buf).await {
+                            Ok(_) => {
+                                let iris = StoredIris::from_bytes(&buf);
+                                records.push(iris);
+                                counter.fetch_add(SINGLE_ELEMENT_SIZE, Ordering::Relaxed);
+                            }
+                            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
+                            Err(e) => return Err(e.into()),
+                        }
                     }
-                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
-                    Err(e) => return Err(e.into()),
+                    Ok::<_, eyre::Error>(stream::iter(records))
                 }
             }
-
-            Ok::<_, eyre::Error>(stream::iter(records))
         })
         .buffer_unordered(concurrency)
         .flat_map(|result| match result {
@@ -157,6 +173,11 @@ pub async fn fetch_and_parse_chunks(
         })
         .boxed();
 
+    tracing::info!(
+        "Overall download throughput: {:.2} Gbps",
+        total_bytes.load(Ordering::Relaxed) as f32 * 8.0 / 1e9 / now.elapsed().as_secs_f32()
+    );
+
     result_stream
 }
```
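A note on the counting pattern in the second hunk: the chunk-fetching closure captures a clone of an `Arc<AtomicUsize>`, and every concurrent task bumps the shared counter with `fetch_add(..., Ordering::Relaxed)`. Relaxed ordering is sufficient because the counter is a plain statistic and is not used to order any other memory operations. A minimal self-contained sketch of the same pattern (hypothetical task body, assuming tokio with the `rt-multi-thread` and `macros` features):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

#[tokio::main]
async fn main() {
    // Shared byte counter, handed to each concurrent task as an Arc clone.
    let total_bytes = Arc::new(AtomicUsize::new(0));

    let tasks: Vec<_> = (0..4)
        .map(|i| {
            let counter = total_bytes.clone();
            tokio::spawn(async move {
                // Stand-in for "downloaded one chunk of i KiB".
                counter.fetch_add(i * 1024, Ordering::Relaxed);
            })
        })
        .collect();

    for task in tasks {
        task.await.unwrap();
    }

    // Joining the tasks synchronizes with their writes, so this load
    // observes every fetch_add.
    println!("total bytes: {}", total_bytes.load(Ordering::Relaxed));
}
```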

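One thing reviewers may want to flag: `result_stream` is lazy, so the new `tracing::info!` at the end of `fetch_and_parse_chunks` runs before the stream has been polled, i.e. before any chunk bytes have been counted. Unless something drives the stream in between, the logged figure reflects whatever has accumulated at return time (typically zero) rather than the overall download throughput. A caller-side sketch (hypothetical helper, not part of this commit) that drains the stream first and computes throughput afterwards:

```rust
use futures::{Stream, StreamExt};
use std::time::Instant;

// Hypothetical helper: drain `stream` completely, then report throughput.
// `record_size` is the fixed on-the-wire size of one record; the crate's
// SINGLE_ELEMENT_SIZE would play this role in the real code.
async fn drain_and_log_throughput<S: Stream + Unpin>(mut stream: S, record_size: usize) {
    let now = Instant::now();
    let mut total_bytes = 0usize;
    while let Some(_record) = stream.next().await {
        total_bytes += record_size;
    }
    let gbps = total_bytes as f32 * 8.0 / 1e9 / now.elapsed().as_secs_f32();
    println!("Overall download throughput: {:.2} Gbps", gbps);
}
```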