Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/v/cloud_topics/batch_cache/batch_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ batch_cache::get(const model::topic_id_partition& tidp, model::offset o) {
return std::nullopt;
}

// Evict cached batches for `tidp` up to offset `o` by delegating to the
// partition's index. A no-op when the cache has no log manager attached
// or when the partition has no index entry.
// NOTE(review): the underlying index evicts whole ranges — confirm
// callers tolerate range-granular eviction.
void batch_cache::evict_up_to(
  const model::topic_id_partition& tidp, model::offset o) {
    // The cache is effectively disabled without a log manager.
    if (_lm == nullptr) {
        return;
    }
    // Throws if the cache is already shutting down.
    _gate.check();
    auto entry = _index.find(tidp);
    if (entry == _index.end()) {
        return;
    }
    entry->second->evict_up_to(o);
}

ss::future<> batch_cache::cleanup_index_entries() {
// NOTE: the memory is reclaimed asynchronously. In some cases
// the index may no longer reference any live entries. If this
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_topics/batch_cache/batch_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class batch_cache {
std::optional<model::record_batch>
get(const model::topic_id_partition& tidp, model::offset o);

// Evict cached entries for the partition up to offset `o`. Eviction is
// performed at the granularity of coalesced index ranges, so if multiple
// batches share a range, this may evict some batches with base_offset > o.
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_cache::evict_up_to is documented as evicting entries with base_offset <= o, but under the hood it delegates to storage::batch_cache_index::evict_up_to, which evicts whole ranges. If multiple batches were coalesced into the same range, this can evict additional batches with base_offset > o. Please adjust the comment to reflect the range-granularity behavior so callers have an accurate contract.

Suggested change
// Evict all cached entries for the partition with base_offset <= o.
// Evict cached entries for the partition up to offset `o`. Eviction is
// performed at the granularity of coalesced index ranges, so if multiple
// batches share a range, this may evict some batches with base_offset > o.

Copilot uses AI. Check for mistakes.
void evict_up_to(const model::topic_id_partition& tidp, model::offset o);

private:
// Remove dead index entries
ss::future<> cleanup_index_entries();
Expand Down
95 changes: 95 additions & 0 deletions src/v/cloud_topics/batch_cache/tests/batch_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,98 @@ TEST_F(batch_cache_test_fixture, test_batch_cache_topic_recreation) {
retrieved1_again->header().record_count,
retrieved2->header().record_count);
}

// Build a batch holding exactly one record whose payload (33KiB) exceeds
// the 32KiB range_size, so every batch occupies its own cache range.
// This keeps small-batch coalescing from tying unrelated batches to a
// shared range that a single eviction would invalidate.
model::record_batch make_large_batch(model::offset o) {
    return model::test::make_random_batch(model::test::record_batch_spec{
      .offset = o,
      .allow_compression = false,
      .count = 1,
      .record_sizes = {{33_KiB}},
    });
}

TEST_F(batch_cache_test_fixture, test_evict_up_to_removes_matching_entries) {
    auto tidp = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(0)};

    // Insert three batches, each large enough to own its cache range.
    for (auto o : {model::offset(0), model::offset(1), model::offset(2)}) {
        auto batch = make_large_batch(o);
        _cache.put(tidp, batch);
    }

    // Sanity check: everything is retrievable before eviction.
    for (auto o : {model::offset(0), model::offset(1), model::offset(2)}) {
        ASSERT_TRUE(_cache.get(tidp, o).has_value());
    }

    // Evicting up to offset 1 drops batches 0 and 1 but keeps 2.
    _cache.evict_up_to(tidp, model::offset(1));

    ASSERT_FALSE(_cache.get(tidp, model::offset(0)).has_value());
    ASSERT_FALSE(_cache.get(tidp, model::offset(1)).has_value());
    ASSERT_TRUE(_cache.get(tidp, model::offset(2)).has_value());
}

