Skip to content

Commit 1a8fcd2

Browse files
authored
Merge pull request #29809 from redpanda-data/l0-reader-fail-mat
ct/l0: allow materialization to skip missing extents
2 parents d439de7 + 86d70e6 commit 1a8fcd2

File tree

16 files changed

+217
-16
lines changed

16 files changed

+217
-16
lines changed

src/v/cloud_topics/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@ redpanda_cc_library(
4747
],
4848
visibility = [":__subpackages__"],
4949
deps = [
50+
"//src/v/base",
5051
"//src/v/random:generators",
5152
"//src/v/serde",
5253
"//src/v/utils:named_type",
5354
"//src/v/utils:uuid",
5455
"@fmt",
56+
"@seastar",
5557
],
5658
)
5759

@@ -257,6 +259,7 @@ redpanda_cc_library(
257259
],
258260
visibility = ["//visibility:public"],
259261
deps = [
262+
":types",
260263
"//src/v/base",
261264
"//src/v/model",
262265
"//src/v/utils:to_string",

src/v/cloud_topics/data_plane_api.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,16 @@ class data_plane_api {
7373

7474
// Materialize extents from the L0 read pipeline.
7575
// `output_size_estimate` must not exceed `materialize_max_bytes()`.
76+
// When `allow_mat_failure` is yes, download_not_found (404)
77+
// errors for individual extents are tolerated: the missing extents are
78+
// skipped and the result contains fewer batches than requested.
7679
virtual ss::future<result<chunked_vector<model::record_batch>>> materialize(
7780
model::ntp ntp,
7881
size_t output_size_estimate,
7982
chunked_vector<extent_meta> metadata,
8083
model::timeout_clock::time_point timeout,
81-
model::opt_abort_source_t)
84+
model::opt_abort_source_t,
85+
allow_materialization_failure allow_mat_failure)
8286
= 0;
8387

8488
/// Return the maximum bytes that may be requested in a single

src/v/cloud_topics/data_plane_impl.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ class impl
165165
size_t output_size_estimate,
166166
chunked_vector<extent_meta> metadata,
167167
model::timeout_clock::time_point timeout,
168-
model::opt_abort_source_t as) override {
168+
model::opt_abort_source_t as,
169+
allow_materialization_failure allow_mat_failure) override {
169170
if (metadata.empty()) {
170171
co_return chunked_vector<model::record_batch>{};
171172
}
@@ -183,6 +184,7 @@ class impl
183184
{
184185
.output_size_estimate = output_size_estimate,
185186
.meta = std::move(metadata),
187+
.allow_mat_failure = allow_mat_failure,
186188
},
187189
timeout,
188190
as);

src/v/cloud_topics/frontend/tests/frontend_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ class mock_api : public data_plane_api {
6363
size_t output_size_estimate,
6464
chunked_vector<extent_meta> metadata,
6565
model::timeout_clock::time_point timeout,
66-
model::opt_abort_source_t),
66+
model::opt_abort_source_t,
67+
allow_materialization_failure allow_mat_failure),
6768
(override));
6869

