1919#include " cluster/health_monitor_frontend.h"
2020#include " cluster/members_table.h"
2121#include " cluster/topic_table.h"
22+ #include " config/configuration.h"
2223#include " ssx/semaphore.h"
2324#include " ssx/work_queue.h"
2425
3233
3334namespace {
3435constexpr ss::lowres_clock::duration control_timeout = 5s;
35- }
36+ constexpr ss::lowres_clock::duration health_report_query_timeout = 10s;
37+ } // namespace
3638
3739namespace cloud_topics {
3840
@@ -484,7 +486,6 @@ class epoch_source_impl : public level_zero_gc::epoch_source {
484486 * Get a recent health report. Partitions use the health reporting
485487 * mechanism to self-report their max GC eligible epoch.
486488 */
487- constexpr auto health_report_query_timeout = 10s;
488489
489490 auto health_report
490491 = co_await health_monitor_->local ().get_cluster_health (
@@ -615,13 +616,82 @@ class node_info_impl : public level_zero_gc::node_info {
615616 seastar::sharded<cluster::members_table>* members_table_;
616617};
617618
619+ class cluster_safety_monitor : public level_zero_gc ::safety_monitor {
620+ public:
621+ explicit cluster_safety_monitor (
622+ seastar::sharded<cluster::health_monitor_frontend>* health_monitor,
623+ config::binding<std::chrono::milliseconds> check_interval)
624+ : health_monitor_(health_monitor)
625+ , check_interval_(std::move(check_interval))
626+ , cached_result_{.ok = false , .reason = " awaiting first health check" }
627+ , poll_loop_(do_poll_loop()) {}
628+
629+ result can_proceed () const override { return cached_result_; }
630+
631+ void start () override { started_ = true ; }
632+
633+ seastar::future<> stop () override {
634+ started_ = false ;
635+ as_.request_abort ();
636+ co_await std::exchange (poll_loop_, seastar::make_ready_future<>());
637+ }
638+
639+ private:
640+ seastar::future<> do_poll_loop () noexcept {
641+ while (!as_.abort_requested ()) {
642+ if (started_) {
643+ auto poll_fut = co_await ss::coroutine::as_future (
644+ poll_health ());
645+ if (poll_fut.failed ()) {
646+ auto ex = poll_fut.get_exception ();
647+ cached_result_ = result{
648+ .ok = false ,
649+ .reason = fmt::format (" health check failed: {}" , ex)};
650+ }
651+ }
652+
653+ auto sleep_fut = co_await seastar::coroutine::as_future (
654+ seastar::sleep_abortable (check_interval_ (), as_));
655+ if (sleep_fut.failed ()) {
656+ sleep_fut.ignore_ready_future ();
657+ break ;
658+ }
659+ }
660+ }
661+
662+ seastar::future<> poll_health () {
663+ auto overview
664+ = co_await health_monitor_->local ().get_cluster_health_overview (
665+ model::timeout_clock::now () + health_report_query_timeout);
666+
667+ if (overview.is_healthy ()) {
668+ cached_result_ = result{.ok = true , .reason = std::nullopt };
669+ } else {
670+ cached_result_ = result{
671+ .ok = false ,
672+ .reason = overview.unhealthy_reasons .empty ()
673+ ? " cluster unhealthy"
674+ : overview.unhealthy_reasons .front ()};
675+ }
676+ }
677+
678+ seastar::sharded<cluster::health_monitor_frontend>* health_monitor_;
679+ config::binding<std::chrono::milliseconds> check_interval_;
680+ result cached_result_;
681+ bool started_{false };
682+ seastar::abort_source as_;
683+ seastar::future<> poll_loop_;
684+ };
685+
618686level_zero_gc::level_zero_gc (
619687 level_zero_gc_config config,
620688 std::unique_ptr<object_storage> storage,
621689 std::unique_ptr<epoch_source> epoch_source,
622- std::unique_ptr<node_info> node_info)
690+ std::unique_ptr<node_info> node_info,
691+ std::unique_ptr<safety_monitor> safety_monitor)
623692 : config_(std::move(config))
624693 , epoch_source_(std::move(epoch_source))
694+ , safety_monitor_(std::move(safety_monitor))
625695 , should_run_(false ) // begin in a stopped state
626696 , should_shutdown_(false )
627697 , worker_(worker())
@@ -654,7 +724,11 @@ level_zero_gc::level_zero_gc(
654724 std::make_unique<object_storage_remote_impl>(remote, std::move(bucket)),
655725 std::make_unique<epoch_source_impl>(
656726 health_monitor, controller_stm, topic_table),
657- std::make_unique<node_info_impl>(self, members_table)) {}
727+ std::make_unique<node_info_impl>(self, members_table),
728+ std::make_unique<cluster_safety_monitor>(
729+ health_monitor,
730+ config::shard_local_cfg ()
731+ .cloud_topics_gc_health_check_interval.bind())) {}
658732
659733level_zero_gc::~level_zero_gc () = default ;
660734
@@ -665,6 +739,7 @@ seastar::future<> level_zero_gc::start() {
665739 }
666740 vlog (cd_log.info , " Starting cloud topics L0 GC worker" );
667741 delete_worker_->start ();
742+ safety_monitor_->start ();
668743 should_run_ = true ;
669744 worker_cv_.signal ();
670745}
@@ -687,6 +762,7 @@ seastar::future<> level_zero_gc::stop() {
687762 worker_cv_.signal ();
688763 co_await delete_worker_->stop ();
689764 co_await std::exchange (worker_, seastar::make_ready_future<>());
765+ co_await safety_monitor_->stop ();
690766 vlog (cd_log.info , " Stopped cloud_topics L0 GC worker" );
691767}
692768
@@ -732,6 +808,8 @@ std::string_view to_string_view(level_zero_gc::state s) {
732808 return " level_zero_gc::state::stopping" ;
733809 case stopped:
734810 return " level_zero_gc::state::stopped" ;
811+ case safety_blocked:
812+ return " level_zero_gc::state::safety_blocked" ;
735813 }
736814 vunreachable (" Unrecognized GC state: {}" , s);
737815}
@@ -746,7 +824,11 @@ auto level_zero_gc::get_state() const -> state {
746824 if (resetting_) {
747825 return state::resetting;
748826 }
749- return should_run_ ? state::running : state::paused;
827+ if (!should_run_) {
828+ return state::paused;
829+ }
830+ return safety_monitor_->can_proceed ().ok ? state::running
831+ : state::safety_blocked;
750832 }();
751833 vlog (cd_log.debug , " cloud_topics L0 GC worker state: {}" , st);
752834 return st;
@@ -779,6 +861,19 @@ seastar::future<> level_zero_gc::worker() {
779861 // ensure that the abort source is unreferenced at this time.
780862 asrc_ = {};
781863
864+ if (auto safety = safety_monitor_->can_proceed (); !safety.ok ) {
865+ vlog (
866+ cd_log.debug ,
867+ " L0 GC blocked by safety monitor: {}" ,
868+ safety.reason .value_or (" unknown" ));
869+ probe_.safety_blocked ();
870+ (co_await seastar::coroutine::as_future (
871+ seastar::sleep_abortable (
872+ config_.throttle_no_progress (), asrc_)))
873+ .ignore_ready_future ();
874+ continue ;
875+ }
876+
782877 if (backoff.count () > 0 ) {
783878 auto t0 = ss::lowres_clock::now ();
784879 (co_await seastar::coroutine::as_future (
0 commit comments