TEST_F(batch_cache_test_fixture, test_evict_up_to_all_entries) {
    auto tidp = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(0)};

    auto first = make_large_batch(model::offset(0));
    auto second = make_large_batch(model::offset(1));
    _cache.put(tidp, first);
    _cache.put(tidp, second);

    // An eviction offset past every cached batch clears the partition.
    _cache.evict_up_to(tidp, model::offset(100));

    ASSERT_FALSE(_cache.get(tidp, model::offset(0)).has_value());
    ASSERT_FALSE(_cache.get(tidp, model::offset(1)).has_value());
}

TEST_F(batch_cache_test_fixture, test_evict_up_to_no_entries) {
    auto tidp = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(0)};

    auto cached = make_large_batch(model::offset(5));
    _cache.put(tidp, cached);

    // The eviction point sits below every cached batch, so nothing goes.
    _cache.evict_up_to(tidp, model::offset(3));

    ASSERT_TRUE(_cache.get(tidp, model::offset(5)).has_value());
}

TEST_F(batch_cache_test_fixture, test_evict_up_to_nonexistent_tidp) {
    auto tidp = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(0)};

    // Evicting a partition the cache has never seen must be a no-op and
    // must not create an index entry as a side effect.
    _cache.evict_up_to(tidp, model::offset(100));

    ASSERT_FALSE(contains_tidp(tidp));
}

TEST_F(batch_cache_test_fixture, test_evict_up_to_does_not_affect_other_tidps) {
    auto victim = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(0)};
    auto bystander = model::topic_id_partition{
      model::topic_id::create(), model::partition_id(1)};

    auto victim_batch = make_large_batch(model::offset(0));
    auto bystander_batch = make_large_batch(model::offset(0));
    _cache.put(victim, victim_batch);
    _cache.put(bystander, bystander_batch);

    // Eviction is scoped to a single partition.
    _cache.evict_up_to(victim, model::offset(100));

    ASSERT_FALSE(_cache.get(victim, model::offset(0)).has_value());
    ASSERT_TRUE(_cache.get(bystander, model::offset(0)).has_value());
}
5 changes: 5 additions & 0 deletions src/v/cloud_topics/data_plane_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class data_plane_api {
/// Retrieve materialized record batch from cache
virtual std::optional<model::record_batch>
cache_get(const model::topic_id_partition&, model::offset o) = 0;

/// Evict cached entries whose base_offset is <= `up_to`. Implementations
/// may evict at range granularity, so entries with base_offset > `up_to`
/// that share an evicted range may also be removed; callers must not rely
/// on such entries remaining cached.
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache_evict is documented as evicting entries with base_offset <= up_to, but the backing cache evicts at range granularity (small batches may be coalesced into the same range). In practice this can evict entries with base_offset > up_to as collateral. Please clarify the contract here as well so API users understand the semantics.

Suggested change
/// Evict cached entries with base_offset <= up_to
/// Evict cached entries whose base_offset is less than or equal to
/// `up_to`. Implementations typically evict at range granularity, and
/// multiple batches may share the same cached range. As a result, this
/// call may also evict entries with base_offset > `up_to` that fall into
/// an evicted range; callers must not rely on such entries remaining
/// cached after this call.

Copilot uses AI. Check for mistakes.
virtual void
cache_evict(const model::topic_id_partition&, model::offset up_to)
= 0;
};

} // namespace cloud_topics
5 changes: 5 additions & 0 deletions src/v/cloud_topics/data_plane_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ class impl
return _batch_cache.local().get(tidp, o);
}

// Forward a cache eviction request for `tidp` to this shard's batch
// cache instance.
void cache_evict(
  const model::topic_id_partition& tidp, model::offset up_to) final {
    auto& shard_cache = _batch_cache.local();
    shard_cache.evict_up_to(tidp, up_to);
}

size_t materialize_max_bytes() const final {
return _read_pipeline.local().memory_quota_capacity();
}
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_topics/frontend/tests/frontend_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class mock_api : public data_plane_api {
(const model::topic_id_partition&, model::offset o),
(override));

