Skip to content

Commit 82b20d2

Browse files
committed
datalake/coordinator: add partition overrides to reset
In some cases we want to manually choose from where translation should resume after a coordinator reset. Add per-partition last_committed overrides and a reset_all_partitions flag. By default reset_all_partitions is false and the request is a no-op unless partition_overrides are provided — only the listed partitions have their pending entries cleared and last_committed set. Setting reset_all_partitions to true clears all partitions first, then applies any overrides. This default-safe design minimizes surprise and makes it harder to accidentally wipe state.
1 parent c6f5621 commit 82b20d2

File tree

17 files changed

+338
-40
lines changed

17 files changed

+338
-40
lines changed

proto/redpanda/core/admin/internal/datalake/v1/datalake.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,19 @@ message GetCoordinatorStateResponse {
5151
CoordinatorState state = 1;
5252
}
5353

54+
message PartitionStateOverride {
55+
optional int64 last_committed = 1;
56+
}
57+
58+
// When reset_all_partitions is true, clears pending entries and
59+
// last_committed for all partitions, then applies overrides. When false,
60+
// clears pending entries only for partitions in partition_overrides and
61+
// applies their overrides. No-op when false and partition_overrides is empty.
5462
message CoordinatorResetTopicStateRequest {
5563
string topic_name = 1;
5664
int64 revision = 2;
5765
bool reset_all_partitions = 3;
66+
map<int32, PartitionStateOverride> partition_overrides = 4;
5867
}
5968

6069
message CoordinatorResetTopicStateResponse {}

src/v/datalake/coordinator/BUILD

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ redpanda_cc_library(
5151
],
5252
deps = [
5353
":file_committer",
54+
":partition_state_override",
5455
":snapshot_remover",
5556
":state_update",
5657
":stm",
5758
"//src/v/cluster",
5859
"//src/v/config",
60+
"//src/v/container:chunked_hash_map",
5961
"//src/v/container:chunked_vector",
6062
"//src/v/model",
6163
"@abseil-cpp//absl/hash",
@@ -253,6 +255,21 @@ redpanda_cc_library(
253255
],
254256
)
255257

