From 81282af88b46009771e8685b51933bb8598aad8b Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Mon, 2 Feb 2026 04:02:02 +0000 Subject: [PATCH 01/13] sr/types: Update context_subject::from_string Update context_subject::from_string to handle context-only strings. --- .../schema_registry/test/context_subject.cc | 6 ++++-- src/v/pandaproxy/schema_registry/types.cc | 14 +++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/test/context_subject.cc b/src/v/pandaproxy/schema_registry/test/context_subject.cc index fe5c062e556dc..c62770495e288 100644 --- a/src/v/pandaproxy/schema_registry/test/context_subject.cc +++ b/src/v/pandaproxy/schema_registry/test/context_subject.cc @@ -45,13 +45,15 @@ TEST_F(ContextSubjectTest, FromString) { context_subject::from_string(":.ctx:a:b:c"), (context_subject{context{".ctx"}, subject{"a:b:c"}})); - // Invalid qualified syntax falls back to unqualified + // Invalid qualified syntax (no dot after colon) falls back to unqualified EXPECT_EQ( context_subject::from_string(":no-dot"), (context_subject{default_context, subject{":no-dot"}})); + + // Context-only form without trailing colon: ":.ctx" (empty subject) EXPECT_EQ( context_subject::from_string(":.no-second-colon"), - (context_subject{default_context, subject{":.no-second-colon"}})); + (context_subject{context{".no-second-colon"}, subject{""}})); } TEST_F(ContextSubjectTest, ToStringAndRoundTrip) { diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index dea12a81c2911..3ae6874906309 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -94,12 +94,16 @@ context_subject context_subject::from_string(std::string_view input) { // Find the second colon that separates context from subject auto second_colon = input.find(':', 2); - if (second_colon != std::string_view::npos) { - auto ctx_str = input.substr(1, second_colon - 1); - auto sub_str = input.substr(second_colon + 1); - - return context_subject{context{ctx_str}, subject{sub_str}}; + if (second_colon == std::string_view::npos) { + // No second colon, so only context is provided + return context_subject{context{input.substr(1)}, subject{}}; } + + // Both context and subject are provided + auto ctx_str = input.substr(1, second_colon - 1); + auto sub_str = input.substr(second_colon + 1); + + return context_subject{context{ctx_str}, subject{sub_str}}; } // Default case: unqualified subject or invalid qualified syntax From 2070bf1ac2bb766134c0ed1f7eac276027f5a5b1 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Mon, 2 Feb 2026 04:03:20 +0000 Subject: [PATCH 02/13] schema_registry: Refactor context_subject helper methods Rename is_default_context() to is_default_context_only() to better reflect its behavior: it returns true only when the context is the default context AND the subject is empty (context-only). Add a new method is_non_default_context() that checks if a subject is in a non-default context. This will be used in future changes to handle subject query parameters. --- src/v/pandaproxy/schema_registry/seq_writer.cc | 7 ++++--- src/v/pandaproxy/schema_registry/types.h | 4 +++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index c72acc26a001e..18041e317eadb 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -332,8 +332,9 @@ ss::future> seq_writer::do_write_config( } batch_builder rb(write_at); - auto sub_key = sub.is_default_context() ? std::optional{} - : std::make_optional(sub); + auto sub_key = sub.is_default_context_only() + ? std::optional{} + : std::make_optional(sub); rb( config_key{.seq{write_at}, .node{_node_id}, .sub{sub_key}}, config_value{.compat = compat, .sub{sub_key}}); @@ -457,7 +458,7 @@ ss::future> seq_writer::do_write_mode( } batch_builder rb(write_at); - auto sub_key = ctx_sub.is_default_context() + auto sub_key = ctx_sub.is_default_context_only() ? std::optional{} : std::make_optional(ctx_sub); diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index e72f32b84a29c..1c07dca5f445f 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -199,10 +199,12 @@ struct context_subject { /// Retrurns true if this represents the default context with an empty /// subject. - bool is_default_context() const { + bool is_default_context_only() const { return is_context_only() && ctx == default_context; } + bool is_non_default_context() const { return ctx != default_context; } + context ctx; subject sub; }; From 5cd1e90646fa698c01fb1cdb21582b3edccf440d Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 28 Jan 2026 22:36:18 +0000 Subject: [PATCH 03/13] schema_registry: add subject query param to GET /schemas/ids/{id} Add an optional `subject` query parameter to control schema lookup context and subject restriction. The parameter value is parsed using context_subject::from_string(), which extracts a context substring and a subject substring from the input. Lookup behavior: - No parameter: search default context without subject restriction (existing behavior) - Context only (e.g., ":.ctx:"): search the specified context without subject restriction - Qualified (e.g., ":.ctx:sub"): search the specified context, restricted to the subject substring - Unqualified (e.g., "sub" or ":.:sub"): search the default context restricted to the subject substring; if not found, search all other contexts; if still not found, fall back to the default context without subject restriction A subject parameter is "unqualified" if it resolves to the default context, either implicitly (no context substring) or explicitly (context substring is "."). --- src/v/pandaproxy/schema_registry/handlers.cc | 124 ++++++++++++++++--- 1 file changed, 110 insertions(+), 14 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 5a2041e693717..06c588bb2be56 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -44,6 +44,7 @@ #include #include #include +#include namespace ppj = pandaproxy::json; @@ -153,6 +154,93 @@ to_non_context_schema_ids(const chunked_vector& ids) { | std::ranges::to>(); } +ss::future> try_get_schema_definition( + const server::request_t& rq, + std::optional& auth_result, + schema_id id, + context_subject ctx_sub) { + const context& ctx = ctx_sub.ctx().empty() ? default_context : ctx_sub.ctx; + const subject& sub = ctx_sub.sub; + context_schema_id ctx_id{ctx, id}; + + auto schema_subjects + = co_await rq.service().schema_store().get_schema_subjects( + ctx_id, include_deleted::yes); + + if (!sub().empty()) { + // If a subject is provided, ensure the schema ID is associated with it + if (std::ranges::contains(schema_subjects, ctx_sub)) { + // The schema ID is associated with the given subject in the + // given context. + schema_subjects = {ctx_sub}; + } else { + // The schema ID is not associated with the given subject in the + // given context. + schema_subjects = {}; + } + } + + // Ensure requester is authorized to access at least one of the subjects + // associated with the schema ID in the given context. + enterprise::handle_get_schemas_ids_id_authz( + rq, auth_result, schema_subjects); + + if (schema_subjects.empty()) { + // The schema ID is not associated with any subject that the requester + // is authorized to access. + co_return std::nullopt; + } + + // Here, the schema ID is verified to be associated with a subject in the + // given context that the requester is authorized to access. + co_return co_await rq.service().schema_store().maybe_get_schema_definition( + ctx_id); +} + +/// Resolve a schema definition, searching across contexts if needed. +/// First tries the given context and subject. If a subject is provided, we're +/// in the default context, and the schema is not found, then searches other +/// contexts for the schema ID with that subject. Falls back to searching the +/// default context without subject restriction if still not found. +ss::future> resolve_schema_across_contexts( + const server::request_t& rq, + std::optional& auth_result, + schema_id id, + context_subject ctx_sub) { + // Try to get schema definition with given context and subject + auto schema_def = co_await try_get_schema_definition( + rq, auth_result, id, ctx_sub); + if ( + ctx_sub.sub().empty() || ctx_sub.is_non_default_context() + || schema_def.has_value()) { + // Either no subject provided, or non-default context, or schema found + co_return schema_def; + } + + // Here, subject is NOT empty and we're in the default context (either + // implicitly or explicitly). We did not find the schema with the given + // subject in the default context, so search other contexts for the schema + // ID with the given subject. + auto contexts + = co_await rq.service().schema_store().get_materialized_contexts(); + for (const auto& ctx : contexts) { + if (ctx == default_context) { + // Already checked default context + continue; + } + schema_def = co_await try_get_schema_definition( + rq, auth_result, id, {ctx, ctx_sub.sub}); + if (schema_def) { + co_return schema_def; + } + } + + // Here, schema ID not found under any context with the given subject. + // Try searching in the default context without subject restriction. + co_return co_await try_get_schema_definition( + rq, auth_result, id, {ctx_sub.ctx, subject{}}); +} + } // namespace ss::future @@ -486,19 +574,23 @@ ss::future get_schemas_ids_id( const auto format = parse_output_format(*rq.req); co_await rq.service().writer().read_sync(); - auto subjects = co_await rq.service().schema_store().get_schema_subjects( - id, include_deleted::yes); - enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, subjects); + // Parse optional subject query parameter to extract context + auto subject_param = parse::query_param>( + *rq.req, "subject") + .value_or(""); - // With deferred schema validation, there might be a schema that - // had invalid references. These might have already been posted, so - // we need to sync - co_await rq.service().writer().read_sync(); + auto ctx_sub = context_subject::from_string(subject_param); - auto def = co_await get_or_load(rq, [&rq, id, format]() { - return rq.service().schema_store().get_schema_definition(id, format); - }); + auto maybe_def = co_await resolve_schema_across_contexts( + rq, auth_result, id, ctx_sub); + + if (!maybe_def) { + throw as_exception(not_found(id)); + } + + auto def = co_await rq.service().schema_store().format_schema( + std::move(*maybe_def), format); auto resp = ppj::rjson_serialize_iobuf( get_schemas_ids_id_response{.definition{std::move(def)}}); @@ -576,7 +668,8 @@ ss::future get_subjects( auto res = co_await rq.service().schema_store().get_subjects( inc_del, subject_prefix); - // Handle AuthZ - Filters res for the subjects the user is allowed to see + // Handle AuthZ - Filters res for the subjects the user is allowed to + // see enterprise::handle_get_subjects_authz(rq, auth_result, res); // Convert context_subject to qualified string format for JSON response @@ -627,7 +720,8 @@ post_subject(server::request_t rq, server::reply_t rp) { const auto format = parse_output_format(*rq.req); vlog( srlog.debug, - "post_subject subject='{}', normalize='{}', deleted='{}', format='{}'", + "post_subject subject='{}', normalize='{}', deleted='{}', " + "format='{}'", ctx_sub, norm, inc_del, @@ -790,7 +884,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { throw exception( error_code::schema_incompatible, fmt::format( - "Schema being registered is incompatible with an earlier " + "Schema being registered is incompatible with an " + "earlier " "schema for subject \"{}\", details: [{}]", ctx_sub, fmt::join(compat.messages, ", "))); @@ -990,7 +1085,8 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { auto unparsed = co_await rjson_parse( *rq.req, post_subject_versions_request_handler<>{ctx_sub}); - // Must read, in case we have the subject in cache with an outdated config + // Must read, in case we have the subject in cache with an outdated + // config co_await rq.service().writer().read_sync(); vlog( From 3c45591c946eabdfe72201e919856bb0b531daf4 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 28 Jan 2026 22:41:42 +0000 Subject: [PATCH 04/13] schema_registry/swagger: document subject param for GET /schemas/ids/{id} --- src/v/pandaproxy/api/api-doc/schema_registry.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 75d2265bce61f..ba34fcc0e8f15 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -454,6 +454,13 @@ "required": false, "type": "string", "description": "Redpanda version 25.2 or later. For Avro and Protobuf schemas only. Supported values: an empty string `''` returns the schema in its current format (default), and `serialized` (Protobuf only) returns the schema in its Base64-encoded wire binary format. Unsupported values return a 501 error." + }, + { + "name": "subject", + "in": "query", + "required": false, + "type": "string", + "description": "Optional qualified subject to search for the schema under. Use <:.context:> for context-only lookup, or <:.context:subject> to also verify the schema is associated with that subject. Defaults to searching the default context if unspecified." } ], "produces": [ From 26fe7f5efad5947635c4947c3e14b7c107fc39e0 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 28 Jan 2026 22:47:06 +0000 Subject: [PATCH 05/13] schema_registry/dt: test context lookup via subject param Add test for the `subject` query parameter on GET /schemas/ids/{id}, verifying that it correctly extracts context for schema lookup. Also update the test client to support the new parameter. --- tests/rptest/tests/schema_registry_test.py | 147 ++++++++++++++++++++- 1 file changed, 144 insertions(+), 3 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index c1eec8c70c32e..0b99e72a54af5 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -1000,10 +1000,17 @@ def get_schemas_types( "GET", "schemas/types", headers=headers, tls_enabled=tls_enabled, **kwargs ) - def get_schemas_ids_id(self, id, format=None, headers=HTTP_GET_HEADERS, **kwargs): - format_arg = f"?format={format}" if format is not None else "" + def get_schemas_ids_id( + self, id, format=None, subject=None, headers=HTTP_GET_HEADERS, **kwargs + ): + params = [] + if format is not None: + params.append(f"format={format}") + if subject is not None: + params.append(f"subject={subject}") + query_string = f"?{'&'.join(params)}" if params else "" return self.request( - "GET", f"schemas/ids/{id}{format_arg}", headers=headers, **kwargs + "GET", f"schemas/ids/{id}{query_string}", headers=headers, **kwargs ) def get_schemas_ids_id_versions(self, id, headers=HTTP_GET_HEADERS, **kwargs): @@ -5434,6 +5441,140 @@ def test_context_record_persistence(self): err_msg=f"Failed to find replay log: {replay_pattern}", ) + @cluster(num_nodes=1) + def test_get_schema_by_id_with_subject(self): + """Test GET /schemas/ids/{id} with subject query parameter for context lookup.""" + + # === SETUP === + # Register in default context + result = self.sr_client.post_subjects_subject_versions( + subject="sub1", data=json.dumps({"schema": schema1_def}) + ) + assert result.status_code == requests.codes.ok + default_id1 = result.json()["id"] # ID 1 + + # Register another schema in default context + result = self.sr_client.post_subjects_subject_versions( + subject="sub2", data=json.dumps({"schema": schema2_def}) + ) + assert result.status_code == requests.codes.ok + + # Register in ctx1 context (same subject name, different context) + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:sub1", data=json.dumps({"schema": schema1_def}) + ) + assert result.status_code == requests.codes.ok + ctx1_id1 = result.json()["id"] # ID 1 in ctx1 + + # Register unique subject only in ctx1 (for cross-context search test) + # This subject name does NOT exist in default context + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:unique-sub", data=json.dumps({"schema": schema3_def}) + ) + assert result.status_code == requests.codes.ok + ctx1_unique_id = result.json()["id"] # ID 2 in ctx1 + + # Register a third schema in ctx1 to create an ID that doesn't exist in default + # (for testing "ID only exists in non-default context") + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:ctx-only-sub", + data=json.dumps({"schema": simple_proto_def, "schemaType": "PROTOBUF"}), + ) + assert result.status_code == requests.codes.ok + ctx1_only_id = result.json()["id"] # ID 3 in ctx1, no ID 3 in default + + # === Test: Subject portion empty (sub().empty()) === + self.logger.info("Testing: Subject portion empty") + + # 1a. No subject param at all - uses default context + result = self.sr_client.get_schemas_ids_id(id=default_id1) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 1b. Context-only param ":.ctx1:" - uses ctx1, no subject restriction + result = self.sr_client.get_schemas_ids_id(id=ctx1_id1, subject=":.ctx1:") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 1c. Explicit default context-only ":.:" - uses default, no subject restriction + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject=":.:") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === Test: Non-default context (qualified, no cross-context search) === + self.logger.info("Testing: Non-default context") + + # 2a. Non-default context - schema found + result = self.sr_client.get_schemas_ids_id(id=ctx1_id1, subject=":.ctx1:sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 2b. Non-default context - wrong subject for ID (no fallback for non-default ctx) + result = self.sr_client.get_schemas_ids_id( + id=ctx1_id1, subject=":.ctx1:wrong-sub" + ) + assert result.status_code == requests.codes.not_found + + # 2c. Non-default context - context doesn't exist + result = self.sr_client.get_schemas_ids_id( + id=default_id1, subject=":.nonexistent:sub1" + ) + assert result.status_code == requests.codes.not_found + + # === Test: Default context (implicit or explicit), schema found === + self.logger.info("Testing: Default context, schema found") + + # 3a. Unqualified subject exists in default context + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject="sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 3b. Explicit default context (:.:) - same behavior as unqualified + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject=":.:sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === Test: Cross-context search === + self.logger.info("Testing: Cross-context search") + + # 4a. Unqualified subject "unique-sub" not in default, but exists in ctx1 + # Should find it via cross-context search + result = self.sr_client.get_schemas_ids_id( + id=ctx1_unique_id, subject="unique-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema3_def + + # 4b. Explicit default context (:.:) also triggers cross-context search + result = self.sr_client.get_schemas_ids_id( + id=ctx1_unique_id, subject=":.:unique-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema3_def + + # === Test: Fallback without subject restriction === + self.logger.info("Testing: Fallback without subject restriction") + + # 5a. Subject "nonexistent-sub" doesn't exist anywhere, but ID exists in default + # Should fallback to returning schema without subject check + result = self.sr_client.get_schemas_ids_id( + id=default_id1, subject="nonexistent-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === ERROR CASES === + self.logger.info("Testing error cases") + + # 6a. Schema ID doesn't exist at all + result = self.sr_client.get_schemas_ids_id(id=99999) + assert result.status_code == requests.codes.not_found + + # 6b. Schema ID exists only in ctx1, no subject param (looks in default only) + # ctx1_only_id (ID 3) only exists in ctx1, not in default context (which only has IDs 1-2) + result = self.sr_client.get_schemas_ids_id(id=ctx1_only_id) + assert result.status_code == requests.codes.not_found + class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): """ From 3ce6adf7d4d445e9edf090e5e64ac80c904ebdf9 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Tue, 3 Feb 2026 23:49:11 +0000 Subject: [PATCH 06/13] Saving WIP - new implementation --- src/v/pandaproxy/schema_registry/handlers.cc | 245 +++++++++++------- .../schema_registry/sharded_store.cc | 14 + .../schema_registry/sharded_store.h | 4 + src/v/pandaproxy/schema_registry/store.h | 20 ++ 4 files changed, 194 insertions(+), 89 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 06c588bb2be56..1c20313e87268 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -9,6 +9,7 @@ #include "handlers.h" +#include "base/vassert.h" #include "bytes/iobuf_parser.h" #include "cluster/controller.h" #include "cluster/security_frontend.h" @@ -154,91 +155,162 @@ to_non_context_schema_ids(const chunked_vector& ids) { | std::ranges::to>(); } -ss::future> try_get_schema_definition( - const server::request_t& rq, - std::optional& auth_result, - schema_id id, - context_subject ctx_sub) { - const context& ctx = ctx_sub.ctx().empty() ? default_context : ctx_sub.ctx; - const subject& sub = ctx_sub.sub; - context_schema_id ctx_id{ctx, id}; - - auto schema_subjects - = co_await rq.service().schema_store().get_schema_subjects( - ctx_id, include_deleted::yes); - - if (!sub().empty()) { - // If a subject is provided, ensure the schema ID is associated with it - if (std::ranges::contains(schema_subjects, ctx_sub)) { - // The schema ID is associated with the given subject in the - // given context. - schema_subjects = {ctx_sub}; - } else { - // The schema ID is not associated with the given subject in the - // given context. - schema_subjects = {}; +// ss::future> try_get_schema_definition( +// const server::request_t& rq, +// std::optional& auth_result, +// schema_id id, +// context_subject ctx_sub) { +// const context& ctx = ctx_sub.ctx().empty() ? default_context : ctx_sub.ctx; +// const subject& sub = ctx_sub.sub; +// context_schema_id ctx_id{ctx, id}; + +// auto schema_subjects +// = co_await rq.service().schema_store().get_schema_subjects( +// ctx_id, include_deleted::yes); + +// if (!sub().empty()) { +// // If a subject is provided, ensure the schema ID is associated with it +// if (std::ranges::contains(schema_subjects, ctx_sub)) { +// // The schema ID is associated with the given subject in the +// // given context. +// schema_subjects = {ctx_sub}; +// } else { +// // The schema ID is not associated with the given subject in the +// // given context. +// schema_subjects = {}; +// } +// } + +// // Ensure requester is authorized to access at least one of the subjects +// // associated with the schema ID in the given context. +// enterprise::handle_get_schemas_ids_id_authz( +// rq, auth_result, schema_subjects); + +// if (schema_subjects.empty()) { +// // The schema ID is not associated with any subject that the requester +// // is authorized to access. +// co_return std::nullopt; +// } + +// // Here, the schema ID is verified to be associated with a subject in the +// // given context that the requester is authorized to access. +// co_return co_await rq.service().schema_store().maybe_get_schema_definition( +// ctx_id); +// } + +// /// Resolve a schema definition, searching across contexts if needed. +// /// First tries the given context and subject. If a subject is provided, we're +// /// in the default context, and the schema is not found, then searches other +// /// contexts for the schema ID with that subject. Falls back to searching the +// /// default context without subject restriction if still not found. +// ss::future> resolve_schema_across_contexts( +// const server::request_t& rq, +// std::optional& auth_result, +// schema_id id, +// context_subject ctx_sub) { +// // Try to get schema definition with given context and subject +// auto schema_def = co_await try_get_schema_definition( +// rq, auth_result, id, ctx_sub); +// if ( +// ctx_sub.sub().empty() || ctx_sub.is_non_default_context() +// || schema_def.has_value()) { +// // Either no subject provided, or non-default context, or schema found +// co_return schema_def; +// } + +// // Here, subject is NOT empty and we're in the default context (either +// // implicitly or explicitly). We did not find the schema with the given +// // subject in the default context, so search other contexts for the schema +// // ID with the given subject. +// auto contexts +// = co_await rq.service().schema_store().get_materialized_contexts(); +// for (const auto& ctx : contexts) { +// if (ctx == default_context) { +// // Already checked default context +// continue; +// } +// schema_def = co_await try_get_schema_definition( +// rq, auth_result, id, {ctx, ctx_sub.sub}); +// if (schema_def) { +// co_return schema_def; +// } +// } + +// // Here, schema ID not found under any context with the given subject. +// // Try searching in the default context without subject restriction. +// co_return co_await try_get_schema_definition( +// rq, auth_result, id, {ctx_sub.ctx, subject{}}); +// } + +/// Resolve a schema ID in a simple way, without searching across contexts. +/// This function assumes that either the context is not the default context, +/// or if it is the default context, then the subject is empty. +ss::future resolve_schema_id_simple( + const server::request_t& rq, + std::optional auth_result, + schema_id id, + context_subject ctx_sub) { + vassert(ctx_sub.ctx != default_context || ctx_sub.sub().empty(), + "resolve_schema_id_simple cannot be called with default context and " + "non-empty subject"); + + const context_schema_id ctx_id{ctx_sub.ctx, id}; + auto schema_subjects = co_await rq.service().schema_store().get_schema_subjects(ctx_id, include_deleted::yes); + // If a subject is provided, filter the schema_subjects to only that subject (if it exists) + if (!ctx_sub.sub().empty()) { + schema_subjects = std::ranges::contains(schema_subjects, ctx_sub) + ? decltype(schema_subjects){ctx_sub} + : decltype(schema_subjects){}; } - } - // Ensure requester is authorized to access at least one of the subjects - // associated with the schema ID in the given context. - enterprise::handle_get_schemas_ids_id_authz( - rq, auth_result, schema_subjects); + // Ensure requester is authorized to access at least one of the subjects + // associated with the schema ID in the given context. + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, schema_subjects); - if (schema_subjects.empty()) { - // The schema ID is not associated with any subject that the requester - // is authorized to access. - co_return std::nullopt; - } + if (schema_subjects.empty()) { + // The schema ID is not associated with any subject in this context, or if the requester + // provided a ctx_sub.sub, the schema is not associated with that subject. + throw as_exception(not_found(id)); + } - // Here, the schema ID is verified to be associated with a subject in the - // given context that the requester is authorized to access. - co_return co_await rq.service().schema_store().maybe_get_schema_definition( - ctx_id); + co_return ctx_id; } -/// Resolve a schema definition, searching across contexts if needed. -/// First tries the given context and subject. If a subject is provided, we're -/// in the default context, and the schema is not found, then searches other -/// contexts for the schema ID with that subject. Falls back to searching the -/// default context without subject restriction if still not found. -ss::future> resolve_schema_across_contexts( - const server::request_t& rq, - std::optional& auth_result, - schema_id id, - context_subject ctx_sub) { - // Try to get schema definition with given context and subject - auto schema_def = co_await try_get_schema_definition( - rq, auth_result, id, ctx_sub); - if ( - ctx_sub.sub().empty() || ctx_sub.is_non_default_context() - || schema_def.has_value()) { - // Either no subject provided, or non-default context, or schema found - co_return schema_def; - } +ss::future resolve_schema_id_extended( + const server::request_t& rq, + std::optional auth_result, + schema_id id, + subject subject) { + vassert(!subject().empty(), + "resolve_schema_id_extended should only be called with a non-empty subject"); + + // First, try default context with the provided subject + if (context_subject ctx_sub{default_context, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) { + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {ctx_sub}); + co_return context_schema_id{default_context, id}; + } - // Here, subject is NOT empty and we're in the default context (either - // implicitly or explicitly). We did not find the schema with the given - // subject in the default context, so search other contexts for the schema - // ID with the given subject. - auto contexts - = co_await rq.service().schema_store().get_materialized_contexts(); - for (const auto& ctx : contexts) { - if (ctx == default_context) { - // Already checked default context - continue; + // Next, try other contexts with the provided subject + auto contexts = co_await rq.service().schema_store().get_materialized_contexts(); + for (const auto& ctx : contexts | std::views::filter([](const auto& c) { + return c != default_context; + })) { + if (context_subject ctx_sub{ctx, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) { + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {ctx_sub}); + co_return context_schema_id{ctx, id}; + } } - schema_def = co_await try_get_schema_definition( - rq, auth_result, id, {ctx, ctx_sub.sub}); - if (schema_def) { - co_return schema_def; + + // Finally, try default context without subject restriction + auto default_ctx_subjects = co_await rq.service().schema_store().get_subjects(default_context, include_deleted::yes); + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, default_ctx_subjects); + if (!default_ctx_subjects.empty()) { + co_return context_schema_id{default_context, id}; } - } - // Here, schema ID not found under any context with the given subject. - // Try searching in the default context without subject restriction. - co_return co_await try_get_schema_definition( - rq, auth_result, id, {ctx_sub.ctx, subject{}}); + // Schema ID not found in any context with the provided subject, nor in default context without subject restriction + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {}); + throw as_exception(not_found(id)); } } // namespace @@ -582,21 +654,16 @@ ss::future get_schemas_ids_id( auto ctx_sub = context_subject::from_string(subject_param); - auto maybe_def = co_await resolve_schema_across_contexts( - rq, auth_result, id, ctx_sub); - - if (!maybe_def) { - throw as_exception(not_found(id)); - } - - auto def = co_await rq.service().schema_store().format_schema( - std::move(*maybe_def), format); + auto ctx_id = co_await (ctx_sub.ctx == default_context && !ctx_sub.sub().empty() + ? resolve_schema_id_extended(rq, auth_result, id, ctx_sub.sub) + : resolve_schema_id_simple(rq, auth_result, id, ctx_sub)); - auto resp = ppj::rjson_serialize_iobuf( - get_schemas_ids_id_response{.definition{std::move(def)}}); - log_response(*rq.req, resp); - rp.rep->write_body("json", ppj::as_body_writer(std::move(resp))); - co_return rp; + auto def = co_await rq.service().schema_store().get_schema_definition(ctx_id, format); + auto resp = ppj::rjson_serialize_iobuf( + get_schemas_ids_id_response{.definition{std::move(def)}}); + log_response(*rq.req, resp); + rp.rep->write_body("json", ppj::as_body_writer(std::move(resp))); + co_return rp; } ss::future diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 3f3064100dce6..36c9b30833b54 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -435,6 +435,20 @@ ss::future> sharded_store::get_subjects( co_return co_await _store.map_reduce0(map, subjects{}, reduce); } +ss::future> +sharded_store::get_subjects(context ctx, include_deleted inc_del) { + using subjects = chunked_vector; + auto map = [ctx, inc_del](store& s) { + return s.get_subjects(ctx, inc_del); + }; + auto reduce = [](subjects acc, subjects subs) { + acc.reserve(acc.size() + subs.size()); + std::move(subs.begin(), subs.end(), std::back_inserter(acc)); + return acc; + }; + co_return co_await _store.map_reduce0(map, subjects{}, reduce); +} + ss::future sharded_store::has_subjects(context ctx, include_deleted inc_del) { auto map = [ctx, inc_del](store& s) { diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 72f9efebd3bc6..9021d4493a765 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -108,6 +108,10 @@ class sharded_store final : public schema_getter { include_deleted inc_del, std::optional subject_prefix = std::nullopt); + ///\brief Return a list of subjects for a specific context. + ss::future> + get_subjects(context ctx, include_deleted inc_del); + ///\brief Return whether there are any subjects. ss::future has_subjects(context ctx, include_deleted inc_del); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index d8c8ee48ec8cb..f5f809db9ca2d 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -200,6 +200,26 @@ class store { return res; } + ///\brief Return a list of subjects for a specific context. + chunked_vector get_subjects( + const context& ctx, include_deleted inc_del) const { + chunked_vector res; + for (const auto& ctx_sub : _subjects) { + if (ctx_sub.first.ctx != ctx) { + continue; + } + if (inc_del || !ctx_sub.second.deleted) { + auto has_version = std::ranges::any_of( + ctx_sub.second.versions, + [inc_del](const auto& v) { return inc_del || !v.deleted; }); + if (has_version) { + res.push_back(ctx_sub.first); + } + } + } + return res; + } + ///\brief Return if there are subjects. bool has_subjects(const context& ctx, include_deleted inc_del) const { return std::ranges::any_of(_subjects, [inc_del, &ctx](const auto& sub) { From f27c710368cfee84dfc52a05157bee568eadd1ce Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 00:14:11 +0000 Subject: [PATCH 07/13] Cleanup --- src/v/pandaproxy/schema_registry/handlers.cc | 97 ++------------------ 1 file changed, 7 insertions(+), 90 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 1c20313e87268..0c088abd5b6a8 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -155,96 +155,7 @@ to_non_context_schema_ids(const chunked_vector& ids) { | std::ranges::to>(); } -// ss::future> try_get_schema_definition( -// const server::request_t& rq, -// std::optional& auth_result, -// schema_id id, -// context_subject ctx_sub) { -// const context& ctx = ctx_sub.ctx().empty() ? default_context : ctx_sub.ctx; -// const subject& sub = ctx_sub.sub; -// context_schema_id ctx_id{ctx, id}; - -// auto schema_subjects -// = co_await rq.service().schema_store().get_schema_subjects( -// ctx_id, include_deleted::yes); - -// if (!sub().empty()) { -// // If a subject is provided, ensure the schema ID is associated with it -// if (std::ranges::contains(schema_subjects, ctx_sub)) { -// // The schema ID is associated with the given subject in the -// // given context. -// schema_subjects = {ctx_sub}; -// } else { -// // The schema ID is not associated with the given subject in the -// // given context. -// schema_subjects = {}; -// } -// } - -// // Ensure requester is authorized to access at least one of the subjects -// // associated with the schema ID in the given context. -// enterprise::handle_get_schemas_ids_id_authz( -// rq, auth_result, schema_subjects); - -// if (schema_subjects.empty()) { -// // The schema ID is not associated with any subject that the requester -// // is authorized to access. -// co_return std::nullopt; -// } - -// // Here, the schema ID is verified to be associated with a subject in the -// // given context that the requester is authorized to access. -// co_return co_await rq.service().schema_store().maybe_get_schema_definition( -// ctx_id); -// } - -// /// Resolve a schema definition, searching across contexts if needed. -// /// First tries the given context and subject. If a subject is provided, we're -// /// in the default context, and the schema is not found, then searches other -// /// contexts for the schema ID with that subject. Falls back to searching the -// /// default context without subject restriction if still not found. -// ss::future> resolve_schema_across_contexts( -// const server::request_t& rq, -// std::optional& auth_result, -// schema_id id, -// context_subject ctx_sub) { -// // Try to get schema definition with given context and subject -// auto schema_def = co_await try_get_schema_definition( -// rq, auth_result, id, ctx_sub); -// if ( -// ctx_sub.sub().empty() || ctx_sub.is_non_default_context() -// || schema_def.has_value()) { -// // Either no subject provided, or non-default context, or schema found -// co_return schema_def; -// } - -// // Here, subject is NOT empty and we're in the default context (either -// // implicitly or explicitly). We did not find the schema with the given -// // subject in the default context, so search other contexts for the schema -// // ID with the given subject. -// auto contexts -// = co_await rq.service().schema_store().get_materialized_contexts(); -// for (const auto& ctx : contexts) { -// if (ctx == default_context) { -// // Already checked default context -// continue; -// } -// schema_def = co_await try_get_schema_definition( -// rq, auth_result, id, {ctx, ctx_sub.sub}); -// if (schema_def) { -// co_return schema_def; -// } -// } - -// // Here, schema ID not found under any context with the given subject. -// // Try searching in the default context without subject restriction. -// co_return co_await try_get_schema_definition( -// rq, auth_result, id, {ctx_sub.ctx, subject{}}); -// } - -/// Resolve a schema ID in a simple way, without searching across contexts. -/// This function assumes that either the context is not the default context, -/// or if it is the default context, then the subject is empty. +/// Resolve a schema ID within a single context, optionally filtering by subject. ss::future resolve_schema_id_simple( const server::request_t& rq, std::optional auth_result, @@ -276,6 +187,12 @@ ss::future resolve_schema_id_simple( co_return ctx_id; } +/// Resolve a schema ID by searching across contexts and subjects. This function +/// assumes that the subject is non-empty. +/// The search order is: +/// 1. Default context with provided subject +/// 2. Other contexts with provided subject +/// 3. Default context without subject restriction ss::future resolve_schema_id_extended( const server::request_t& rq, std::optional auth_result, From 9445ca3b3450ba55a75e70ce07ece38f069134f7 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 01:15:41 +0000 Subject: [PATCH 08/13] Fixed issue. --- src/v/pandaproxy/schema_registry/handlers.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 0c088abd5b6a8..f3c05915e9866 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -158,7 +158,7 @@ to_non_context_schema_ids(const chunked_vector& ids) { /// Resolve a schema ID within a single context, optionally filtering by subject. ss::future resolve_schema_id_simple( const server::request_t& rq, - std::optional auth_result, + std::optional& auth_result, schema_id id, context_subject ctx_sub) { vassert(ctx_sub.ctx != default_context || ctx_sub.sub().empty(), @@ -195,7 +195,7 @@ ss::future resolve_schema_id_simple( /// 3. Default context without subject restriction ss::future resolve_schema_id_extended( const server::request_t& rq, - std::optional auth_result, + std::optional& auth_result, schema_id id, subject subject) { vassert(!subject().empty(), From ce4facf305bb932934f56f559de7bd86245ea431 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 01:40:41 +0000 Subject: [PATCH 09/13] Saving WIP - adding ducktape tests for context auth --- tests/rptest/tests/schema_registry_test.py | 248 +++++++++++++++++++++ 1 file changed, 248 insertions(+) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 0b99e72a54af5..5ad929889f418 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -5576,6 +5576,254 @@ def test_get_schema_by_id_with_subject(self): assert result.status_code == requests.codes.not_found +class SchemaRegistryContextAuthzTest(SchemaRegistryContextTest): + """ + Authorization tests for context-qualified subject functionality. + + These tests verify that Schema Registry correctly enforces ACL authorization + when using context-qualified subjects and the subject query parameter. + """ + + def __init__(self, context: TestContext, **kwargs: Any): + security = SecurityConfig() + security.enable_sasl = True + security.endpoint_authn_method = "sasl" + + schema_registry_config = SchemaRegistryConfig() + schema_registry_config.authn_method = "http_basic" + schema_registry_config.mode_mutability = True + + # Call grandparent (SchemaRegistryEndpoints) directly to set security, + # while keeping qualified subjects enabled via extra_rp_conf + SchemaRegistryEndpoints.__init__( + self, + context, + security=security, + schema_registry_config=schema_registry_config, + extra_rp_conf={"schema_registry_enable_qualified_subjects": True}, + **kwargs, + ) + + superuser = self.redpanda.SUPERUSER_CREDENTIALS + self.user = SaslCredentials("user", "panda012345678", superuser.mechanism) + + self.super_auth = (superuser.username, superuser.password) + self.user_auth = (self.user.username, self.user.password) + + def _init_users(self): + admin = Admin(self.redpanda) + admin.create_user( + username=self.user.username, + password=self.user.password, + algorithm=self.user.mechanism, + await_exists=True, + ) + + def _create_acl( + self, resource, resource_type, pattern_type, operation, permission="ALLOW" + ): + return self.sr_client.create_acl( + self.user.username, + resource, + resource_type, + pattern_type, + "*", + operation, + permission, + ) + + def _post_acl(self, acl): + """Grant one or more ACLs to the regular user.""" + acl_list = [acl] if isinstance(acl, dict) else acl + + resp = self.sr_client.post_security_acls(acl_list, auth=self.super_auth) + self.assert_equal(resp.status_code, 201, f"Failed to create ACL: {acl=}") + + # Wait until the ACLs are propagated to all nodes + def acl_all_observable(): + for node in self.redpanda.nodes: + resp = self.sr_client.get_security_acls( + hostname=node.account.hostname, auth=self.super_auth + ) + self.assert_equal(resp.status_code, 200) + + response_acls = resp.json() + for a in acl_list: + self.redpanda.logger.debug( + f"Checking if {a} in response from {node.account.hostname}: {response_acls}" + ) + self.assert_in(a, response_acls) + + return True + + wait_until( + acl_all_observable, + timeout_sec=30, + backoff_sec=1, + retry_on_exc=True, + err_msg=f"Failed to propagate ACLs to all nodes: {acl_list}", + ) + + def setUp(self): + super().setUp() + self._init_users() + self.redpanda.set_cluster_config( + {"schema_registry_enable_authorization": "True"} + ) + + @cluster(num_nodes=1) + def test_get_schemas_ids_id_subject_param_authorization(self): + """ + Test GET /schemas/ids/{id}?subject= authorization. + + When a subject param is provided, authorization is checked against + that specific subject (or the subject found via cross-context search), + not all subjects referencing the schema. + """ + schema_data = json.dumps({"schema": schema1_def}) + schema_data_2 = json.dumps({"schema": schema2_def}) + + # Setup: Create schemas in multiple subjects/contexts + # sub1 and sub2 in default context share the same schema + result = self.sr_client.post_subjects_subject_versions( + "sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + schema_id = result.json()["id"] + + result = self.sr_client.post_subjects_subject_versions( + "sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], schema_id) + + # :.ctx1:sub1 also has the same schema + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], schema_id) + + # :.ctx1:unique-sub has a different schema (only exists in ctx1) + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:unique-sub", data=schema_data_2, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + ctx1_unique_id = result.json()["id"] + + self.logger.info("Scenario 1: Subject param with authorized subject") + # Grant READ on sub1 - GET with subject=sub1 should succeed + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + self.logger.info( + "Scenario 2: Subject param for unauthorized subject " + "(user has access to different subject with same schema)" + ) + # User has READ on sub1, but requesting with subject=sub2 should fail + # because auth checks only the specified subject + result = self.sr_client.get_schemas_ids_id( + schema_id, subject="sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + self.logger.info("Scenario 3: Context-qualified subject with matching ACL") + # Grant READ on :.ctx1:sub1 - GET with subject=:.ctx1:sub1 should succeed + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + self.logger.info( + "Scenario 4: Context-qualified subject without ACL on that context" + ) + # User has READ on sub1 (default) but not on :.ctx1:sub2 + # Requesting :.ctx1:sub2 should fail + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject=":.ctx1:sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + self.logger.info( + "Scenario 5: Cross-context search finds subject in non-default context" + ) + # unique-sub only exists in ctx1 as :.ctx1:unique-sub + # Grant READ on :.ctx1:unique-sub + # GET with subject=unique-sub triggers cross-context search and should succeed + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ") + ) + result = self.sr_client.get_schemas_ids_id( + ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + self.logger.info("Scenario 6: Cross-context search, no auth on found context") + # Deny READ on :.ctx1:unique-sub, grant on unrelated subject + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ", "DENY") + ) + self._post_acl(self._create_acl("other-sub", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + self.logger.info( + "Scenario 7: Context-only param :.ctx1: with auth on ctx1 subject" + ) + # Re-grant READ on :.ctx1:sub1 + # Context-only param :.ctx1: should check all subjects in ctx1 + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + self.logger.info( + "Scenario 8: Context-only param without auth on any ctx subject" + ) + # Deny all ctx1 subjects, keep only default context access + self._post_acl( + self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ", "DENY") + ) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + self.logger.info("Scenario 9: Prefix ACL covers context-qualified subject") + # Remove the DENY ACL from Scenario 8 to test PREFIX ACL in isolation + self.sr_client.delete_security_acls( + [self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ", "DENY")], + auth=self.super_auth, + ) + # Grant prefix ACL on :.ctx1: - should cover all ctx1 subjects + self._post_acl(self._create_acl(":.ctx1:", "SUBJECT", "PREFIXED", "READ")) + result = self.sr_client.get_schemas_ids_id( + schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + self.logger.info( + "Scenario 10: Information leakage - non-existent schema ID returns 403" + ) + # Deny all to test info leakage protection + self._post_acl(self._create_acl("*", "SUBJECT", "LITERAL", "READ", "DENY")) + result = self.sr_client.get_schemas_ids_id( + 99999, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): """ Test schema registry against a redpanda cluster with HTTP Basic Auth enabled. From 0c40efa59df9e8f2680f6f9c0649166ef1aa8f12 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 01:40:57 +0000 Subject: [PATCH 10/13] WIP refactor, but probably not going to keep --- tests/rptest/tests/schema_registry_test.py | 458 ++++++++++----------- 1 file changed, 209 insertions(+), 249 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 5ad929889f418..84b7119879459 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -5576,254 +5576,6 @@ def test_get_schema_by_id_with_subject(self): assert result.status_code == requests.codes.not_found -class SchemaRegistryContextAuthzTest(SchemaRegistryContextTest): - """ - Authorization tests for context-qualified subject functionality. - - These tests verify that Schema Registry correctly enforces ACL authorization - when using context-qualified subjects and the subject query parameter. - """ - - def __init__(self, context: TestContext, **kwargs: Any): - security = SecurityConfig() - security.enable_sasl = True - security.endpoint_authn_method = "sasl" - - schema_registry_config = SchemaRegistryConfig() - schema_registry_config.authn_method = "http_basic" - schema_registry_config.mode_mutability = True - - # Call grandparent (SchemaRegistryEndpoints) directly to set security, - # while keeping qualified subjects enabled via extra_rp_conf - SchemaRegistryEndpoints.__init__( - self, - context, - security=security, - schema_registry_config=schema_registry_config, - extra_rp_conf={"schema_registry_enable_qualified_subjects": True}, - **kwargs, - ) - - superuser = self.redpanda.SUPERUSER_CREDENTIALS - self.user = SaslCredentials("user", "panda012345678", superuser.mechanism) - - self.super_auth = (superuser.username, superuser.password) - self.user_auth = (self.user.username, self.user.password) - - def _init_users(self): - admin = Admin(self.redpanda) - admin.create_user( - username=self.user.username, - password=self.user.password, - algorithm=self.user.mechanism, - await_exists=True, - ) - - def _create_acl( - self, resource, resource_type, pattern_type, operation, permission="ALLOW" - ): - return self.sr_client.create_acl( - self.user.username, - resource, - resource_type, - pattern_type, - "*", - operation, - permission, - ) - - def _post_acl(self, acl): - """Grant one or more ACLs to the regular user.""" - acl_list = [acl] if isinstance(acl, dict) else acl - - resp = self.sr_client.post_security_acls(acl_list, auth=self.super_auth) - self.assert_equal(resp.status_code, 201, f"Failed to create ACL: {acl=}") - - # Wait until the ACLs are propagated to all nodes - def acl_all_observable(): - for node in self.redpanda.nodes: - resp = self.sr_client.get_security_acls( - hostname=node.account.hostname, auth=self.super_auth - ) - self.assert_equal(resp.status_code, 200) - - response_acls = resp.json() - for a in acl_list: - self.redpanda.logger.debug( - f"Checking if {a} in response from {node.account.hostname}: {response_acls}" - ) - self.assert_in(a, response_acls) - - return True - - wait_until( - acl_all_observable, - timeout_sec=30, - backoff_sec=1, - retry_on_exc=True, - err_msg=f"Failed to propagate ACLs to all nodes: {acl_list}", - ) - - def setUp(self): - super().setUp() - self._init_users() - self.redpanda.set_cluster_config( - {"schema_registry_enable_authorization": "True"} - ) - - @cluster(num_nodes=1) - def test_get_schemas_ids_id_subject_param_authorization(self): - """ - Test GET /schemas/ids/{id}?subject= authorization. - - When a subject param is provided, authorization is checked against - that specific subject (or the subject found via cross-context search), - not all subjects referencing the schema. - """ - schema_data = json.dumps({"schema": schema1_def}) - schema_data_2 = json.dumps({"schema": schema2_def}) - - # Setup: Create schemas in multiple subjects/contexts - # sub1 and sub2 in default context share the same schema - result = self.sr_client.post_subjects_subject_versions( - "sub1", data=schema_data, auth=self.super_auth - ) - self.assert_equal(result.status_code, 200) - schema_id = result.json()["id"] - - result = self.sr_client.post_subjects_subject_versions( - "sub2", data=schema_data, auth=self.super_auth - ) - self.assert_equal(result.status_code, 200) - self.assert_equal(result.json()["id"], schema_id) - - # :.ctx1:sub1 also has the same schema - result = self.sr_client.post_subjects_subject_versions( - ":.ctx1:sub1", data=schema_data, auth=self.super_auth - ) - self.assert_equal(result.status_code, 200) - self.assert_equal(result.json()["id"], schema_id) - - # :.ctx1:unique-sub has a different schema (only exists in ctx1) - result = self.sr_client.post_subjects_subject_versions( - ":.ctx1:unique-sub", data=schema_data_2, auth=self.super_auth - ) - self.assert_equal(result.status_code, 200) - ctx1_unique_id = result.json()["id"] - - self.logger.info("Scenario 1: Subject param with authorized subject") - # Grant READ on sub1 - GET with subject=sub1 should succeed - self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject="sub1", auth=self.user_auth - ) - self.assert_equal(result.status_code, 200) - - self.logger.info( - "Scenario 2: Subject param for unauthorized subject " - "(user has access to different subject with same schema)" - ) - # User has READ on sub1, but requesting with subject=sub2 should fail - # because auth checks only the specified subject - result = self.sr_client.get_schemas_ids_id( - schema_id, subject="sub2", auth=self.user_auth - ) - self.assert_equal(result.status_code, 403) - - self.logger.info("Scenario 3: Context-qualified subject with matching ACL") - # Grant READ on :.ctx1:sub1 - GET with subject=:.ctx1:sub1 should succeed - self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject=":.ctx1:sub1", auth=self.user_auth - ) - self.assert_equal(result.status_code, 200) - - self.logger.info( - "Scenario 4: Context-qualified subject without ACL on that context" - ) - # User has READ on sub1 (default) but not on :.ctx1:sub2 - # Requesting :.ctx1:sub2 should fail - result = self.sr_client.post_subjects_subject_versions( - ":.ctx1:sub2", data=schema_data, auth=self.super_auth - ) - self.assert_equal(result.status_code, 200) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject=":.ctx1:sub2", auth=self.user_auth - ) - self.assert_equal(result.status_code, 403) - - self.logger.info( - "Scenario 5: Cross-context search finds subject in non-default context" - ) - # unique-sub only exists in ctx1 as :.ctx1:unique-sub - # Grant READ on :.ctx1:unique-sub - # GET with subject=unique-sub triggers cross-context search and should succeed - self._post_acl( - self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ") - ) - result = self.sr_client.get_schemas_ids_id( - ctx1_unique_id, subject="unique-sub", auth=self.user_auth - ) - self.assert_equal(result.status_code, 200) - - self.logger.info("Scenario 6: Cross-context search, no auth on found context") - # Deny READ on :.ctx1:unique-sub, grant on unrelated subject - self._post_acl( - self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ", "DENY") - ) - self._post_acl(self._create_acl("other-sub", "SUBJECT", "LITERAL", "READ")) - result = self.sr_client.get_schemas_ids_id( - ctx1_unique_id, subject="unique-sub", auth=self.user_auth - ) - self.assert_equal(result.status_code, 403) - - self.logger.info( - "Scenario 7: Context-only param :.ctx1: with auth on ctx1 subject" - ) - # Re-grant READ on :.ctx1:sub1 - # Context-only param :.ctx1: should check all subjects in ctx1 - self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject=":.ctx1:", auth=self.user_auth - ) - self.assert_equal(result.status_code, 200) - - self.logger.info( - "Scenario 8: Context-only param without auth on any ctx subject" - ) - # Deny all ctx1 subjects, keep only default context access - self._post_acl( - self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ", "DENY") - ) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject=":.ctx1:", auth=self.user_auth - ) - self.assert_equal(result.status_code, 403) - - self.logger.info("Scenario 9: Prefix ACL covers context-qualified subject") - # Remove the DENY ACL from Scenario 8 to test PREFIX ACL in isolation - self.sr_client.delete_security_acls( - [self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ", "DENY")], - auth=self.super_auth, - ) - # Grant prefix ACL on :.ctx1: - should cover all ctx1 subjects - self._post_acl(self._create_acl(":.ctx1:", "SUBJECT", "PREFIXED", "READ")) - result = self.sr_client.get_schemas_ids_id( - schema_id, subject=":.ctx1:sub1", auth=self.user_auth - ) - self.assert_equal(result.status_code, 200) - - self.logger.info( - "Scenario 10: Information leakage - non-existent schema ID returns 403" - ) - # Deny all to test info leakage protection - self._post_acl(self._create_acl("*", "SUBJECT", "LITERAL", "READ", "DENY")) - result = self.sr_client.get_schemas_ids_id( - 99999, subject="sub1", auth=self.user_auth - ) - self.assert_equal(result.status_code, 403) - - class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): """ Test schema registry against a redpanda cluster with HTTP Basic Auth enabled. @@ -8580,7 +8332,7 @@ class SchemaRegistryAclAuthzTest(SchemaRegistryEndpoints): # DELETE_SECURITY_ACLS - kafka cluster ACL required ] - def __init__(self, context): + def __init__(self, context, extra_rp_conf: dict | None = None, **kwargs): security = SecurityConfig() security.enable_sasl = True security.endpoint_authn_method = "sasl" @@ -8594,6 +8346,8 @@ def __init__(self, context): security=security, num_brokers=1, schema_registry_config=schema_registry_config, + extra_rp_conf=extra_rp_conf, + **kwargs, ) superuser = self.redpanda.SUPERUSER_CREDENTIALS @@ -9120,3 +8874,209 @@ def test_enterprise_sanctions(self): self.redpanda.set_cluster_config( {"schema_registry_enable_authorization": True} ) + + +class SchemaRegistryContextAuthzTest(SchemaRegistryAclAuthzTest): + """ + Authorization tests for context-qualified subject functionality. + + These tests verify that Schema Registry correctly enforces ACL authorization + when using context-qualified subjects and the subject query parameter. + """ + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_enable_qualified_subjects": True}, + **kwargs, + ) + + def _clear_user_acls(self): + """Clear all ACLs for the test user to ensure test isolation.""" + # Get all current ACLs + resp = self.sr_client.get_security_acls(auth=self.super_auth) + if resp.status_code == 200: + acls = resp.json() + # Filter to only ACLs for our test user + user_acls = [ + acl + for acl in acls + if acl.get("principal") == f"User:{self.user.username}" + ] + if user_acls: + self.sr_client.delete_security_acls(user_acls, auth=self.super_auth) + + def _setup_test_schemas(self): + """Create schemas used by all authorization tests.""" + schema_data = json.dumps({"schema": schema1_def}) + schema_data_2 = json.dumps({"schema": schema2_def}) + + # sub1 and sub2 in default context share the same schema + result = self.sr_client.post_subjects_subject_versions( + "sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.schema_id = result.json()["id"] + + result = self.sr_client.post_subjects_subject_versions( + "sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], self.schema_id) + + # :.ctx1:sub1 also has the same schema + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], self.schema_id) + + # :.ctx1:sub2 also has the same schema + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + + # :.ctx1:unique-sub has a different schema (only exists in ctx1) + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:unique-sub", data=schema_data_2, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.ctx1_unique_id = result.json()["id"] + + def setUp(self): + super().setUp() + self._init_users() + self.redpanda.set_cluster_config( + {"schema_registry_enable_authorization": "True"} + ) + self._setup_test_schemas() + # self._clear_user_acls() + + @cluster(num_nodes=1) + def test_subject_param_with_authorized_subject(self): + """ + GET /schemas/ids/{id}?subject=sub1 succeeds when user has READ on sub1. + """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_subject_param_unauthorized_despite_other_subject_access(self): + """ + GET /schemas/ids/{id}?subject=sub2 fails when user only has READ on sub1, + even though both subjects reference the same schema. + Authorization checks only the specified subject. + """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject="sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_context_qualified_subject_with_matching_acl(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub1 succeeds when user has READ + on the context-qualified subject :.ctx1:sub1. + """ + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_context_qualified_subject_without_acl_on_that_context(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub2 fails when user has READ on + sub1 (default context) but not on :.ctx1:sub2. + """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_cross_context_search_finds_subject_in_non_default_context(self): + """ + GET /schemas/ids/{id}?subject=unique-sub succeeds when the subject only + exists in ctx1 as :.ctx1:unique-sub and user has READ on :.ctx1:unique-sub. + Cross-context search finds the subject in the non-default context. + """ + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ") + ) + result = self.sr_client.get_schemas_ids_id( + self.ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_cross_context_search_no_auth_on_found_context(self): + """ + GET /schemas/ids/{id}?subject=unique-sub fails when the subject is found + via cross-context search in ctx1 but user has DENY on :.ctx1:unique-sub. + """ + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ", "DENY") + ) + result = self.sr_client.get_schemas_ids_id( + self.ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_context_only_param_with_auth_on_ctx_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1: succeeds when user has READ on + at least one subject in ctx1 (:.ctx1:sub1). + Context-only param checks all subjects in that context. + """ + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_context_only_param_without_auth_on_any_ctx_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1: fails when user has no READ + permission on any subject in ctx1. + """ + # Only grant access to default context subject, not ctx1 + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_prefix_acl_covers_context_qualified_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub1 succeeds when user has a + PREFIX ACL on :.ctx1: which covers all subjects in that context. + """ + self._post_acl(self._create_acl(":.ctx1:", "SUBJECT", "PREFIXED", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_nonexistent_schema_id_returns_403_not_404(self): + """ + GET /schemas/ids/{id}?subject=sub1 returns 403 (not 404) for non-existent + schema ID when user lacks authorization. This prevents information leakage + about whether a schema ID exists. + """ + self._post_acl(self._create_acl("*", "SUBJECT", "LITERAL", "READ", "DENY")) + result = self.sr_client.get_schemas_ids_id( + 99999, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) From e143a819a37f979c571b42835ecf6cb6ecf98444 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 05:14:14 +0000 Subject: [PATCH 11/13] More refactor of dt tests --- tests/rptest/tests/schema_registry_test.py | 78 ++++++++++++---------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 84b7119879459..30899162f9fdd 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -8295,43 +8295,11 @@ def resource(self) -> dict: return {"name": "", "type": "registry"} -class SchemaRegistryAclAuthzTest(SchemaRegistryEndpoints): +class SchemaRegistryAclAuthzTestBase(SchemaRegistryEndpoints): """ - Verify that schema registry endpoints are protected by the correct ACL resource and operation. + Base class providing shared ACL test infrastructure (setup, helpers) without test methods. """ - ENDPOINTS = [ - GetConfigEndpoint, - PutConfigEndpoint, - GetConfigSubjectEndpoint, - PutConfigSubjectEndpoint, - DeleteConfigSubject, - GetMode, - PutMode, - GetModeSubject, - PutModeSubject, - DeleteModeSubject, - PostSubjectVersions, - GetSchemasIdsIdVersions, - GetSchemasIdsIdSubjects, - GetSubjectVersions, - PostSubject, - GetSubjectVersionsVersion, - GetSubjectVersionsVersionSchema, - GetSubjectVersionsVersionReferencedBy, - DeleteSubject, - DeleteSubjectVersion, - CompatibilitySubjectVersion, - # Tested separately: - # GET_SCHEMAS_TYPES - no ACLs required - # SCHEMA_REGISTRY_STATUS_READY - no ACLs required - # GET_SCHEMAS_IDS_ID - custom ACL handling - # GET_SUBJECTS - custom ACL handling - # GET_SECURITY_ACLS - kafka cluster ACL required - # POST_SECURITY_ACLS - kafka cluster ACL required - # DELETE_SECURITY_ACLS - kafka cluster ACL required - ] - def __init__(self, context, extra_rp_conf: dict | None = None, **kwargs): security = SecurityConfig() security.enable_sasl = True @@ -8341,7 +8309,7 @@ def __init__(self, context, extra_rp_conf: dict | None = None, **kwargs): schema_registry_config.authn_method = "http_basic" schema_registry_config.mode_mutability = True - super(SchemaRegistryAclAuthzTest, self).__init__( + super().__init__( context, security=security, num_brokers=1, @@ -8435,6 +8403,44 @@ def setUp(self): {"schema_registry_enable_authorization": "True"} ) + +class SchemaRegistryAclAuthzTest(SchemaRegistryAclAuthzTestBase): + """ + Verify that schema registry endpoints are protected by the correct ACL resource and operation. + """ + + ENDPOINTS = [ + GetConfigEndpoint, + PutConfigEndpoint, + GetConfigSubjectEndpoint, + PutConfigSubjectEndpoint, + DeleteConfigSubject, + GetMode, + PutMode, + GetModeSubject, + PutModeSubject, + DeleteModeSubject, + PostSubjectVersions, + GetSchemasIdsIdVersions, + GetSchemasIdsIdSubjects, + GetSubjectVersions, + PostSubject, + GetSubjectVersionsVersion, + GetSubjectVersionsVersionSchema, + GetSubjectVersionsVersionReferencedBy, + DeleteSubject, + DeleteSubjectVersion, + CompatibilitySubjectVersion, + # Tested separately: + # GET_SCHEMAS_TYPES - no ACLs required + # SCHEMA_REGISTRY_STATUS_READY - no ACLs required + # GET_SCHEMAS_IDS_ID - custom ACL handling + # GET_SUBJECTS - custom ACL handling + # GET_SECURITY_ACLS - kafka cluster ACL required + # POST_SECURITY_ACLS - kafka cluster ACL required + # DELETE_SECURITY_ACLS - kafka cluster ACL required + ] + def _get_endpoint_by_name(self, name: str) -> ACLTestEndpoint: for endpoint in self.ENDPOINTS: if endpoint.name == name: @@ -8876,7 +8882,7 @@ def test_enterprise_sanctions(self): ) -class SchemaRegistryContextAuthzTest(SchemaRegistryAclAuthzTest): +class SchemaRegistryContextAuthzTest(SchemaRegistryAclAuthzTestBase): """ Authorization tests for context-qualified subject functionality. From d8c96430d52344b2ef03939f546ea2f25c207a3b Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 05:37:37 +0000 Subject: [PATCH 12/13] Cleanup --- tests/rptest/tests/schema_registry_test.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 30899162f9fdd..826b795e1187a 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -8897,21 +8897,6 @@ def __init__(self, context: TestContext, **kwargs: Any): **kwargs, ) - def _clear_user_acls(self): - """Clear all ACLs for the test user to ensure test isolation.""" - # Get all current ACLs - resp = self.sr_client.get_security_acls(auth=self.super_auth) - if resp.status_code == 200: - acls = resp.json() - # Filter to only ACLs for our test user - user_acls = [ - acl - for acl in acls - if acl.get("principal") == f"User:{self.user.username}" - ] - if user_acls: - self.sr_client.delete_security_acls(user_acls, auth=self.super_auth) - def _setup_test_schemas(self): """Create schemas used by all authorization tests.""" schema_data = json.dumps({"schema": schema1_def}) @@ -8957,7 +8942,6 @@ def setUp(self): {"schema_registry_enable_authorization": "True"} ) self._setup_test_schemas() - # self._clear_user_acls() @cluster(num_nodes=1) def test_subject_param_with_authorized_subject(self): @@ -9081,7 +9065,6 @@ def test_nonexistent_schema_id_returns_403_not_404(self): schema ID when user lacks authorization. This prevents information leakage about whether a schema ID exists. """ - self._post_acl(self._create_acl("*", "SUBJECT", "LITERAL", "READ", "DENY")) result = self.sr_client.get_schemas_ids_id( 99999, subject="sub1", auth=self.user_auth ) From 566f4154686ae013d1c09392caa36aa9794c4ff4 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 4 Feb 2026 16:33:18 +0000 Subject: [PATCH 13/13] Saving WIP --- src/v/pandaproxy/schema_registry/handlers.cc | 56 +++++++++++++++----- tests/rptest/tests/schema_registry_test.py | 4 -- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index f3c05915e9866..bb120fffb06e5 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -9,7 +9,6 @@ #include "handlers.h" -#include "base/vassert.h" #include "bytes/iobuf_parser.h" #include "cluster/controller.h" #include "cluster/security_frontend.h" @@ -161,14 +160,26 @@ ss::future resolve_schema_id_simple( std::optional& auth_result, schema_id id, context_subject ctx_sub) { - vassert(ctx_sub.ctx != default_context || ctx_sub.sub().empty(), - "resolve_schema_id_simple cannot be called with default context and " - "non-empty subject"); + if (ctx_sub.ctx == default_context && !ctx_sub.sub().empty()) { + vlog(srlog.error, + "resolve_schema_id_simple cannot be called with default context " + "and non-empty subject"); + throw exception(error_code::internal_server_error); + } + + vlog(srlog.debug, + "Resolving schema ID {} in context '{}'{}", + id, + ctx_sub.ctx, + ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" ); const context_schema_id ctx_id{ctx_sub.ctx, id}; auto schema_subjects = co_await rq.service().schema_store().get_schema_subjects(ctx_id, include_deleted::yes); // If a subject is provided, filter the schema_subjects to only that subject (if it exists) if (!ctx_sub.sub().empty()) { + vlog(srlog.debug, + "Filtering schema subjects for subject '{}'", + ctx_sub.sub()); schema_subjects = std::ranges::contains(schema_subjects, ctx_sub) ? decltype(schema_subjects){ctx_sub} : decltype(schema_subjects){}; @@ -181,9 +192,20 @@ ss::future resolve_schema_id_simple( if (schema_subjects.empty()) { // The schema ID is not associated with any subject in this context, or if the requester // provided a ctx_sub.sub, the schema is not associated with that subject. + vlog(srlog.debug, + "Schema ID {} not found in context '{}'{}", + id, + ctx_sub.ctx, + ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" ); throw as_exception(not_found(id)); } + vlog(srlog.debug, + "Schema ID {} resolved in context '{}'{}", + id, + ctx_sub.ctx, + ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" ); + co_return ctx_id; } @@ -198,12 +220,18 @@ ss::future resolve_schema_id_extended( std::optional& auth_result, schema_id id, subject subject) { - vassert(!subject().empty(), - "resolve_schema_id_extended should only be called with a non-empty subject"); + if (subject().empty()) { + vlog(srlog.error, + "resolve_schema_id_extended should only be called with non-empty subject"); + throw exception(error_code::internal_server_error); + } + + vlog(srlog.debug, "Performing an extended search to resolve schema ID {} for subject '{}'.", id, subject()); // First, try default context with the provided subject if (context_subject ctx_sub{default_context, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) { - enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {ctx_sub}); + vlog(srlog.debug, "Schema ID {} found in default context with subject '{}'", id, subject()); + enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {std::move(ctx_sub)}); co_return context_schema_id{default_context, id}; } @@ -213,6 +241,7 @@ ss::future resolve_schema_id_extended( return c != default_context; })) { if (context_subject ctx_sub{ctx, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) { + vlog(srlog.debug, "Schema ID {} found in context '{}' with subject '{}'", id, ctx, subject()); enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {ctx_sub}); co_return context_schema_id{ctx, id}; } @@ -222,10 +251,11 @@ ss::future resolve_schema_id_extended( auto default_ctx_subjects = co_await rq.service().schema_store().get_subjects(default_context, include_deleted::yes); enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, default_ctx_subjects); if (!default_ctx_subjects.empty()) { + vlog(srlog.debug, "Schema ID {} found in default context without subject restriction", id); co_return context_schema_id{default_context, id}; } - // Schema ID not found in any context with the provided subject, nor in default context without subject restriction + vlog(srlog.debug, "Schema ID {} not found in any context with subject '{}' or in default context without subject restriction", id, subject()); enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {}); throw as_exception(not_found(id)); } @@ -576,11 +606,11 @@ ss::future get_schemas_ids_id( : resolve_schema_id_simple(rq, auth_result, id, ctx_sub)); auto def = co_await rq.service().schema_store().get_schema_definition(ctx_id, format); - auto resp = ppj::rjson_serialize_iobuf( - get_schemas_ids_id_response{.definition{std::move(def)}}); - log_response(*rq.req, resp); - rp.rep->write_body("json", ppj::as_body_writer(std::move(resp))); - co_return rp; + auto resp = ppj::rjson_serialize_iobuf( + get_schemas_ids_id_response{.definition{std::move(def)}}); + log_response(*rq.req, resp); + rp.rep->write_body("json", ppj::as_body_writer(std::move(resp))); + co_return rp; } ss::future diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 826b795e1187a..649408b23f7f6 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -8937,10 +8937,6 @@ def _setup_test_schemas(self): def setUp(self): super().setUp() - self._init_users() - self.redpanda.set_cluster_config( - {"schema_registry_enable_authorization": "True"} - ) self._setup_test_schemas() @cluster(num_nodes=1)