MOCK_METHOD(
void,
cache_evict,
(const model::topic_id_partition&, model::offset up_to),
(override));

MOCK_METHOD(size_t, materialize_max_bytes, (), (const, override));

MOCK_METHOD(ss::future<>, start, (), (override));
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/reconciler/reconciler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ ss::future<std::expected<void, reconcile_error>> reconciler::commit_objects(
auto result = co_await commit.source->set_last_reconciled_offset(
lro, _as);
if (result.has_value()) {
commit.source->invalidate_cache(lro);
vlog(
lg.debug,
"successfully bumped LRO for {} (tidp: {}) to {}",
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_topics/reconciler/reconciliation_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class l0_source : public source {
data_plane_api* dp_api,
ss::lw_shared_ptr<cluster::partition> partition)
: source(std::move(ntp), tidp)
, _dp_api(dp_api)
, _fe(ss::make_lw_shared<frontend>(partition, dp_api))
, _partition(std::move(partition)) {}

Expand Down Expand Up @@ -135,7 +136,12 @@ class l0_source : public source {
std::move(tracker), std::move(reader.reader)));
}

// Ask the data plane to drop cached batches for this partition up to
// `up_to`; the kafka offset is cast to a model offset for the API.
void invalidate_cache(kafka::offset up_to) override {
    const auto model_up_to = kafka::offset_cast(up_to);
    _dp_api->cache_evict(topic_id_partition(), model_up_to);
}

private:
data_plane_api* _dp_api;
ss::lw_shared_ptr<frontend> _fe;
ss::lw_shared_ptr<cluster::partition> _partition;
};
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_topics/reconciler/reconciliation_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class source {
virtual ss::future<model::record_batch_reader>
make_reader(reader_config) = 0;

// Invalidate cached batches up to the given offset.
virtual void invalidate_cache(kafka::offset) = 0;

private:
model::ntp _ntp;
model::topic_id_partition _tidp;
Expand Down
58 changes: 58 additions & 0 deletions src/v/cloud_topics/reconciler/tests/reconciler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,61 @@ TEST_F(ReconcilerTest, OffsetAndTimestampTracking) {
EXPECT_FALSE(obj_beyond_ts.has_value());
EXPECT_EQ(obj_beyond_ts.error(), l1::metastore::errc::out_of_range);
}

TEST_F(ReconcilerTest, CacheEvictedAfterReconciliation) {
    auto src = add_source();
    src->add_batch({.count = 10});
    src->add_batch({.count = 10});

    // No eviction may happen before a reconciliation round runs.
    EXPECT_FALSE(src->last_cache_evict().has_value());

    reconcile();

    // Both the LRO and the eviction offset land on the last record (19).
    EXPECT_EQ(src->last_reconciled_offset(), kafka::offset{19});
    EXPECT_THAT(src->last_cache_evict(), Optional(kafka::offset{19}));
}

TEST_F(ReconcilerTest, CacheEvictedMultipleRounds) {
    auto src = add_source();

    // Round one: ten records -> eviction offset 9.
    src->add_batch({.count = 10});
    reconcile();
    EXPECT_THAT(src->last_cache_evict(), Optional(kafka::offset{9}));

    // Round two: ten more records -> eviction offset advances to 19.
    src->add_batch({.count = 10});
    reconcile();
    EXPECT_THAT(src->last_cache_evict(), Optional(kafka::offset{19}));
}

TEST_F(ReconcilerTest, CacheNotEvictedOnLROFailure) {
    auto src = add_source();
    src->add_batch({.count = 10});

    // Force set_last_reconciled_offset() to fail; the reconciler must
    // then skip cache invalidation entirely.
    src->fail_set_lro(true);

    reconcile();

    EXPECT_FALSE(src->last_cache_evict().has_value());
}

TEST_F(ReconcilerTest, CacheEvictedPerSourceIndependently) {
    const model::topic tp{"tapioca"};
    const model::topic_id tid = model::topic_id::create();

    auto first = add_source(tp, tid);
    auto second = add_source(tp, tid);

    first->add_batch({.count = 10});
    second->add_batch({.count = 20});

    reconcile();

    // Each source is evicted up to its own last reconciled offset.
    EXPECT_THAT(first->last_cache_evict(), Optional(kafka::offset{9}));
    EXPECT_THAT(second->last_cache_evict(), Optional(kafka::offset{19}));
}
9 changes: 9 additions & 0 deletions src/v/cloud_topics/reconciler/tests/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ class fake_source : public source {
std::move(log));
}