6970
MOCK_METHOD(

src/v/cloud_topics/level_zero/frontend_reader/level_zero_reader.cc

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ level_zero_log_reader_impl::materialize_batches(
386386
materialize_bytes,
387387
std::move(to_materialize),
388388
deadline,
389-
_config.abort_source);
389+
_config.abort_source,
390+
_config.allow_mat_failure);
390391
if (!mat_res.has_value()) {
391392
if (mat_res.error() == errc::shutting_down) {
392393
vlog(_log.debug, "Materialize aborted due to shutdown");
@@ -402,7 +403,10 @@ level_zero_log_reader_impl::materialize_batches(
402403
mat_res.error().message()));
403404
}
404405
batches = std::move(mat_res.value());
405-
if (batches.size() != materialize_count) {
406+
auto count_ok = bool(_config.allow_mat_failure)
407+
? batches.size() <= materialize_count
408+
: batches.size() == materialize_count;
409+
if (!count_ok) {
406410
throw std::runtime_error(fmt_with_ctx(
407411
fmt::format,
408412
"Materialized unexpected number of batches: {}, expected: {}",
@@ -411,7 +415,10 @@ level_zero_log_reader_impl::materialize_batches(
411415
}
412416
}
413417
// Merge our selected subset of unhydrated batches with the materialized
414-
// batches, preserving control batches from the local log.
418+
// batches, preserving control batches from the local log. When
419+
// allow_mat_failure is set, some extents may have been skipped: the
420+
// materialized batches are a subsequence of the query (same offset
421+
// order), so sequential offset comparison identifies which were skipped.
415422
auto batches_it = batches.begin();
416423
chunked_circular_buffer<model::record_batch> hydrated;
417424
auto range_to_materialize = std::ranges::subrange(
@@ -421,10 +428,27 @@ level_zero_log_reader_impl::materialize_batches(
421428
_config.abort_source.value().get().check();
422429
}
423430
auto& local_batch_header = local_batch.header;
424-
model::record_batch batch = ss::visit(
431+
auto maybe_batch = ss::visit(
425432
local_batch.data,
426-
[this, &local_batch_header, &batches_it, &tidp](
427-
const cloud_topics::extent_meta&) {
433+
[this, &local_batch_header, &batches_it, &batches, &tidp](
434+
const cloud_topics::extent_meta& meta)
435+
-> std::optional<model::record_batch> {
436+
if (
437+
batches_it == batches.end()
438+
|| batches_it->base_offset()
439+
!= kafka::offset_cast(meta.base_offset)) {
440+
if (!bool(_config.allow_mat_failure)) {
441+
throw std::runtime_error(fmt_with_ctx(
442+
fmt::format,
443+
"Materialized batch offset mismatch: expected "
444+
"{}, got {}",
445+
kafka::offset_cast(meta.base_offset),
446+
batches_it == batches.end()
447+
? model::offset{}
448+
: batches_it->base_offset()));
449+
}
450+
return std::nullopt;
451+
}
428452
model::record_batch batch = apply_placeholder_to_batch(
429453
local_batch_header, std::move(*batches_it));
430454
++batches_it;
@@ -440,19 +464,23 @@ level_zero_log_reader_impl::materialize_batches(
440464
}
441465
return batch;
442466
},
443-
[&local_batch_header](local_log_batch::payload& payload) {
467+
[&local_batch_header](local_log_batch::payload& payload)
468+
-> std::optional<model::record_batch> {
444469
return model::record_batch(
445470
local_batch_header,
446471
std::move(payload),
447472
model::record_batch::tag_ctor_ng{});
448473
},
449-
[](local_log_batch::cached_batch& cb) {
474+
[](local_log_batch::cached_batch& cb)
475+
-> std::optional<model::record_batch> {
450476
// Cache hit resolved during the collection loop above.
451477
// The batch is already fully formed (apply_placeholder_to_batch
452478
// was applied before cache_put on the path that populated it).
453479
return std::move(cb.batch);
454480
});
455-
hydrated.push_back(std::move(batch));
481+
if (maybe_batch.has_value()) {
482+
hydrated.push_back(std::move(*maybe_batch));
483+
}
456484
co_await ss::coroutine::maybe_yield();
457485
}
458486
vassert(

src/v/cloud_topics/level_zero/pipeline/read_request.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ struct dataplane_query_result {
3838
struct dataplane_query {
3939
size_t output_size_estimate{0};
4040
chunked_vector<extent_meta> meta;
41+
allow_materialization_failure allow_mat_failure
42+
= allow_materialization_failure::no;
4143
};
4244

4345
// This object is created for every fetch request.

src/v/cloud_topics/level_zero/reader/fetch_request_handler.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ ss::future<> fetch_handler::process_single_request(l0::read_request<>* req) {
105105
std::optional<chunked_vector<model::tx_range>> aborted_tx;
106106
try {
107107
auto meta = std::move(req->query.meta);
108-
109108
auto extent = co_await ss::coroutine::as_future(
110109
materialize_placeholders(
111110
_bucket,
112111
std::move(meta),
113112
*_remote,
114113
*_cache,
114+
req->query.allow_mat_failure,
115115
req->rtc,
116116
req->rtc_logger));
117117

src/v/cloud_topics/level_zero/reader/materialized_extent_reader.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ ss::future<result<chunked_vector<materialized_extent>>> materialize_sorted_run(
3737
cloud_storage_clients::bucket_name bucket,
3838
cloud_io::remote_api<>* api,
3939
cloud_io::basic_cache_service_api<>* cache,
40+
allow_materialization_failure allow_mat_failure,
4041
retry_chain_node* rtc,
4142
micro_probe* probe) {
4243
absl::node_hash_map<object_id, iobuf> hydrated;
@@ -54,6 +55,19 @@ ss::future<result<chunked_vector<materialized_extent>>> materialize_sorted_run(
5455
auto res = co_await materialize(
5556
&back, bucket, api, cache, rtc, probe);
5657
if (!res.has_value()) {
58+
if (
59+
bool(allow_mat_failure)
60+
&& res.error() == errc::download_not_found) {
61+
vlog(
62+
cd_log.warn,
63+
"Skipping extent {} (offsets {}~{}) due to missing "
64+
"object",
65+
back.meta.id,
66+
back.meta.base_offset,
67+
back.meta.last_offset);
68+
extents.pop_back();
69+
continue;
70+
}
5771
co_return res.error();
5872
}
5973
// If reading from cache (res.value() == true), only the
@@ -78,11 +92,12 @@ ss::future<materialize_result> materialize_placeholders(
7892
chunked_vector<extent_meta> query,
7993
cloud_io::remote_api<ss::lowres_clock>& api,
8094
cloud_io::basic_cache_service_api<ss::lowres_clock>& cache,
95+
allow_materialization_failure allow_mat_failure,
8196
retry_chain_node& rtc,
8297
retry_chain_logger& logger) {
8398
micro_probe probe;
8499
auto extents = co_await materialize_sorted_run(
85-
std::move(query), bucket, &api, &cache, &rtc, &probe);
100+
std::move(query), bucket, &api, &cache, allow_mat_failure, &rtc, &probe);
86101
if (!extents.has_value()) {
87102
vlog(
88103
logger.warn,

src/v/cloud_topics/level_zero/reader/materialized_extent_reader.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,20 @@ struct materialize_result {
3333
/// The method processes 'underlying' fully. The result is stored
3434
/// in memory so the caller should be careful with this param.
3535
///
36-
/// \param cfg is a log reader config
3736
/// \param bucket is a cloud storage bucket
3837
/// \param query is an array of extent_meta objects
3938
/// \param api is a cloud_io::remote instance
4039
/// \param cache is a cloud storage cache instance
40+
/// \param allow_mat_failure when yes, 404 errors for individual extents are
41+
/// tolerated and those extents are skipped
4142
/// \param rtc is a retry chain node to use
4243
/// \param rtc_logger is a logger that should track the progress
4344
ss::future<materialize_result> materialize_placeholders(
4445
cloud_storage_clients::bucket_name bucket,
4546
chunked_vector<extent_meta> query,
4647
cloud_io::remote_api<ss::lowres_clock>& api,
4748
cloud_io::basic_cache_service_api<ss::lowres_clock>& cache,
49+
allow_materialization_failure allow_mat_failure,
4850
retry_chain_node& rtc,
4951
retry_chain_logger& logger);
5052

src/v/cloud_topics/level_zero/reader/tests/materialized_extent_reader_test.cc

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ TEST_F_CORO(materialized_extent_fixture, full_scan_test) {
5454
std::move(underlying),
5555
remote,
5656
cache,
57+
cloud_topics::allow_materialization_failure::no,
5758
rtc,
5859
logger);
5960
ASSERT_EQ_CORO(actual.value().size(), expected.size());
@@ -103,6 +104,7 @@ ss::future<> test_aggregated_log_partial_scan(
103104
std::move(underlying),
104105
fx->remote,
105106
fx->cache,
107+
cloud_topics::allow_materialization_failure::no,
106108
rtc,
107109
logger);
108110

@@ -134,9 +136,123 @@ TEST_F_CORO(materialized_extent_fixture, timeout_test) {
134136
std::move(underlying),
135137
remote,
136138
cache,
139+
cloud_topics::allow_materialization_failure::no,
137140
rtc,
138141
logger);
139142

140143
ASSERT_TRUE_CORO(!actual.has_value());
141144
ASSERT_TRUE_CORO(actual.error() == cloud_topics::errc::timeout);
142145
}
146+
147+
TEST_F_CORO(
148+
materialized_extent_fixture, allow_materialization_failure_skips_notfound) {
149+
// When allow_materialization_failure is set and an extent's object returns
150+
// 404, that extent should be skipped and the result should be empty.
151+
co_await add_random_batches(1);
152+
153+
std::queue<injected_failure> failures;
154+
failures.push({.cloud_get = injected_cloud_get_failure::return_notfound});
155+
produce_placeholders(false, 1, std::move(failures));
156+
157+
auto underlying = convert_placeholders(make_underlying());
158+
ss::abort_source as;
159+
retry_chain_node rtc(as, 1s, 100ms);
160+
retry_chain_logger logger(test_log, rtc, "materialized_extent_reader_test");
161+
162+
auto [actual, probe] = co_await cloud_topics::l0::materialize_placeholders(
163+
cloud_storage_clients::bucket_name("test-bucket-name"),
164+
std::move(underlying),
165+
remote,
166+
cache,
167+
cloud_topics::allow_materialization_failure::yes,
168+
rtc,
169+
logger);
170+
171+
ASSERT_TRUE_CORO(actual.has_value());
172+
ASSERT_EQ_CORO(actual.value().size(), 0);
173+
}
174+
175+
TEST_F_CORO(
176+
materialized_extent_fixture, notfound_propagates_without_failure_flag) {
177+
// Without allow_materialization_failure, a 404 should propagate as an
178+
// error, same as any other download failure.
179+
co_await add_random_batches(1);
180+
181+
std::queue<injected_failure> failures;
182+
failures.push({.cloud_get = injected_cloud_get_failure::return_notfound});
183+
produce_placeholders(false, 1, std::move(failures));
184+
185+
auto underlying = convert_placeholders(make_underlying());
186+
ss::abort_source as;
187+
retry_chain_node rtc(as, 1s, 100ms);
188+
retry_chain_logger logger(test_log, rtc, "materialized_extent_reader_test");
189+
190+
auto [actual, probe] = co_await cloud_topics::l0::materialize_placeholders(
191+
cloud_storage_clients::bucket_name("test-bucket-name"),
192+
std::move(underlying),
193+
remote,
194+
cache,
195+
cloud_topics::allow_materialization_failure::no,
196+
rtc,
197+
logger);
198+
199+
ASSERT_TRUE_CORO(!actual.has_value());
200+
ASSERT_TRUE_CORO(actual.error() == cloud_topics::errc::download_not_found);
201+
}
202+
203+
TEST_F_CORO(
204+
materialized_extent_fixture, non_404_error_propagates_with_failure_flag) {
205+
// Only 404 errors are tolerated. A timeout should still propagate
206+
// even when allow_materialization_failure is set.
207+
co_await add_random_batches(1);
208+
209+
std::queue<injected_failure> failures;
210+
failures.push({.cloud_get = injected_cloud_get_failure::return_timeout});
211+
produce_placeholders(false, 1, std::move(failures));
212+
213+
auto underlying = convert_placeholders(make_underlying());
214+
ss::abort_source as;
215+
retry_chain_node rtc(as, 1s, 100ms);
216+
retry_chain_logger logger(test_log, rtc, "materialized_extent_reader_test");
217+
218+
auto [actual, probe] = co_await cloud_topics::l0::materialize_placeholders(
219+
cloud_storage_clients::bucket_name("test-bucket-name"),
220+
std::move(underlying),
221+
remote,
222+
cache,
223+
cloud_topics::allow_materialization_failure::yes,
224+
rtc,
225+
logger);
226+
227+
ASSERT_TRUE_CORO(!actual.has_value());
228+
ASSERT_TRUE_CORO(actual.error() == cloud_topics::errc::timeout);
229+
}
230+
231+
TEST_F_CORO(materialized_extent_fixture, skip_one_missing_extent_among_many) {
232+
// With multiple extents, only the missing one should be skipped.
233+
// The other extents should materialize normally.
234+
co_await add_random_batches(3);
235+
236+
// Single failure in the queue: exactly one of the three L0 objects
237+
// will return notfound, the other two will succeed.
238+
std::queue<injected_failure> failures;
239+
failures.push({.cloud_get = injected_cloud_get_failure::return_notfound});
240+
produce_placeholders(false, 1, std::move(failures));
241+
242+
auto underlying = convert_placeholders(make_underlying());
243+
ss::abort_source as;
244+
retry_chain_node rtc(as, 1s, 100ms);
245+
retry_chain_logger logger(test_log, rtc, "materialized_extent_reader_test");
246+
247+
auto [actual, probe] = co_await cloud_topics::l0::materialize_placeholders(
248+
cloud_storage_clients::bucket_name("test-bucket-name"),
249+
std::move(underlying),
250+
remote,
251+
cache,
252+
cloud_topics::allow_materialization_failure::yes,
253+
rtc,
254+
logger);
255+
256+
ASSERT_TRUE_CORO(actual.has_value());
257+
ASSERT_EQ_CORO(actual.value().size(), 2);
258+
}

0 commit comments

Comments
 (0)