@@ -390,3 +390,88 @@ def test_partial_failure(self, cloud_storage_type: CloudStorageType):
390390 )
391391 errs = self .gc_pause (node_to_kill_id )
392392 assert len (errs ) == 0 , "Unexpected errors: {errs=}"
393+
394+
class CloudTopicsL0GCMetricsTest(CloudTopicsL0GCTestBase):
    """
    Integration: Basic semantics for some GC metrics
    """

    def _get_int_metric(self, name: str) -> list[int]:
        """Fetch metric *name* and return its per-sample values as ints."""
        metric = self.redpanda.metrics_sample(name)
        self.logger.debug(metric)
        assert metric is not None, "samples unexpectedly None"
        return [int(sample.value) for sample in metric.samples]

    def get_epoch_lag(self) -> list[int]:
        """Per-shard GC epoch lag."""
        return self._get_int_metric("vectorized_cloud_topics_l0_gc_epoch_lag")

    def get_max_deleted_epoch(self) -> list[int]:
        """Per-shard highest epoch GC has deleted so far."""
        return self._get_int_metric(
            "vectorized_cloud_topics_l0_gc_max_deleted_epoch")

    def get_collection_rounds(self) -> list[int]:
        """Per-shard count of completed GC collection rounds."""
        return self._get_int_metric(
            "vectorized_cloud_topics_l0_gc_collection_rounds_total")

    def get_objects_listed(self) -> list[int]:
        """Per-shard count of cloud objects scanned by GC."""
        return self._get_int_metric(
            "vectorized_cloud_topics_l0_gc_objects_listed_total")

    @cluster(num_nodes=5)
    @matrix(cloud_storage_type=get_cloud_storage_type(
        applies_only_on=[CloudStorageType.S3]))
    def test_gc_metrics(self, cloud_storage_type: CloudStorageType):
        """Verify GC metrics advance through their expected lifecycle:
        rounds complete, objects get listed, and max_deleted_epoch grows."""
        self.topics = [TopicSpec(partition_count=1)]
        self.create_topics(self.topics)
        topic_names = [spec.name for spec in self.topics]
        self.produce_some(topics=topic_names, n=100)

        # GC should be completing collection rounds and scanning objects
        wait_until(lambda: sum(self.get_collection_rounds()) > 0,
                   timeout_sec=30,
                   backoff_sec=2,
                   retry_on_exc=True)
        self.logger.info(
            f"GC running, collection_rounds={self.get_collection_rounds()}")

        wait_until(lambda: sum(self.get_objects_listed()) > 0,
                   timeout_sec=30,
                   backoff_sec=2,
                   retry_on_exc=True)
        self.logger.info(
            f"GC scanning objects, objects_listed={self.get_objects_listed()}")

        # Wait for GC to start deleting - max_deleted_epoch should become >= 0 on some shard
        wait_until(lambda: any(epoch >= 0
                               for epoch in self.get_max_deleted_epoch()),
                   timeout_sec=30,
                   backoff_sec=2,
                   retry_on_exc=True)

        initial_max_deleted = max(self.get_max_deleted_epoch())
        self.logger.info(f"GC started deleting, {initial_max_deleted=}")

        # Produce more to create new epochs while GC is running
        self.produce_some(topics=topic_names, n=100)

        # max_deleted_epoch should increase as GC makes progress
        wait_until(
            lambda: max(self.get_max_deleted_epoch()) > initial_max_deleted,
            timeout_sec=30,
            backoff_sec=2,
            retry_on_exc=True)
        self.logger.info(f"GC progressing, max_deleted_epoch increased to "
                         f"{max(self.get_max_deleted_epoch())}")
0 commit comments