// Test double: records the requested eviction offset instead of
// touching any real cache, so tests can assert on it later.
void invalidate_cache(kafka::offset up_to) override {
    _last_cache_evict = up_to;
}

// Offset passed to the most recent invalidate_cache() call, or
// std::nullopt if it has never been invoked.
std::optional<kafka::offset> last_cache_evict() const {
    return _last_cache_evict;
}

void fail_set_lro(bool fail) { _fail_set_lro = fail; }
void fail_make_reader(bool fail) { _fail_make_reader = fail; }

Expand All @@ -87,6 +95,7 @@ class fake_source : public source {
chunked_vector<model::record_batch> _source_log;
bool _fail_set_lro = false;
bool _fail_make_reader = false;
std::optional<kafka::offset> _last_cache_evict;
};

class unreliable_metastore : public l1::simple_metastore {
Expand Down
15 changes: 15 additions & 0 deletions src/v/storage/batch_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,21 @@ void batch_cache_index::truncate(model::offset offset) {
}
}

// Remove every index entry keyed <= `offset` and release the cache
// ranges those entries reference. The index must be fully clean (no
// dirty data) when called.
// NOTE(review): eviction acts on whole cache ranges, so batches that
// were coalesced into a shared range may be evicted as collateral —
// confirm callers tolerate this.
void batch_cache_index::evict_up_to(model::offset offset) {
    lock_guard lk(*this);

    // Evicting dirty (not yet persisted) data would lose it.
    vassert(
      _dirty_tracker.clean(),
      "evict_up_to() with dirty data in the index ({}).",
      *this);

    const auto last = _index.upper_bound(offset);
    for (auto it = _index.begin(); it != last; ++it) {
        _cache->evict(std::move(it->second.range()));
    }
    _index.erase(_index.begin(), last);
}

void batch_cache_index::mark_clean(model::offset up_to_inclusive) {
lock_guard lk(*this);

Expand Down
5 changes: 5 additions & 0 deletions src/v/storage/batch_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,11 @@ class batch_cache_index {
*/
void truncate(model::offset offset);

/**
 * Evicts all cache entries whose base offset is less than or equal to the
 * given offset. Eviction operates on whole cache ranges rather than
 * individual batches, so batches sharing a range (e.g. via small-batch
 * coalescing) may also be evicted even if their base_offset > offset.
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for evict_up_to implies only entries with base_offset <= offset are evicted, but the implementation evicts whole cache ranges. When multiple batches share a range (small-batch coalescing), evicting one entry’s range can also evict batches with base_offset > offset. Please clarify the contract in the comment (and/or rename) so callers don’t rely on exact per-entry semantics.

Suggested change
* Evicts all entries with base_offset <= the given offset.
* Evicts all cache entries whose base offset is less than or equal to the
* given offset. Eviction is performed on underlying cache ranges rather
* than individual batches, so when multiple batches share a cache range
* (for example due to small-batch coalescing), this may also evict some
* batches with base_offset > offset. Callers must not rely on precise
* per-entry eviction semantics.

Copilot uses AI. Check for mistakes.
*/
void evict_up_to(model::offset offset);

/**
* Marks the offsets as clean up to and including the specified offset.
* \param up_to_inclusive The offset, inclusive, up to which the cache
Expand Down
Loading