diff --git a/src/v/cloud_topics/level_one/compaction/sink.cc b/src/v/cloud_topics/level_one/compaction/sink.cc index fc6076e944dc6..9456709412e41 100644 --- a/src/v/cloud_topics/level_one/compaction/sink.cc +++ b/src/v/cloud_topics/level_one/compaction/sink.cc @@ -112,6 +112,7 @@ compaction_sink::initialize(compaction::sliding_window_reducer::source& src) { auto& new_cleaned_ranges = ct_src._new_cleaned_ranges; new_cleaned_ranges.shrink_to_fit(); + std::ranges::reverse(new_cleaned_ranges); _new_cleaned_ranges = std::move(new_cleaned_ranges); vlog( diff --git a/src/v/cloud_topics/level_one/compaction/source.cc b/src/v/cloud_topics/level_one/compaction/source.cc index d93bde747572c..e5f63672b1d7e 100644 --- a/src/v/cloud_topics/level_one/compaction/source.cc +++ b/src/v/cloud_topics/level_one/compaction/source.cc @@ -270,6 +270,13 @@ ss::future compaction_source::deduplication_iteration( if (extent.last_offset > _max_compactible_offset) { // We have iterated to an extent we cannot compact, stop compaction // here. 
+ vlog( + compaction_log.debug, + "Extent offset range ({}~{}) spans above max compactible offset " + "({}), stopping deduplication iteration here.", + extent.base_offset, + extent.last_offset, + _max_compactible_offset); co_return ss::stop_iteration::yes; } diff --git a/src/v/cloud_topics/level_one/domain/BUILD b/src/v/cloud_topics/level_one/domain/BUILD index 08a3520de7268..40ab210fc119a 100644 --- a/src/v/cloud_topics/level_one/domain/BUILD +++ b/src/v/cloud_topics/level_one/domain/BUILD @@ -79,6 +79,7 @@ redpanda_cc_library( implementation_deps = [ "//src/v/cloud_topics:logger", "//src/v/cloud_topics/level_one/common:object_id", + "//src/v/cloud_topics/level_one/metastore/lsm:db_debug", "//src/v/cloud_topics/level_one/metastore/lsm:garbage_collector", "//src/v/cloud_topics/level_one/metastore/lsm:keys", "//src/v/cloud_topics/level_one/metastore/lsm:state_reader", diff --git a/src/v/cloud_topics/level_one/domain/db_domain_manager.cc b/src/v/cloud_topics/level_one/domain/db_domain_manager.cc index 36042b3953f3c..153555fab4ab9 100644 --- a/src/v/cloud_topics/level_one/domain/db_domain_manager.cc +++ b/src/v/cloud_topics/level_one/domain/db_domain_manager.cc @@ -10,6 +10,7 @@ #include "cloud_topics/level_one/domain/db_domain_manager.h" #include "cloud_topics/level_one/common/object_id.h" +#include "cloud_topics/level_one/metastore/lsm/db_debug.h" #include "cloud_topics/level_one/metastore/lsm/garbage_collector.h" #include "cloud_topics/level_one/metastore/lsm/keys.h" #include "cloud_topics/level_one/metastore/lsm/state_reader.h" @@ -218,6 +219,7 @@ db_domain_manager::replace_objects(rpc::replace_objects_request req) { const auto& p = tp.partition; req_compaction_updates[t][p] = std::move(update); } + vlog(cd_log.debug, "Compaction updates: {}", req_compaction_updates); auto update = replace_objects_db_update{ .new_objects = std::move(req.new_objects), .compaction_updates = std::move(req_compaction_updates), @@ -228,7 +230,9 @@ 
db_domain_manager::replace_objects(rpc::replace_objects_request req) { .ec = gl_res.error(), }; } - auto reader = state_reader(db_->db().create_snapshot()); + auto snap = db_->db().create_snapshot(); + co_await dump_partition_state(snap); + auto reader = state_reader(std::move(snap)); chunked_vector rows; auto build_res = co_await update.build_rows(reader, rows); if (!build_res.has_value()) { @@ -243,6 +247,8 @@ db_domain_manager::replace_objects(rpc::replace_objects_request req) { .ec = apply_res.error(), }; } + auto post_snap = db_->db().create_snapshot(); + co_await dump_partition_state(post_snap); co_return rpc::replace_objects_reply{ .ec = rpc::errc::ok, }; diff --git a/src/v/cloud_topics/level_one/metastore/BUILD b/src/v/cloud_topics/level_one/metastore/BUILD index 654ea98231988..09dfe76e35ae1 100644 --- a/src/v/cloud_topics/level_one/metastore/BUILD +++ b/src/v/cloud_topics/level_one/metastore/BUILD @@ -274,12 +274,17 @@ redpanda_cc_library( hdrs = [ "state_update_utils.h", ], + implementation_deps = [ + "//src/v/cloud_topics:logger", + ], visibility = ["//visibility:public"], deps = [ ":offset_interval_set", ":state", + "//src/v/base", "//src/v/container:chunked_hash_map", "//src/v/model", + "//src/v/utils:to_string", ], ) diff --git a/src/v/cloud_topics/level_one/metastore/lsm/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/BUILD index 0ecb7323e598c..afdb15d5a3f70 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/BUILD +++ b/src/v/cloud_topics/level_one/metastore/lsm/BUILD @@ -214,6 +214,7 @@ redpanda_cc_library( implementation_deps = [ ":keys", ":values", + "//src/v/cloud_topics:logger", "//src/v/cloud_topics/level_one/metastore:state_update_utils", ], visibility = ["//visibility:public"], @@ -241,6 +242,27 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "db_debug", + srcs = [ + "db_debug.cc", + ], + hdrs = [ + "db_debug.h", + ], + implementation_deps = [ + ":keys", + ":values", + "//src/v/cloud_topics:logger", + "//src/v/serde", + ], 
+ visibility = ["//visibility:public"], + deps = [ + "//src/v/lsm", + "@seastar", + ], +) + redpanda_cc_library( name = "garbage_collector", srcs = [ diff --git a/src/v/cloud_topics/level_one/metastore/lsm/db_debug.cc b/src/v/cloud_topics/level_one/metastore/lsm/db_debug.cc new file mode 100644 index 0000000000000..e04b161934cbe --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/db_debug.cc @@ -0,0 +1,130 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "cloud_topics/level_one/metastore/lsm/db_debug.h" + +#include "cloud_topics/level_one/metastore/lsm/keys.h" +#include "cloud_topics/level_one/metastore/lsm/values.h" +#include "cloud_topics/logger.h" +#include "serde/rw/rw.h" + +#include + +namespace cloud_topics::l1 { + +ss::future<> dump_partition_state(lsm::snapshot& snap) { + if (!cd_log.is_enabled(ss::log_level::debug)) { + co_return; + } + auto iter = co_await snap.create_iterator(); + co_await iter.seek_to_first(); + while (iter.valid()) { + auto key_str = iter.key(); + auto val_buf = iter.value(); + bool is_tombstone = val_buf.empty(); + + if (auto mk = metadata_row_key::decode(key_str)) { + if (is_tombstone) { + vlog(cd_log.debug, "metadata {}: ", mk->tidp); + } else { + auto val = serde::from_iobuf( + std::move(val_buf)); + vlog( + cd_log.debug, + "metadata {}: start={}, next={}, size={}, " + "compaction_epoch={}", + mk->tidp, + val.start_offset, + val.next_offset, + val.size, + val.compaction_epoch); + } + } else if (auto ek = extent_row_key::decode(key_str)) { + if (is_tombstone) { + vlog( + cd_log.debug, + " extent {}: base={} ", + ek->tidp, + ek->base_offset); + } else { + auto val = serde::from_iobuf( + std::move(val_buf)); + vlog( + 
cd_log.debug, + " extent {}: base={}, last={}, ts={}, filepos={}, " + "len={}, oid={}", + ek->tidp, + ek->base_offset, + val.last_offset, + val.max_timestamp, + val.filepos, + val.len, + val.oid); + } + } else if (auto tk = term_row_key::decode(key_str)) { + if (is_tombstone) { + vlog( + cd_log.debug, + " term {}: term={} ", + tk->tidp, + tk->term); + } else { + auto val = serde::from_iobuf( + std::move(val_buf)); + vlog( + cd_log.debug, + " term {}: term={}, start_offset={}", + tk->tidp, + tk->term, + val.term_start_offset); + } + } else if (auto ck = compaction_row_key::decode(key_str)) { + if (is_tombstone) { + vlog(cd_log.debug, " compaction {}: ", ck->tidp); + } else { + auto val = serde::from_iobuf( + std::move(val_buf)); + vlog( + cd_log.debug, + " compaction {}: cleaned_ranges={}, " + "cleaned_ranges_with_tombstones={}", + ck->tidp, + val.state.cleaned_ranges, + val.state.cleaned_ranges_with_tombstones.size()); + } + } else if (auto ok = object_row_key::decode(key_str)) { + if (is_tombstone) { + vlog(cd_log.debug, " object {}: ", ok->oid); + } else { + auto val = serde::from_iobuf( + std::move(val_buf)); + vlog( + cd_log.debug, + " object {}: total_data_size={}, " + "removed_data_size={}, footer_pos={}, object_size={}", + ok->oid, + val.object.total_data_size, + val.object.removed_data_size, + val.object.footer_pos, + val.object.object_size); + } + } else { + vlog( + cd_log.debug, + "unknown key: {} (len={}, value_len={})", + key_str, + key_str.size(), + val_buf.size_bytes()); + } + + co_await iter.next(); + } +} + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/db_debug.h b/src/v/cloud_topics/level_one/metastore/lsm/db_debug.h new file mode 100644 index 0000000000000..4f463b83e65bc --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/db_debug.h @@ -0,0 +1,23 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "lsm/lsm.h" + +#include + +namespace cloud_topics::l1 { + +/// Iterates all rows in the given database snapshot and logs each partition's +/// metadata and extents at debug level. Directly decodes keys and values from +/// the raw iterator. +ss::future<> dump_partition_state(lsm::snapshot& snap); + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state_update.cc b/src/v/cloud_topics/level_one/metastore/lsm/state_update.cc index f6824327293a9..ddb6ac0e406b6 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/state_update.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/state_update.cc @@ -12,6 +12,7 @@ #include "cloud_topics/level_one/metastore/lsm/keys.h" #include "cloud_topics/level_one/metastore/lsm/values.h" #include "cloud_topics/level_one/metastore/state_update.h" +#include "cloud_topics/logger.h" #include #include @@ -607,6 +608,17 @@ replace_objects_db_update::build_rows( co_return std::unexpected(std::move(new_extents_res.error())); } + for (const auto& [tidp, extents] : new_extents_by_tp) { + vlog( + cd_log.debug, + "replace_objects: new extents for {}: count={}", + tidp, + extents.size()); + for (const auto& e : extents) { + vlog(cd_log.debug, "replace_objects: {}", e); + } + } + // Calculate contiguous intervals and validate that they align with // appropriate extents. 
auto contiguous_intervals_res = contiguous_intervals_for_extents( @@ -616,6 +628,16 @@ replace_objects_db_update::build_rows( invalid_input, std::move(contiguous_intervals_res.error()))); } const auto& contiguous_intervals_by_tp = contiguous_intervals_res.value(); + for (const auto& [tidp, intervals] : contiguous_intervals_by_tp) { + for (const auto& iv : intervals) { + vlog( + cd_log.debug, + "replace_objects: contiguous interval for {}: [{}, {}]", + tidp, + iv.base_offset, + iv.last_offset); + } + } chunked_hash_map old_extent_sizes_by_oid; chunked_hash_map> extent_keys_to_delete; @@ -717,6 +739,28 @@ replace_objects_db_update::build_rows( } } + for (const auto& [tidp, keys] : extent_keys_to_delete) { + for (const auto& key : keys) { + auto ek = extent_row_key::decode(key); + vlog( + cd_log.debug, + "replace_objects: deleting extent for {}: base={}", + tidp, + ek ? ek->base_offset : kafka::offset{-1}); + } + } + for (const auto& [tidp, meta] : updated_metadata) { + vlog( + cd_log.debug, + "replace_objects: updated metadata for {}: start={}, next={}, " + "size={}, compaction_epoch={}", + tidp, + meta.start_offset, + meta.next_offset, + meta.size, + meta.compaction_epoch); + } + // Generate the rows. 
chunked_hash_set added_extent_keys; for (const auto& [tidp, extents] : new_extents_by_tp) { diff --git a/src/v/cloud_topics/level_one/metastore/state_update.h b/src/v/cloud_topics/level_one/metastore/state_update.h index 1b259c20a5dd3..cb67e88ae3ea8 100644 --- a/src/v/cloud_topics/level_one/metastore/state_update.h +++ b/src/v/cloud_topics/level_one/metastore/state_update.h @@ -113,6 +113,15 @@ struct compaction_state_update return std::tie(base_offset, last_offset, has_tombstones); } + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to( + it, + "{{base={}, last={}, tombstones={}}}", + base_offset, + last_offset, + has_tombstones); + } + kafka::offset base_offset; kafka::offset last_offset; @@ -126,6 +135,18 @@ struct compaction_state_update cleaned_at, expected_compaction_epoch); } + + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to( + it, + "{{cleaned_at={}, expected_epoch={}, new_cleaned=[{}], " + "removed_tombstones={}}}", + cleaned_at, + expected_compaction_epoch, + fmt::join(new_cleaned_ranges, ", "), + removed_tombstones_ranges); + } + // The cleaned ranges for this compaction, if any. Ranges may or may not // have tombstones. 
chunked_vector new_cleaned_ranges; diff --git a/src/v/cloud_topics/level_one/metastore/state_update_utils.cc b/src/v/cloud_topics/level_one/metastore/state_update_utils.cc index 44b4f5376f4b2..96e56d15cd240 100644 --- a/src/v/cloud_topics/level_one/metastore/state_update_utils.cc +++ b/src/v/cloud_topics/level_one/metastore/state_update_utils.cc @@ -9,6 +9,10 @@ */ #include "cloud_topics/level_one/metastore/state_update_utils.h" +#include "base/format_to.h" +#include "cloud_topics/logger.h" +#include "utils/to_string.h" + namespace cloud_topics::l1 { std::expected @@ -38,6 +42,7 @@ contiguous_intervals_for_extents( current_base = extent.base_offset; current_last = extent.last_offset; } else { + vlog(cd_log.error, "BROKEN UPDATE WITH EXTENTS: {}", extents); return std::unexpected( fmt::format( "Input object breaks partition {} offset ordering: " diff --git a/src/v/lsm/db/compaction_task.cc b/src/v/lsm/db/compaction_task.cc index 540344f3b9b5f..13f413188760e 100644 --- a/src/v/lsm/db/compaction_task.cc +++ b/src/v/lsm/db/compaction_task.cc @@ -147,6 +147,14 @@ ss::future> do_run_compaction_task( // NOLINTNEXTLINE(*branch-clone*) if (last_seqno_for_key <= state.smallest_snapshot) { // Hidden by a newer entry for the same user key + vlog( + log.trace, + "dropping hidden entry key={} seqno={} " + "last_seqno_for_key={} smallest_snapshot={}", + key.decode(), + key_seqno, + last_seqno_for_key, + state.smallest_snapshot); drop = true; } else if ( key.is_tombstone() && key_seqno <= state.smallest_snapshot @@ -159,6 +167,15 @@ ss::future> do_run_compaction_task( // next few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be // dropped. 
+ vlog( + log.trace, + "dropping tombstone key={} seqno={} " + "smallest_snapshot={} input_level={} output_level={}", + key.decode(), + key_seqno, + state.smallest_snapshot, + input_level, + output_level); drop = true; } last_seqno_for_key = key_seqno; diff --git a/src/v/utils/to_string.h b/src/v/utils/to_string.h index 0f7066fa120c6..a204f28ca7314 100644 --- a/src/v/utils/to_string.h +++ b/src/v/utils/to_string.h @@ -128,6 +128,21 @@ std::ostream& operator<<(std::ostream& o, const absl::btree_set& s) { return o; } +template +std::ostream& operator<<(std::ostream& o, const absl::btree_multiset& s) { + o << "{"; + bool first = true; + for (const auto& k : s) { + if (!first) { + o << ", "; + } + o << k; + first = false; + } + o << "}"; + return o; +} + } // namespace absl template<> diff --git a/tests/rptest/archival/s3_client.py b/tests/rptest/archival/s3_client.py index 3561423175604..9220a3b4fd68a 100644 --- a/tests/rptest/archival/s3_client.py +++ b/tests/rptest/archival/s3_client.py @@ -28,6 +28,14 @@ class ObjectMetadata(NamedTuple): content_length: int +class VersionedObjectMetadata(NamedTuple): + key: str + bucket: str + etag: str + content_length: int + version_id: str + + class S3AddressingStyle(str, Enum): VIRTUAL = "virtual" PATH = "path" @@ -244,7 +252,7 @@ def empty_and_delete_bucket(self, name: str, parallel: bool = False) -> None: assert len(failed_deletions) == 0 self.delete_bucket(name) - def delete_bucket(self, name): + def delete_bucket(self, name: str): self.logger.info(f"Deleting bucket {name}...") try: self._cli.delete_bucket(Bucket=name) @@ -570,6 +578,127 @@ def list_buckets(self, client=None) -> dict[str, Union[list, dict]]: self.logger.error(f"Error listing buckets: {ex}") raise + def enable_bucket_versioning(self, bucket: str): + """Enable versioning on an S3 bucket. + + When versioning is enabled, objects that are deleted or overwritten + retain their previous versions. 
This is useful for capturing all + SST files that ever existed in tiered storage, even if they were + later removed by compaction or GC during a test run. + """ + self.logger.info(f"Enabling versioning on bucket {bucket}") + self._cli.put_bucket_versioning( + Bucket=bucket, + VersioningConfiguration={"Status": "Enabled"}, + ) + + @retry_on_slowdown() + def _list_object_versions( + self, + *, + bucket, + key_marker=None, + version_id_marker=None, + limit=1000, + prefix=None, + ): + try: + kwargs = dict(Bucket=bucket, MaxKeys=limit, Prefix=prefix if prefix else "") + if key_marker is not None: + kwargs["KeyMarker"] = key_marker + if version_id_marker is not None: + kwargs["VersionIdMarker"] = version_id_marker + return self._cli.list_object_versions(**kwargs) + except ClientError as err: + self.logger.debug(f"error response listing versions {bucket}: {err}") + if err.response["Error"]["Code"] == "SlowDown": + raise SlowDown() + else: + raise + + def list_object_versions( + self, + bucket: str, + prefix: str | None = None, + ) -> Iterator[VersionedObjectMetadata]: + """List all object versions in a bucket, including non-current + (deleted/overwritten) versions. + + Yields VersionedObjectMetadata for every version that ever existed. + Each entry includes the version_id needed to download that specific + version. 
+ """ + key_marker = None + version_id_marker = None + truncated = True + while truncated: + res = self._list_object_versions( + bucket=bucket, + key_marker=key_marker, + version_id_marker=version_id_marker, + limit=100, + prefix=prefix, + ) + truncated = bool(res.get("IsTruncated", False)) + key_marker = res.get("NextKeyMarker") + version_id_marker = res.get("NextVersionIdMarker") + + for item in res.get("Versions", []): + yield VersionedObjectMetadata( + bucket=bucket, + key=item["Key"], + etag=item["ETag"][1:-1], + content_length=item["Size"], + version_id=item["VersionId"], + ) + + def get_object_data_by_version( + self, bucket: str, key: str, version_id: str + ) -> bytes: + """Download a specific version of an object.""" + resp = self._cli.get_object(Bucket=bucket, Key=key, VersionId=version_id) + return resp["Body"].read() + + def empty_versioned_bucket(self, bucket: str): + """Delete all object versions and delete markers from a + versioned bucket so it can be deleted.""" + self.logger.info(f"Emptying all versions from versioned bucket {bucket}") + key_marker = None + version_id_marker = None + truncated = True + while truncated: + res = self._list_object_versions( + bucket=bucket, + key_marker=key_marker, + version_id_marker=version_id_marker, + limit=1000, + ) + truncated = bool(res.get("IsTruncated", False)) + key_marker = res.get("NextKeyMarker") + version_id_marker = res.get("NextVersionIdMarker") + + objects_to_delete = [] + for item in res.get("Versions", []): + objects_to_delete.append( + { + "Key": item["Key"], + "VersionId": item["VersionId"], + } + ) + for item in res.get("DeleteMarkers", []): + objects_to_delete.append( + { + "Key": item["Key"], + "VersionId": item["VersionId"], + } + ) + + if objects_to_delete: + self._cli.delete_objects( + Bucket=bucket, + Delete={"Objects": objects_to_delete}, + ) + def create_expiration_policy(self, bucket: str, days: int): if self._is_gcs: self._gcp_create_expiration_policy(bucket, days) diff --git 
a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 0bfad9e207b01..00914ae90bfc3 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -637,6 +637,7 @@ def __init__( before_call_headers: Callable[[], dict[str, str]] | None = None, skip_end_of_test_scrubbing: bool = False, addressing_style: S3AddressingStyle = S3AddressingStyle.PATH, + enable_bucket_versioning: bool = False, ) -> None: """ :param fast_uploads: if true, set low upload intervals to help tests run @@ -761,6 +762,7 @@ def __init__( # tests. Should figure out how to re-enable it, or consider using # redpanda's built-in scrubbing capabilities. self.skip_end_of_test_scrubbing = skip_end_of_test_scrubbing + self.enable_bucket_versioning = enable_bucket_versioning if fast_uploads: self.cloud_storage_segment_max_upload_interval_sec = 10 @@ -1136,7 +1138,8 @@ class LoggingConfig: # assumed it was always supported/does not require special handling. LOGGER_GENESIS: dict[str, RedpandaVersionTriple] = { "datalake": (24, 3, 1), - "cloud_topics-compaction": (26, 1, 1), + "cloud_topics-compaction": (26, 1, 0), + "lsm": (26, 1, 0), } def __init__(self, default_level: str, logger_levels: dict[str, str] = {}) -> None: @@ -4056,6 +4059,16 @@ def start_si(self): self.si_settings.cloud_storage_bucket ) + # Enable versioning so that deleted/overwritten objects are retained. + # This allows collecting all SST files that ever existed during a test, + # even if compaction or GC removed them before the test completed. + if self.si_settings.enable_bucket_versioning and isinstance( + self.cloud_storage_client, S3Client + ): + self.cloud_storage_client.enable_bucket_versioning( + self.si_settings.cloud_storage_bucket + ) + # If the test has requested to use a bucket cleanup policy then we # attempt to create one which will remove everything from the bucket # after one day. 
@@ -4140,9 +4153,23 @@ def delete_bucket_from_si(self): f"missing bucket : {self.si_settings.cloud_storage_bucket}" ) t = time.time() - self.cloud_storage_client.empty_and_delete_bucket( - self.si_settings.cloud_storage_bucket, parallel=self.dedicated_nodes - ) + + # Versioned buckets require deleting all object versions and delete + # markers before the bucket itself can be removed. + if self.si_settings.enable_bucket_versioning and isinstance( + self.cloud_storage_client, S3Client + ): + self.cloud_storage_client.empty_versioned_bucket( + self.si_settings.cloud_storage_bucket + ) + self.cloud_storage_client.delete_bucket( + self.si_settings.cloud_storage_bucket + ) + else: + self.cloud_storage_client.empty_and_delete_bucket( + self.si_settings.cloud_storage_bucket, + parallel=self.dedicated_nodes, + ) self.logger.info( f"Emptying and deleting bucket {self.si_settings.cloud_storage_bucket} took {time.time() - t}s" @@ -4479,17 +4506,38 @@ def _cloud_storage_diagnostics(self): key_dump_limit = 10000 manifest_dump_limit = 128 + bucket = self.si_settings.cloud_storage_bucket + use_versioned_listing = ( + self.si_settings.enable_bucket_versioning + and self.si_settings.cloud_storage_type == CloudStorageType.S3 + ) + self.logger.info( - f"Gathering cloud storage diagnostics in bucket {self.si_settings.cloud_storage_bucket}" + f"Gathering cloud storage diagnostics in bucket {bucket}" + f" (versioned={use_versioned_listing})" ) - manifests_to_dump: list[str] = [] - for o in self.cloud_storage_client.list_objects( - self.si_settings.cloud_storage_bucket - ): - key = o.key + # When versioning is enabled, list all versions so we capture objects + # that were deleted by compaction/GC during the test run. Each entry + # is a (key, version_id) tuple so we can fetch the exact version. 
+ client = self.cloud_storage_client + if use_versioned_listing and isinstance(client, S3Client): + versioned_iter = client.list_object_versions(bucket) + object_list: list[tuple[str, int, str | None]] = [ + (o.key, o.content_length, o.version_id) for o in versioned_iter + ] + else: + object_list = [ + (o.key, o.content_length, None) for o in client.list_objects(bucket) + ] + + # (key, version_id|None) tuples + manifests_to_dump: list[tuple[str, str | None]] = [] + segments_to_dump: list[tuple[str, str | None]] = [] + segment_dump_limit = 10000 + for key, content_length, version_id in object_list: if key_dump_limit > 0: - self.logger.info(f" {key} {o.content_length}") + self.logger.info(f" {key} {content_length}") key_dump_limit -= 1 # Gather manifest.json and topic_manifest.json files @@ -4498,10 +4546,26 @@ def _cloud_storage_diagnostics(self): or "manifest.bin" in key and manifest_dump_limit > 0 ): - manifests_to_dump.append(key) + manifests_to_dump.append((key, version_id)) manifest_dump_limit -= 1 - if manifest_dump_limit == 0 and key_dump_limit == 0: + # Gather segment/SST files (everything that is not a manifest, + # index, or lifecycle marker). 
+ elif ( + segment_dump_limit > 0 + and not key.endswith(".tx") + and not key.endswith(".index") + and "lifecycle" not in key + and "cluster_metadata" not in key + ): + segments_to_dump.append((key, version_id)) + segment_dump_limit -= 1 + + if ( + manifest_dump_limit == 0 + and key_dump_limit == 0 + and segment_dump_limit == 0 + ): break service_dir = os.path.join( @@ -4512,16 +4576,21 @@ if not os.path.isdir(service_dir): mkdir_p(service_dir) + def _fetch_object(key: str, version_id: str | None) -> bytes: + if version_id is not None and isinstance(client, S3Client): + return client.get_object_data_by_version(bucket, key, version_id) + return client.get_object_data(bucket, key) + archive_basename = "cloud_diagnostics.zip" archive_path = os.path.join(service_dir, archive_basename) with zipfile.ZipFile(archive_path, mode="w") as archive: - for m in manifests_to_dump: + for m, vid in manifests_to_dump: self.logger.info(f"Fetching manifest {m}") - body = self.cloud_storage_client.get_object_data( - self.si_settings.cloud_storage_bucket, m - ) + body = _fetch_object(m, vid) filename = m.replace("/", "_") + if vid is not None: + filename = f"{filename}__{vid}" with archive.open(filename, "w") as outstr: outstr.write(body) @@ -4544,6 +4613,34 @@ def _cloud_storage_diagnostics(self): with archive.open(json_filename, "w") as outstr: outstr.write(json_bytes.encode()) + # Write segment/SST files to a separate archive to keep diagnostics + # manageable and to avoid inflating the manifest archive. 
+ if segments_to_dump: + segments_archive_path = os.path.join(service_dir, "cloud_segments.zip") + self.logger.info( + f"Collecting {len(segments_to_dump)} segment/SST files" + f" into {segments_archive_path}" + ) + with zipfile.ZipFile( + segments_archive_path, + mode="w", + compression=zipfile.ZIP_DEFLATED, + ) as archive: + for s, vid in segments_to_dump: + try: + body = _fetch_object(s, vid) + except Exception as e: + # Object may have been physically purged or the + # version expired; log and continue. + self.logger.warning(f"Failed to fetch segment {s}: {e}") + continue + filename = s.replace("/", "_") + if vid is not None: + filename = f"{filename}__{vid}" + with archive.open(filename, "w") as outstr: + outstr.write(body) + self.logger.info(f"Segment collection complete: {segments_archive_path}") + def raise_on_storage_usage_inconsistency(self): def tracked(fstat: tuple[pathlib.Path, int]): """ diff --git a/tests/rptest/tests/random_node_operations_smoke_test.py b/tests/rptest/tests/random_node_operations_smoke_test.py index 407cfc67342f9..c17ec44c57efb 100644 --- a/tests/rptest/tests/random_node_operations_smoke_test.py +++ b/tests/rptest/tests/random_node_operations_smoke_test.py @@ -128,6 +128,7 @@ def __init__( cloud_storage_enable_remote_read=True, cloud_storage_enable_remote_write=True, fast_uploads=True, + enable_bucket_versioning=True, ) self.catalog_service = IcebergRESTCatalog( test_context, @@ -918,14 +919,13 @@ def __init__(self, *args: Any, **kwargs: Any): "cloud_topics": "debug", "cloud_topics-compaction": "debug", "offset_translator": "trace", + "lsm": "trace", }, ), ) @cluster(num_nodes=9, log_allow_list=RNOT_ALLOW_LIST) - @matrix( - cloud_storage_type=get_cloud_storage_type()[:1], mixed_versions=[True, False] - ) + @matrix(cloud_storage_type=get_cloud_storage_type()[:1], mixed_versions=[False]) def test_node_ops_smoke_test( self, cloud_storage_type: CloudStorageType, mixed_versions: bool ): @@ -936,7 +936,7 @@ def test_node_ops_smoke_test( # 
iceberg and mixed versions are mutually incompatible, so run two # flavors of the smoke test, one with iceberg and one with mixed versions - with_iceberg = not mixed_versions + with_iceberg = False self._do_test_node_operations( enable_failures=True, mixed_versions=mixed_versions,