258+
redpanda_cc_library(
259+
name = "partition_state_override",
260+
srcs = [
261+
"partition_state_override.cc",
262+
],
263+
hdrs = [
264+
"partition_state_override.h",
265+
],
266+
deps = [
267+
"//src/v/model",
268+
"//src/v/serde",
269+
"@fmt",
270+
],
271+
)
272+
256273
redpanda_cc_library(
257274
name = "state",
258275
srcs = [
@@ -281,6 +298,7 @@ redpanda_cc_library(
281298
],
282299
visibility = ["//visibility:public"],
283300
deps = [
301+
":partition_state_override",
284302
":translated_offset_range",
285303
"//src/v/datalake:schema_identifier",
286304
"//src/v/datalake:types",
@@ -333,9 +351,11 @@ redpanda_cc_library(
333351
"state_update.h",
334352
],
335353
deps = [
354+
":partition_state_override",
336355
":state",
337356
":translated_offset_range",
338357
"//src/v/base",
358+
"//src/v/container:chunked_hash_map",
339359
"//src/v/container:chunked_vector",
340360
"//src/v/iceberg:manifest_entry",
341361
"//src/v/model",

src/v/datalake/coordinator/coordinator.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,9 @@ ss::future<checked<void, coordinator::errc>>
889889
coordinator::sync_reset_topic_state(
890890
model::topic topic,
891891
model::revision_id topic_revision,
892-
bool reset_all_partitions) {
892+
bool reset_all_partitions,
893+
chunked_hash_map<model::partition_id, partition_state_override>
894+
partition_overrides) {
893895
auto gate = maybe_gate();
894896
if (gate.has_error()) {
895897
co_return gate.error();
@@ -905,6 +907,7 @@ coordinator::sync_reset_topic_state(
905907
.topic = topic,
906908
.topic_revision = topic_revision,
907909
.reset_all_partitions = reset_all_partitions,
910+
.partition_overrides = std::move(partition_overrides),
908911
};
909912
auto check_res = update.can_apply(stm_->state());
910913
if (check_res.has_error()) {

src/v/datalake/coordinator/coordinator.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
#include "absl/hash/hash.h"
1313
#include "cluster/fwd.h"
1414
#include "config/property.h"
15+
#include "container/chunked_hash_map.h"
1516
#include "container/chunked_vector.h"
1617
#include "datalake/coordinator/file_committer.h"
18+
#include "datalake/coordinator/partition_state_override.h"
1719
#include "datalake/coordinator/snapshot_remover.h"
1820
#include "datalake/coordinator/state_machine.h"
1921
#include "datalake/fwd.h"
@@ -92,7 +94,9 @@ class coordinator {
9294
ss::future<checked<void, errc>> sync_reset_topic_state(
9395
model::topic topic,
9496
model::revision_id topic_rev,
95-
bool reset_all_partitions);
97+
bool reset_all_partitions,
98+
chunked_hash_map<model::partition_id, partition_state_override>
99+
partition_overrides);
96100

97101
void notify_leadership(std::optional<model::node_id>);
98102

src/v/datalake/coordinator/frontend.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,8 @@ ss::future<reset_topic_state_reply> frontend::reset_topic_state_locally(
517517
->sync_reset_topic_state(
518518
request.topic,
519519
request.topic_revision,
520-
request.reset_all_partitions)
520+
request.reset_all_partitions,
521+
std::move(request.partition_overrides))
521522
.then([](auto result) {
522523
reset_topic_state_reply resp{};
523524
if (result.has_error()) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#include "datalake/coordinator/partition_state_override.h"
12+
13+
#include <fmt/ostream.h>
14+
15+
namespace datalake::coordinator {
16+
17+
std::ostream& operator<<(std::ostream& o, const partition_state_override& p) {
18+
if (p.last_committed.has_value()) {
19+
fmt::print(o, "{{last_committed: {}}}", p.last_committed.value());
20+
} else {
21+
fmt::print(o, "{{last_committed: nullopt}}");
22+
}
23+
return o;
24+
}
25+
26+
} // namespace datalake::coordinator
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#pragma once
12+
13+
#include "model/fundamental.h"
14+
#include "serde/envelope.h"
15+
16+
namespace datalake::coordinator {
17+
18+
struct partition_state_override
19+
: public serde::envelope<
20+
partition_state_override,
21+
serde::version<0>,
22+
serde::compat_version<0>> {
23+
std::optional<kafka::offset> last_committed;
24+
25+
auto serde_fields() { return std::tie(last_committed); }
26+
27+
friend std::ostream&
28+
operator<<(std::ostream&, const partition_state_override&);
29+
};
30+
31+
} // namespace datalake::coordinator

src/v/datalake/coordinator/state_update.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,13 @@ reset_topic_state_update::apply(topics_state& state) {
324324
if (reset_all_partitions) {
325325
t_state.pid_to_pending_files.clear();
326326
}
327+
for (auto& [pid, po] : partition_overrides) {
328+
auto& ps = t_state.pid_to_pending_files[pid];
329+
ps.pending_entries.clear();
330+
if (po.last_committed.has_value()) {
331+
ps.last_committed = po.last_committed.value();
332+
}
333+
}
327334
return outcome::success();
328335
}
329336

@@ -368,10 +375,12 @@ std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) {
368375
std::ostream& operator<<(std::ostream& o, const reset_topic_state_update& u) {
369376
fmt::print(
370377
o,
371-
"{{topic: {}, revision: {}, reset_all_partitions: {}}}",
378+
"{{topic: {}, revision: {}, reset_all_partitions: {}, "
379+
"partition_overrides: {} entries}}",
372380
u.topic,
373381
u.topic_revision,
374-
u.reset_all_partitions);
382+
u.reset_all_partitions,
383+
u.partition_overrides.size());
375384
return o;
376385
}
377386

src/v/datalake/coordinator/state_update.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
#pragma once
1111

1212
#include "base/outcome.h"
13+
#include "container/chunked_hash_map.h"
1314
#include "container/chunked_vector.h"
15+
#include "datalake/coordinator/partition_state_override.h"
1416
#include "datalake/coordinator/state.h"
1517
#include "datalake/coordinator/translated_offset_range.h"
1618
#include "iceberg/manifest_entry.h"
@@ -127,9 +129,12 @@ struct reset_topic_state_update
127129
model::topic topic;
128130
model::revision_id topic_revision;
129131
bool reset_all_partitions{false};
132+
chunked_hash_map<model::partition_id, partition_state_override>
133+
partition_overrides;
130134

131135
auto serde_fields() {
132-
return std::tie(topic, topic_revision, reset_all_partitions);
136+
return std::tie(
137+
topic, topic_revision, reset_all_partitions, partition_overrides);
133138
}
134139

135140
checked<void, stm_update_error> can_apply(const topics_state&);

src/v/datalake/coordinator/tests/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ redpanda_cc_gtest(
122122
],
123123
deps = [
124124
":state_test_utils",
125+
"//src/v/container:chunked_hash_map",
125126
"//src/v/container:chunked_vector",
127+
"//src/v/datalake/coordinator:partition_state_override",
126128
"//src/v/datalake/coordinator:state",
127129
"//src/v/datalake/coordinator:state_update",
128130
"//src/v/datalake/coordinator:translated_offset_range",

0 commit comments

Comments
 (0)