Commit 28de77a

chore(cubestore): Add simple allocator tracking stats for benchmarks
1 parent 1b18344 commit 28de77a

4 files changed: 188 additions & 77 deletions

rust/cubestore/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -9,3 +9,6 @@ members = [
     "cuberpc",
     "cuberockstore",
 ]
+
+[profile.bench]
+debug = true

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 14 additions & 2 deletions
@@ -1,4 +1,4 @@
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use criterion::{criterion_group, BenchmarkId, Criterion};
 use cubestore::cachestore::{
     CacheStore, QueueAddPayload, QueueItemStatus, QueueKey, RocksCacheStore,
 };
@@ -7,6 +7,13 @@ use cubestore::CubeError;
 use std::sync::Arc;
 use tokio::runtime::{Builder, Runtime};

+mod tracking_allocator;
+
+use tracking_allocator::TrackingAllocator;
+
+#[global_allocator]
+static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
+
 fn prepare_cachestore(name: &str) -> Result<Arc<RocksCacheStore>, CubeError> {
     let config = Config::test(&name).update_config(|mut config| {
         // disable periodic eviction
@@ -188,6 +195,7 @@ fn do_get_bench(
 }

 fn do_benches(c: &mut Criterion) {
+    ALLOCATOR.reset_stats();
     let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

     do_insert_bench(c, &runtime, 512, 64);
@@ -201,4 +209,8 @@ fn do_benches(c: &mut Criterion) {
 }

 criterion_group!(benches, do_benches);
-criterion_main!(benches);
+
+fn main() {
+    benches();
+    ALLOCATOR.print_stats();
+}
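
Note on dropping criterion_main!: the macro roughly expands to a main that runs each benchmark group and then prints Criterion's final summary, so the hand-written main above keeps the group invocation and only skips the summary step while adding the allocator report. A minimal sketch of that expansion (assuming criterion 0.3+, shown here for comparison only; not part of this commit):

    // Approximate expansion of criterion_main!(benches):
    fn main() {
        benches();
        criterion::Criterion::default()
            .configure_from_args()
            .final_summary();
    }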

rust/cubestore/cubestore/benches/metastore.rs

Lines changed: 50 additions & 75 deletions
@@ -1,4 +1,4 @@
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use criterion::{criterion_group, BenchmarkId, Criterion};
 use cubestore::config::Config;
 use cubestore::metastore::{BaseRocksStoreFs, Column, ColumnType, MetaStore, RocksMetaStore};
 use cubestore::remotefs::LocalDirRemoteFs;
@@ -8,6 +8,13 @@ use std::fs;
 use std::sync::Arc;
 use tokio::runtime::{Builder, Runtime};

+mod tracking_allocator;
+
+use tracking_allocator::TrackingAllocator;
+
+#[global_allocator]
+static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
+
 fn prepare_metastore(name: &str) -> Result<Arc<RocksMetaStore>, CubeError> {
     let config = Config::test(name);

@@ -97,118 +104,86 @@ async fn bench_get_tables_with_path(
     }
 }

-fn benchmark_get_tables_with_path_small(c: &mut Criterion, runtime: &Runtime) {
+fn do_get_tables_with_path_bench(
+    c: &mut Criterion,
+    runtime: &Runtime,
+    num_schemas: usize,
+    tables_per_schema: usize,
+    iterations: usize,
+) {
+    let total_tables = num_schemas * tables_per_schema;
     let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_small").unwrap();
-        populate_metastore(&metastore, 10, 10).await.unwrap(); // 100 tables
+        let metastore = prepare_metastore(&format!("get_tables_with_path_{}", total_tables)).unwrap();
+        populate_metastore(&metastore, num_schemas, tables_per_schema).await.unwrap();
         metastore
     });

     c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_small_include_non_ready_true", 100),
-        &100,
+        BenchmarkId::new("get_tables_with_path_include_non_ready_true", total_tables),
+        &iterations,
         |b, &iterations| {
             b.to_async(runtime)
                 .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
         },
     );

     c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_small_include_non_ready_false", 100),
-        &100,
+        BenchmarkId::new("get_tables_with_path_include_non_ready_false", total_tables),
+        &iterations,
         |b, &iterations| {
             b.to_async(runtime)
                 .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
         },
     );
 }

-fn benchmark_get_tables_with_path_medium(c: &mut Criterion, runtime: &Runtime) {
-    let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_medium").unwrap();
-        populate_metastore(&metastore, 50, 20).await.unwrap(); // 1,000 tables
-        metastore
-    });
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_medium_include_non_ready_true", 50),
-        &50,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
-        },
-    );
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_medium_include_non_ready_false", 50),
-        &50,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
-        },
-    );
+async fn do_cold_cache_test(num_schemas: usize, tables_per_schema: usize) {
+    let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
+    populate_metastore(&fresh_metastore, num_schemas, tables_per_schema).await.unwrap();
+    let result = fresh_metastore.get_tables_with_path(false).await;
+    assert!(result.is_ok());
 }

-fn benchmark_get_tables_with_path_large(c: &mut Criterion, runtime: &Runtime) {
-    let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_large").unwrap();
-        populate_metastore(&metastore, 25, 1000).await.unwrap(); // 25,000 tables
-        metastore
-    });
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_large_include_non_ready_true", 10),
-        &10,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
-        },
-    );
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_large_include_non_ready_false", 10),
-        &10,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
-        },
-    );
+async fn do_warm_cache_test(metastore: &Arc<RocksMetaStore>) {
+    let result = metastore.get_tables_with_path(false).await;
+    assert!(result.is_ok());
 }

-fn cold_vs_warm_cache_benchmark(c: &mut Criterion, runtime: &Runtime) {
+fn do_cold_vs_warm_cache_bench(
+    c: &mut Criterion,
+    runtime: &Runtime,
+    num_schemas: usize,
+    tables_per_schema: usize,
+) {
     let metastore = runtime.block_on(async {
         let metastore = prepare_metastore("cold_warm_cache").unwrap();
-        populate_metastore(&metastore, 20, 50).await.unwrap(); // 1,000 tables
+        populate_metastore(&metastore, num_schemas, tables_per_schema).await.unwrap();
         metastore
     });

-    // Cold cache benchmark (first call)
     c.bench_function("get_tables_with_path_cold_cache", |b| {
-        b.to_async(runtime).iter(|| async {
-            let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
-            populate_metastore(&fresh_metastore, 20, 50).await.unwrap();
-            let result = fresh_metastore.get_tables_with_path(false).await;
-            assert!(result.is_ok());
-        });
+        b.to_async(runtime).iter(|| do_cold_cache_test(num_schemas, tables_per_schema));
     });

-    // Warm cache benchmark (subsequent calls)
     c.bench_function("get_tables_with_path_warm_cache", |b| {
-        b.to_async(runtime).iter(|| async {
-            let result = metastore.get_tables_with_path(false).await;
-            assert!(result.is_ok());
-        });
+        b.to_async(runtime).iter(|| do_warm_cache_test(&metastore));
     });
 }

 fn do_benches(c: &mut Criterion) {
+    ALLOCATOR.reset_stats();
     let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

-    benchmark_get_tables_with_path_small(c, &runtime);
-    benchmark_get_tables_with_path_medium(c, &runtime);
-    benchmark_get_tables_with_path_large(c, &runtime);
-    cold_vs_warm_cache_benchmark(c, &runtime);
+    do_get_tables_with_path_bench(c, &runtime, 10, 10, 100);
+    do_get_tables_with_path_bench(c, &runtime, 50, 20, 50);
+    do_get_tables_with_path_bench(c, &runtime, 25, 1000, 10);
+
+    do_cold_vs_warm_cache_bench(c, &runtime, 20, 50);
 }

 criterion_group!(benches, do_benches);
-criterion_main!(benches);
+
+fn main() {
+    benches();
+    ALLOCATOR.print_stats();
+}
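
For reference, the three do_get_tables_with_path_bench calls reproduce the removed small/medium/large benchmarks; the table counts below come from the comments deleted in this diff, and the annotations are added here only for illustration:

    // Calls from do_benches above, annotated with the scenarios they replace:
    do_get_tables_with_path_bench(c, &runtime, 10, 10, 100);  // was "small":  10 * 10   =    100 tables
    do_get_tables_with_path_bench(c, &runtime, 50, 20, 50);   // was "medium": 50 * 20   =  1,000 tables
    do_get_tables_with_path_bench(c, &runtime, 25, 1000, 10); // was "large":  25 * 1000 = 25,000 tables
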
rust/cubestore/cubestore/benches/tracking_allocator.rs

Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
+use std::alloc::{GlobalAlloc, Layout, System};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub struct TrackingAllocator {
+    inner: System,
+    allocations: AtomicUsize,
+    deallocations: AtomicUsize,
+    reallocations: AtomicUsize,
+    current_allocated: AtomicUsize,
+    peak_allocated: AtomicUsize,
+    total_allocated: AtomicUsize,
+}
+
+impl TrackingAllocator {
+    pub const fn new() -> Self {
+        Self {
+            inner: System,
+            allocations: AtomicUsize::new(0),
+            deallocations: AtomicUsize::new(0),
+            reallocations: AtomicUsize::new(0),
+            current_allocated: AtomicUsize::new(0),
+            peak_allocated: AtomicUsize::new(0),
+            total_allocated: AtomicUsize::new(0),
+        }
+    }
+
+    pub fn reset_stats(&self) {
+        self.allocations.store(0, Ordering::Relaxed);
+        self.deallocations.store(0, Ordering::Relaxed);
+        self.reallocations.store(0, Ordering::Relaxed);
+        self.current_allocated.store(0, Ordering::Relaxed);
+        self.peak_allocated.store(0, Ordering::Relaxed);
+        self.total_allocated.store(0, Ordering::Relaxed);
+    }
+
+
+    pub fn print_stats(&self) {
+        let allocations = self.allocations.load(Ordering::Relaxed);
+        let deallocations = self.deallocations.load(Ordering::Relaxed);
+        let reallocations = self.reallocations.load(Ordering::Relaxed);
+        let current_allocated = self.current_allocated.load(Ordering::Relaxed);
+        let peak_allocated = self.peak_allocated.load(Ordering::Relaxed);
+        let total_allocated = self.total_allocated.load(Ordering::Relaxed);
+
+        println!("=== FINAL MEMORY STATISTICS ===");
+        println!("Total allocations: {}", allocations);
+        println!("Total deallocations: {}", deallocations);
+        println!("Total reallocations: {}", reallocations);
+        println!("Current allocated: {} bytes ({:.2} MB)", current_allocated, current_allocated as f64 / 1024.0 / 1024.0);
+        println!("Peak allocated: {} bytes ({:.2} MB)", peak_allocated, peak_allocated as f64 / 1024.0 / 1024.0);
+        println!("Total allocated: {} bytes ({:.2} MB)", total_allocated, total_allocated as f64 / 1024.0 / 1024.0);
+        println!("===============================");
+    }
+
+    fn update_allocated(&self, size: usize, is_allocation: bool) {
+        if is_allocation {
+            self.allocations.fetch_add(1, Ordering::Relaxed);
+            self.total_allocated.fetch_add(size, Ordering::Relaxed);
+            let current = self.current_allocated.fetch_add(size, Ordering::Relaxed) + size;
+
+            // Update peak if current exceeds it
+            let mut peak = self.peak_allocated.load(Ordering::Relaxed);
+            while current > peak {
+                match self.peak_allocated.compare_exchange_weak(
+                    peak,
+                    current,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(new_peak) => peak = new_peak,
+                }
+            }
+        } else {
+            self.deallocations.fetch_add(1, Ordering::Relaxed);
+            self.current_allocated.fetch_sub(size, Ordering::Relaxed);
+        }
+    }
+}
+
+unsafe impl GlobalAlloc for TrackingAllocator {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let ptr = self.inner.alloc(layout);
+        if !ptr.is_null() {
+            self.update_allocated(layout.size(), true);
+        }
+        ptr
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        self.inner.dealloc(ptr, layout);
+        self.update_allocated(layout.size(), false);
+    }
+
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        let new_ptr = self.inner.realloc(ptr, layout, new_size);
+        if !new_ptr.is_null() {
+            self.reallocations.fetch_add(1, Ordering::Relaxed);
+
+            // Update counters: subtract old size, add new size
+            self.current_allocated.fetch_sub(layout.size(), Ordering::Relaxed);
+            self.total_allocated.fetch_add(new_size, Ordering::Relaxed);
+            let current = self.current_allocated.fetch_add(new_size, Ordering::Relaxed) + new_size;
+
+            // Update peak if current exceeds it
+            let mut peak = self.peak_allocated.load(Ordering::Relaxed);
+            while current > peak {
+                match self.peak_allocated.compare_exchange_weak(
+                    peak,
+                    current,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(new_peak) => peak = new_peak,
+                }
+            }
+        }
+        new_ptr
+    }
+}
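
A side note on the peak tracking in update_allocated and realloc: the compare_exchange_weak retry loop can also be written with AtomicUsize::fetch_max (stable since Rust 1.45). The sketch below is an equivalent alternative under that assumption, not what this commit uses:

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Atomically raise the stored peak to `current` when `current` is larger;
    // does the same job as the compare_exchange_weak loop above.
    fn record_peak(peak_allocated: &AtomicUsize, current: usize) {
        peak_allocated.fetch_max(current, Ordering::Relaxed);
    }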
