datalake/coordinator: add reset topic state escape hatch#29596
datalake/coordinator: add reset topic state escape hatch#29596nvartolomei merged 2 commits intoredpanda-data:devfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an admin “escape hatch” to reset datalake coordinator per-topic state, including optional per-partition last_committed overrides, and validates it with unit + e2e coverage.
Changes:
- Add
CoordinatorResetStateadmin RPC + generated client bindings and protobuf types. - Implement coordinator/frontend/state-machine support for
reset_topic_stateupdates and overrides. - Add coordinator reset unit tests and a new datalake e2e test exercising reset behavior.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/rptest/tests/datalake/datalake_e2e_test.py | Adds an end-to-end test for coordinator reset semantics and overrides. |
| tests/rptest/clients/types.py | Adds a TopicSpec config key for iceberg partition spec used by tests. |
| tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2_connect.py | Adds client stubs for the new CoordinatorResetState RPC. |
| tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.pyi | Adds typing stubs for new protobuf messages. |
| tests/rptest/clients/admin/proto/redpanda/core/admin/internal/datalake/v1/datalake_pb2.py | Updates generated protobuf module with new messages + service method. |
| src/v/redpanda/admin/services/datalake/datalake.h | Exposes coordinator_reset_state handler in admin service impl. |
| src/v/redpanda/admin/services/datalake/datalake.cc | Implements admin RPC handler calling coordinator frontend reset. |
| src/v/datalake/coordinator/types.h | Adds RPC request/reply types for reset topic state + overrides. |
| src/v/datalake/coordinator/types.cc | Adds logging formatter for reset reply. |
| src/v/datalake/coordinator/tests/state_update_test.cc | Adds unit tests for reset update behavior. |
| src/v/datalake/coordinator/tests/BUILD | Adds deps for new reset/override types in tests. |
| src/v/datalake/coordinator/state_update.h | Adds reset_topic_state update key + update struct. |
| src/v/datalake/coordinator/state_update.cc | Implements apply/can_apply + logging for reset update. |
| src/v/datalake/coordinator/state_machine.cc | Applies the new reset update key in the STM. |
| src/v/datalake/coordinator/service.h | Adds RPC endpoint for reset topic state. |
| src/v/datalake/coordinator/service.cc | Implements RPC dispatch into frontend reset handler. |
| src/v/datalake/coordinator/rpc.json | Declares new RPC method for codegen. |
| src/v/datalake/coordinator/partition_state_override.h | Introduces the override struct for per-partition fields. |
| src/v/datalake/coordinator/partition_state_override.cc | Implements logging formatter for overrides. |
| src/v/datalake/coordinator/frontend.h | Adds frontend reset API + local execution helper. |
| src/v/datalake/coordinator/frontend.cc | Implements frontend reset path and coordinator-manager invocation. |
| src/v/datalake/coordinator/coordinator.h | Adds coordinator sync API to replicate reset updates. |
| src/v/datalake/coordinator/coordinator.cc | Implements STM replication of the reset update. |
| src/v/datalake/coordinator/BUILD | Adds build targets/deps for new override type. |
| proto/redpanda/core/admin/internal/datalake/v1/datalake.proto | Adds protobuf messages + admin RPC definition. |
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
Operators may need to clear pending files from the datalake coordinator when Iceberg catalog state becomes inconsistent (e.g. after manual catalog modifications) which can result in stuck coordinator. The new CoordinatorResetState RPC is exposed via the admin API with SUPERUSER authorization and plumbed through the coordinator frontend, RPC service, and state machine.
ccc3337 to
c8b0b7a
Compare
andrwng
left a comment
There was a problem hiding this comment.
Nice, LGTM! One nit/question about the test, but doesn't need to block
c8b0b7a to
715890a
Compare
|
/ci-repeat 1 |
715890a to
82b20d2
Compare
oleiman
left a comment
There was a problem hiding this comment.
lgtm. i have one question about the override logic.
| auto check_res = update.can_apply(stm_->state()); | ||
| if (check_res.has_error()) { | ||
| vlog( | ||
| datalake_log.debug, |
There was a problem hiding this comment.
Copy pasted. Can address in follow up.
| if (p.last_committed.has_value()) { | ||
| fmt::print(o, "{{last_committed: {}}}", p.last_committed.value()); | ||
| } else { | ||
| fmt::print(o, "{{last_committed: nullopt}}"); | ||
| } |
There was a problem hiding this comment.
nitpick/fyi: I think just formatting the optional will do the right thing, no?
There was a problem hiding this comment.
It doesn't work without additional includes here and I refused to add them because it is a hack and we'll have to get rid of it soon-ish.
This violates C++ standard guidelines https://nvartolomei.com/cpp-code-hygiene/#specializing-standard-library-know-your-boundaries.
In newer fmt, ostream formatters aren't used afaik. Preferred correct code rather than yoloing.
There was a problem hiding this comment.
missed that it was an ostream operator... why are we even writing one, exactly?
|
/ci-repeat 1 |
Retry command for Build#80732please wait until all jobs are finished before running the slash command |
|
/ci-repeat 1 |
In some cases we want to manually choose from where translation should resume after a coordinator reset. Add per-partition last_committed overrides and a reset_all_partitions flag. By default reset_all_partitions is false and the request is a no-op unless partition_overrides are provided — only the listed partitions have their pending entries cleared and last_committed set. Setting reset_all_partitions to true clears all partitions first, then applies any overrides. This default-safe design minimizes surprise and makes it harder to accidentally wipe state.
82b20d2 to
f44ca9d
Compare
Backports Required
Release Notes