diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 75d2265bce61f..ba34fcc0e8f15 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -454,6 +454,13 @@ "required": false, "type": "string", "description": "Redpanda version 25.2 or later. For Avro and Protobuf schemas only. Supported values: an empty string `''` returns the schema in its current format (default), and `serialized` (Protobuf only) returns the schema in its Base64-encoded wire binary format. Unsupported values return a 501 error." + }, + { + "name": "subject", + "in": "query", + "required": false, + "type": "string", + "description": "Optional qualified subject to search for the schema under. Use <:.context:> for context-only lookup, or <:.context:subject> to also verify the schema is associated with that subject. Defaults to searching the default context if unspecified." } ], "produces": [ diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 5a2041e693717..bb120fffb06e5 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -44,6 +44,7 @@ #include #include #include +#include namespace ppj = pandaproxy::json; @@ -153,6 +154,112 @@ to_non_context_schema_ids(const chunked_vector& ids) { | std::ranges::to>(); } +/// Resolve a schema ID within a single context, optionally filtering by subject. 
+ss::future resolve_schema_id_simple(
+  const server::request_t& rq,
+  std::optional& auth_result,
+  schema_id id,
+  context_subject ctx_sub) {
+    if (ctx_sub.ctx == default_context && !ctx_sub.sub().empty()) {
+        vlog(srlog.error,
+          "resolve_schema_id_simple cannot be called with default context "
+          "and non-empty subject");
+        throw exception(error_code::internal_server_error);
+    }
+
+    vlog(srlog.debug,
+      "Resolving schema ID {} in context '{}'{}",
+      id,
+      ctx_sub.ctx,
+      ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" );
+
+    const context_schema_id ctx_id{ctx_sub.ctx, id};
+    auto schema_subjects = co_await rq.service().schema_store().get_schema_subjects(ctx_id, include_deleted::yes);
+    // If a subject is provided, filter the schema_subjects to only that subject (if it exists)
+    if (!ctx_sub.sub().empty()) {
+        vlog(srlog.debug,
+          "Filtering schema subjects for subject '{}'",
+          ctx_sub.sub());
+        schema_subjects = std::ranges::contains(schema_subjects, ctx_sub)
+                            ? decltype(schema_subjects){ctx_sub}
+                            : decltype(schema_subjects){};
+    }
+
+    // Ensure requester is authorized to access at least one of the subjects
+    // associated with the schema ID in the given context.
+    enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, schema_subjects);
+
+    if (schema_subjects.empty()) {
+        // The schema ID is not associated with any subject in this context, or if the requester
+        // provided a ctx_sub.sub, the schema is not associated with that subject.
+        vlog(srlog.debug,
+          "Schema ID {} not found in context '{}'{}",
+          id,
+          ctx_sub.ctx,
+          ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" );
+        throw as_exception(not_found(id));
+    }
+
+    vlog(srlog.debug,
+      "Schema ID {} resolved in context '{}'{}",
+      id,
+      ctx_sub.ctx,
+      ctx_sub.sub().empty() ? "" : ss::sstring{", subject '"} + ctx_sub.sub() + "'" );
+
+    co_return ctx_id;
+}
+
+/// Resolve a schema ID by searching across contexts and subjects.
+///
+/// Precondition: \p subject is non-empty. An empty subject is logged as an
+/// error and rejected with error_code::internal_server_error.
+///
+/// The search order is:
+/// 1. Default context with provided subject
+/// 2. Other contexts with provided subject
+/// 3. Default context without subject restriction
+///
+/// At whichever step matches, the requester is authorized via
+/// enterprise::handle_get_schemas_ids_id_authz against the subject(s) that
+/// matched at that step, and the matching context is returned paired with
+/// \p id. If no step matches, the authorization helper is still invoked (with
+/// no subjects) before not_found(id) is thrown.
+ss::future resolve_schema_id_extended(
+  const server::request_t& rq,
+  std::optional& auth_result,
+  schema_id id,
+  subject subject) {
+    if (subject().empty()) {
+        vlog(srlog.error,
+          "resolve_schema_id_extended should only be called with non-empty subject");
+        throw exception(error_code::internal_server_error);
+    }
+
+    vlog(srlog.debug, "Performing an extended search to resolve schema ID {} for subject '{}'.", id, subject());
+
+    // First, try default context with the provided subject
+    if (context_subject ctx_sub{default_context, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) {
+        vlog(srlog.debug, "Schema ID {} found in default context with subject '{}'", id, subject());
+        enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {std::move(ctx_sub)});
+        co_return context_schema_id{default_context, id};
+    }
+
+    // Next, try other contexts with the provided subject
+    auto contexts = co_await rq.service().schema_store().get_materialized_contexts();
+    for (const auto& ctx : contexts | std::views::filter([](const auto& c) {
+             return c != default_context;
+         })) {
+        if (context_subject ctx_sub{ctx, subject}; co_await rq.service().schema_store().has_version(ctx_sub, id, include_deleted::yes)) {
+            vlog(srlog.debug, "Schema ID {} found in context '{}' with subject '{}'", id, ctx, subject());
+            enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {ctx_sub});
+            co_return context_schema_id{ctx, id};
+        }
+    }
+
+    // Finally, try default context without subject restriction.
+    // Only subjects that actually reference this schema ID may satisfy the
+    // lookup and the authorization check. Listing every subject of the default
+    // context would report IDs that do not exist there as "found", and would
+    // authorize the requester through subjects unrelated to the schema.
+    const context_schema_id default_ctx_id{default_context, id};
+    auto default_ctx_subjects = co_await rq.service().schema_store().get_schema_subjects(default_ctx_id, include_deleted::yes);
+    enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, default_ctx_subjects);
+    if (!default_ctx_subjects.empty()) {
+        vlog(srlog.debug, "Schema ID {} found in default context without subject restriction", id);
+        co_return default_ctx_id;
+    }
+
+    vlog(srlog.debug, "Schema ID {} not found in any context with subject '{}' or in default context without subject restriction", id, subject());
+    enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, {});
+    throw as_exception(not_found(id));
+}
+
 } // namespace
 
 ss::future
@@ -486,20 +593,19 @@ ss::future get_schemas_ids_id(
     const auto format = parse_output_format(*rq.req);
     co_await rq.service().writer().read_sync();
 
-    auto subjects = co_await rq.service().schema_store().get_schema_subjects(
-      id, include_deleted::yes);
-    enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, subjects);
+    // Parse optional subject query parameter to extract context
+    auto subject_param = parse::query_param>(
+                           *rq.req, "subject")
+                           .value_or("");
 
-    // With deferred schema validation, there might be a schema that
-    // had invalid references. These might have already been posted, so
-    // we need to sync
-    co_await rq.service().writer().read_sync();
+    auto ctx_sub = context_subject::from_string(subject_param);
 
-    auto def = co_await get_or_load(rq, [&rq, id, format]() {
-        return rq.service().schema_store().get_schema_definition(id, format);
-    });
+    auto ctx_id = co_await (ctx_sub.ctx == default_context && !ctx_sub.sub().empty()
+                            ? 
resolve_schema_id_extended(rq, auth_result, id, ctx_sub.sub) + : resolve_schema_id_simple(rq, auth_result, id, ctx_sub)); + auto def = co_await rq.service().schema_store().get_schema_definition(ctx_id, format); auto resp = ppj::rjson_serialize_iobuf( get_schemas_ids_id_response{.definition{std::move(def)}}); log_response(*rq.req, resp); @@ -576,7 +682,8 @@ ss::future get_subjects( auto res = co_await rq.service().schema_store().get_subjects( inc_del, subject_prefix); - // Handle AuthZ - Filters res for the subjects the user is allowed to see + // Handle AuthZ - Filters res for the subjects the user is allowed to + // see enterprise::handle_get_subjects_authz(rq, auth_result, res); // Convert context_subject to qualified string format for JSON response @@ -627,7 +734,8 @@ post_subject(server::request_t rq, server::reply_t rp) { const auto format = parse_output_format(*rq.req); vlog( srlog.debug, - "post_subject subject='{}', normalize='{}', deleted='{}', format='{}'", + "post_subject subject='{}', normalize='{}', deleted='{}', " + "format='{}'", ctx_sub, norm, inc_del, @@ -790,7 +898,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { throw exception( error_code::schema_incompatible, fmt::format( - "Schema being registered is incompatible with an earlier " + "Schema being registered is incompatible with an " + "earlier " "schema for subject \"{}\", details: [{}]", ctx_sub, fmt::join(compat.messages, ", "))); @@ -990,7 +1099,8 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { auto unparsed = co_await rjson_parse( *rq.req, post_subject_versions_request_handler<>{ctx_sub}); - // Must read, in case we have the subject in cache with an outdated config + // Must read, in case we have the subject in cache with an outdated + // config co_await rq.service().writer().read_sync(); vlog( diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index c72acc26a001e..18041e317eadb 
100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -332,8 +332,9 @@ ss::future> seq_writer::do_write_config( } batch_builder rb(write_at); - auto sub_key = sub.is_default_context() ? std::optional{} - : std::make_optional(sub); + auto sub_key = sub.is_default_context_only() + ? std::optional{} + : std::make_optional(sub); rb( config_key{.seq{write_at}, .node{_node_id}, .sub{sub_key}}, config_value{.compat = compat, .sub{sub_key}}); @@ -457,7 +458,7 @@ ss::future> seq_writer::do_write_mode( } batch_builder rb(write_at); - auto sub_key = ctx_sub.is_default_context() + auto sub_key = ctx_sub.is_default_context_only() ? std::optional{} : std::make_optional(ctx_sub); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 3f3064100dce6..36c9b30833b54 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -435,6 +435,20 @@ ss::future> sharded_store::get_subjects( co_return co_await _store.map_reduce0(map, subjects{}, reduce); } +ss::future> +sharded_store::get_subjects(context ctx, include_deleted inc_del) { + using subjects = chunked_vector; + auto map = [ctx, inc_del](store& s) { + return s.get_subjects(ctx, inc_del); + }; + auto reduce = [](subjects acc, subjects subs) { + acc.reserve(acc.size() + subs.size()); + std::move(subs.begin(), subs.end(), std::back_inserter(acc)); + return acc; + }; + co_return co_await _store.map_reduce0(map, subjects{}, reduce); +} + ss::future sharded_store::has_subjects(context ctx, include_deleted inc_del) { auto map = [ctx, inc_del](store& s) { diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 72f9efebd3bc6..9021d4493a765 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -108,6 +108,10 @@ class 
sharded_store final : public schema_getter { include_deleted inc_del, std::optional subject_prefix = std::nullopt); + ///\brief Return a list of subjects for a specific context. + ss::future> + get_subjects(context ctx, include_deleted inc_del); + ///\brief Return whether there are any subjects. ss::future has_subjects(context ctx, include_deleted inc_del); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index d8c8ee48ec8cb..f5f809db9ca2d 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -200,6 +200,26 @@ class store { return res; } + ///\brief Return a list of subjects for a specific context. + chunked_vector get_subjects( + const context& ctx, include_deleted inc_del) const { + chunked_vector res; + for (const auto& ctx_sub : _subjects) { + if (ctx_sub.first.ctx != ctx) { + continue; + } + if (inc_del || !ctx_sub.second.deleted) { + auto has_version = std::ranges::any_of( + ctx_sub.second.versions, + [inc_del](const auto& v) { return inc_del || !v.deleted; }); + if (has_version) { + res.push_back(ctx_sub.first); + } + } + } + return res; + } + ///\brief Return if there are subjects. 
bool has_subjects(const context& ctx, include_deleted inc_del) const { return std::ranges::any_of(_subjects, [inc_del, &ctx](const auto& sub) { diff --git a/src/v/pandaproxy/schema_registry/test/context_subject.cc b/src/v/pandaproxy/schema_registry/test/context_subject.cc index fe5c062e556dc..c62770495e288 100644 --- a/src/v/pandaproxy/schema_registry/test/context_subject.cc +++ b/src/v/pandaproxy/schema_registry/test/context_subject.cc @@ -45,13 +45,15 @@ TEST_F(ContextSubjectTest, FromString) { context_subject::from_string(":.ctx:a:b:c"), (context_subject{context{".ctx"}, subject{"a:b:c"}})); - // Invalid qualified syntax falls back to unqualified + // Invalid qualified syntax (no dot after colon) falls back to unqualified EXPECT_EQ( context_subject::from_string(":no-dot"), (context_subject{default_context, subject{":no-dot"}})); + + // Context-only form without trailing colon: ":.ctx" (empty subject) EXPECT_EQ( context_subject::from_string(":.no-second-colon"), - (context_subject{default_context, subject{":.no-second-colon"}})); + (context_subject{context{".no-second-colon"}, subject{""}})); } TEST_F(ContextSubjectTest, ToStringAndRoundTrip) { diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index dea12a81c2911..3ae6874906309 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -94,12 +94,16 @@ context_subject context_subject::from_string(std::string_view input) { // Find the second colon that separates context from subject auto second_colon = input.find(':', 2); - if (second_colon != std::string_view::npos) { - auto ctx_str = input.substr(1, second_colon - 1); - auto sub_str = input.substr(second_colon + 1); - - return context_subject{context{ctx_str}, subject{sub_str}}; + if (second_colon == std::string_view::npos) { + // No second colon, so only context is provided + return context_subject{context{input.substr(1)}, subject{}}; } + + // Both context and 
subject are provided + auto ctx_str = input.substr(1, second_colon - 1); + auto sub_str = input.substr(second_colon + 1); + + return context_subject{context{ctx_str}, subject{sub_str}}; } // Default case: unqualified subject or invalid qualified syntax diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index e72f32b84a29c..1c07dca5f445f 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -199,10 +199,12 @@ struct context_subject { /// Returns true if this represents the default context with an empty /// subject. - bool is_default_context() const { + bool is_default_context_only() const { return is_context_only() && ctx == default_context; } + bool is_non_default_context() const { return ctx != default_context; } + context ctx; subject sub; }; diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index c1eec8c70c32e..649408b23f7f6 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -1000,10 +1000,17 @@ def get_schemas_types( "GET", "schemas/types", headers=headers, tls_enabled=tls_enabled, **kwargs ) - def get_schemas_ids_id(self, id, format=None, headers=HTTP_GET_HEADERS, **kwargs): - format_arg = f"?format={format}" if format is not None else "" + def get_schemas_ids_id( + self, id, format=None, subject=None, headers=HTTP_GET_HEADERS, **kwargs + ): + params = [] + if format is not None: + params.append(f"format={format}") + if subject is not None: + params.append(f"subject={subject}") + query_string = f"?{'&'.join(params)}" if params else "" return self.request( - "GET", f"schemas/ids/{id}{format_arg}", headers=headers, **kwargs + "GET", f"schemas/ids/{id}{query_string}", headers=headers, **kwargs ) def get_schemas_ids_id_versions(self, id, headers=HTTP_GET_HEADERS, **kwargs): @@ -5434,6 +5441,140 @@ def test_context_record_persistence(self): err_msg=f"Failed to 
find replay log: {replay_pattern}", ) + @cluster(num_nodes=1) + def test_get_schema_by_id_with_subject(self): + """Test GET /schemas/ids/{id} with subject query parameter for context lookup.""" + + # === SETUP === + # Register in default context + result = self.sr_client.post_subjects_subject_versions( + subject="sub1", data=json.dumps({"schema": schema1_def}) + ) + assert result.status_code == requests.codes.ok + default_id1 = result.json()["id"] # ID 1 + + # Register another schema in default context + result = self.sr_client.post_subjects_subject_versions( + subject="sub2", data=json.dumps({"schema": schema2_def}) + ) + assert result.status_code == requests.codes.ok + + # Register in ctx1 context (same subject name, different context) + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:sub1", data=json.dumps({"schema": schema1_def}) + ) + assert result.status_code == requests.codes.ok + ctx1_id1 = result.json()["id"] # ID 1 in ctx1 + + # Register unique subject only in ctx1 (for cross-context search test) + # This subject name does NOT exist in default context + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:unique-sub", data=json.dumps({"schema": schema3_def}) + ) + assert result.status_code == requests.codes.ok + ctx1_unique_id = result.json()["id"] # ID 2 in ctx1 + + # Register a third schema in ctx1 to create an ID that doesn't exist in default + # (for testing "ID only exists in non-default context") + result = self.sr_client.post_subjects_subject_versions( + subject=":.ctx1:ctx-only-sub", + data=json.dumps({"schema": simple_proto_def, "schemaType": "PROTOBUF"}), + ) + assert result.status_code == requests.codes.ok + ctx1_only_id = result.json()["id"] # ID 3 in ctx1, no ID 3 in default + + # === Test: Subject portion empty (sub().empty()) === + self.logger.info("Testing: Subject portion empty") + + # 1a. 
No subject param at all - uses default context + result = self.sr_client.get_schemas_ids_id(id=default_id1) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 1b. Context-only param ":.ctx1:" - uses ctx1, no subject restriction + result = self.sr_client.get_schemas_ids_id(id=ctx1_id1, subject=":.ctx1:") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 1c. Explicit default context-only ":.:" - uses default, no subject restriction + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject=":.:") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === Test: Non-default context (qualified, no cross-context search) === + self.logger.info("Testing: Non-default context") + + # 2a. Non-default context - schema found + result = self.sr_client.get_schemas_ids_id(id=ctx1_id1, subject=":.ctx1:sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 2b. Non-default context - wrong subject for ID (no fallback for non-default ctx) + result = self.sr_client.get_schemas_ids_id( + id=ctx1_id1, subject=":.ctx1:wrong-sub" + ) + assert result.status_code == requests.codes.not_found + + # 2c. Non-default context - context doesn't exist + result = self.sr_client.get_schemas_ids_id( + id=default_id1, subject=":.nonexistent:sub1" + ) + assert result.status_code == requests.codes.not_found + + # === Test: Default context (implicit or explicit), schema found === + self.logger.info("Testing: Default context, schema found") + + # 3a. Unqualified subject exists in default context + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject="sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # 3b. 
Explicit default context (:.:) - same behavior as unqualified + result = self.sr_client.get_schemas_ids_id(id=default_id1, subject=":.:sub1") + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === Test: Cross-context search === + self.logger.info("Testing: Cross-context search") + + # 4a. Unqualified subject "unique-sub" not in default, but exists in ctx1 + # Should find it via cross-context search + result = self.sr_client.get_schemas_ids_id( + id=ctx1_unique_id, subject="unique-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema3_def + + # 4b. Explicit default context (:.:) also triggers cross-context search + result = self.sr_client.get_schemas_ids_id( + id=ctx1_unique_id, subject=":.:unique-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema3_def + + # === Test: Fallback without subject restriction === + self.logger.info("Testing: Fallback without subject restriction") + + # 5a. Subject "nonexistent-sub" doesn't exist anywhere, but ID exists in default + # Should fallback to returning schema without subject check + result = self.sr_client.get_schemas_ids_id( + id=default_id1, subject="nonexistent-sub" + ) + assert result.status_code == requests.codes.ok + assert result.json()["schema"] == schema1_def + + # === ERROR CASES === + self.logger.info("Testing error cases") + + # 6a. Schema ID doesn't exist at all + result = self.sr_client.get_schemas_ids_id(id=99999) + assert result.status_code == requests.codes.not_found + + # 6b. 
Schema ID exists only in ctx1, no subject param (looks in default only) + # ctx1_only_id (ID 3) only exists in ctx1, not in default context (which only has IDs 1-2) + result = self.sr_client.get_schemas_ids_id(id=ctx1_only_id) + assert result.status_code == requests.codes.not_found + class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): """ @@ -8154,44 +8295,12 @@ def resource(self) -> dict: return {"name": "", "type": "registry"} -class SchemaRegistryAclAuthzTest(SchemaRegistryEndpoints): +class SchemaRegistryAclAuthzTestBase(SchemaRegistryEndpoints): """ - Verify that schema registry endpoints are protected by the correct ACL resource and operation. + Base class providing shared ACL test infrastructure (setup, helpers) without test methods. """ - ENDPOINTS = [ - GetConfigEndpoint, - PutConfigEndpoint, - GetConfigSubjectEndpoint, - PutConfigSubjectEndpoint, - DeleteConfigSubject, - GetMode, - PutMode, - GetModeSubject, - PutModeSubject, - DeleteModeSubject, - PostSubjectVersions, - GetSchemasIdsIdVersions, - GetSchemasIdsIdSubjects, - GetSubjectVersions, - PostSubject, - GetSubjectVersionsVersion, - GetSubjectVersionsVersionSchema, - GetSubjectVersionsVersionReferencedBy, - DeleteSubject, - DeleteSubjectVersion, - CompatibilitySubjectVersion, - # Tested separately: - # GET_SCHEMAS_TYPES - no ACLs required - # SCHEMA_REGISTRY_STATUS_READY - no ACLs required - # GET_SCHEMAS_IDS_ID - custom ACL handling - # GET_SUBJECTS - custom ACL handling - # GET_SECURITY_ACLS - kafka cluster ACL required - # POST_SECURITY_ACLS - kafka cluster ACL required - # DELETE_SECURITY_ACLS - kafka cluster ACL required - ] - - def __init__(self, context): + def __init__(self, context, extra_rp_conf: dict | None = None, **kwargs): security = SecurityConfig() security.enable_sasl = True security.endpoint_authn_method = "sasl" @@ -8200,11 +8309,13 @@ def __init__(self, context): schema_registry_config.authn_method = "http_basic" schema_registry_config.mode_mutability = True - 
super(SchemaRegistryAclAuthzTest, self).__init__( + super().__init__( context, security=security, num_brokers=1, schema_registry_config=schema_registry_config, + extra_rp_conf=extra_rp_conf, + **kwargs, ) superuser = self.redpanda.SUPERUSER_CREDENTIALS @@ -8292,6 +8403,44 @@ def setUp(self): {"schema_registry_enable_authorization": "True"} ) + +class SchemaRegistryAclAuthzTest(SchemaRegistryAclAuthzTestBase): + """ + Verify that schema registry endpoints are protected by the correct ACL resource and operation. + """ + + ENDPOINTS = [ + GetConfigEndpoint, + PutConfigEndpoint, + GetConfigSubjectEndpoint, + PutConfigSubjectEndpoint, + DeleteConfigSubject, + GetMode, + PutMode, + GetModeSubject, + PutModeSubject, + DeleteModeSubject, + PostSubjectVersions, + GetSchemasIdsIdVersions, + GetSchemasIdsIdSubjects, + GetSubjectVersions, + PostSubject, + GetSubjectVersionsVersion, + GetSubjectVersionsVersionSchema, + GetSubjectVersionsVersionReferencedBy, + DeleteSubject, + DeleteSubjectVersion, + CompatibilitySubjectVersion, + # Tested separately: + # GET_SCHEMAS_TYPES - no ACLs required + # SCHEMA_REGISTRY_STATUS_READY - no ACLs required + # GET_SCHEMAS_IDS_ID - custom ACL handling + # GET_SUBJECTS - custom ACL handling + # GET_SECURITY_ACLS - kafka cluster ACL required + # POST_SECURITY_ACLS - kafka cluster ACL required + # DELETE_SECURITY_ACLS - kafka cluster ACL required + ] + def _get_endpoint_by_name(self, name: str) -> ACLTestEndpoint: for endpoint in self.ENDPOINTS: if endpoint.name == name: @@ -8731,3 +8880,188 @@ def test_enterprise_sanctions(self): self.redpanda.set_cluster_config( {"schema_registry_enable_authorization": True} ) + + +class SchemaRegistryContextAuthzTest(SchemaRegistryAclAuthzTestBase): + """ + Authorization tests for context-qualified subject functionality. + + These tests verify that Schema Registry correctly enforces ACL authorization + when using context-qualified subjects and the subject query parameter. 
+ """ + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_enable_qualified_subjects": True}, + **kwargs, + ) + + def _setup_test_schemas(self): + """Create schemas used by all authorization tests.""" + schema_data = json.dumps({"schema": schema1_def}) + schema_data_2 = json.dumps({"schema": schema2_def}) + + # sub1 and sub2 in default context share the same schema + result = self.sr_client.post_subjects_subject_versions( + "sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.schema_id = result.json()["id"] + + result = self.sr_client.post_subjects_subject_versions( + "sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], self.schema_id) + + # :.ctx1:sub1 also has the same schema + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub1", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.assert_equal(result.json()["id"], self.schema_id) + + # :.ctx1:sub2 also has the same schema + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:sub2", data=schema_data, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + + # :.ctx1:unique-sub has a different schema (only exists in ctx1) + result = self.sr_client.post_subjects_subject_versions( + ":.ctx1:unique-sub", data=schema_data_2, auth=self.super_auth + ) + self.assert_equal(result.status_code, 200) + self.ctx1_unique_id = result.json()["id"] + + def setUp(self): + super().setUp() + self._setup_test_schemas() + + @cluster(num_nodes=1) + def test_subject_param_with_authorized_subject(self): + """ + GET /schemas/ids/{id}?subject=sub1 succeeds when user has READ on sub1. 
+ """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_subject_param_unauthorized_despite_other_subject_access(self): + """ + GET /schemas/ids/{id}?subject=sub2 fails when user only has READ on sub1, + even though both subjects reference the same schema. + Authorization checks only the specified subject. + """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject="sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_context_qualified_subject_with_matching_acl(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub1 succeeds when user has READ + on the context-qualified subject :.ctx1:sub1. + """ + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_context_qualified_subject_without_acl_on_that_context(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub2 fails when user has READ on + sub1 (default context) but not on :.ctx1:sub2. + """ + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub2", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_cross_context_search_finds_subject_in_non_default_context(self): + """ + GET /schemas/ids/{id}?subject=unique-sub succeeds when the subject only + exists in ctx1 as :.ctx1:unique-sub and user has READ on :.ctx1:unique-sub. + Cross-context search finds the subject in the non-default context. 
+ """ + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ") + ) + result = self.sr_client.get_schemas_ids_id( + self.ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_cross_context_search_no_auth_on_found_context(self): + """ + GET /schemas/ids/{id}?subject=unique-sub fails when the subject is found + via cross-context search in ctx1 but user has DENY on :.ctx1:unique-sub. + """ + self._post_acl( + self._create_acl(":.ctx1:unique-sub", "SUBJECT", "LITERAL", "READ", "DENY") + ) + result = self.sr_client.get_schemas_ids_id( + self.ctx1_unique_id, subject="unique-sub", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_context_only_param_with_auth_on_ctx_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1: succeeds when user has READ on + at least one subject in ctx1 (:.ctx1:sub1). + Context-only param checks all subjects in that context. + """ + self._post_acl(self._create_acl(":.ctx1:sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_context_only_param_without_auth_on_any_ctx_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1: fails when user has no READ + permission on any subject in ctx1. 
+ """ + # Only grant access to default context subject, not ctx1 + self._post_acl(self._create_acl("sub1", "SUBJECT", "LITERAL", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403) + + @cluster(num_nodes=1) + def test_prefix_acl_covers_context_qualified_subject(self): + """ + GET /schemas/ids/{id}?subject=:.ctx1:sub1 succeeds when user has a + PREFIX ACL on :.ctx1: which covers all subjects in that context. + """ + self._post_acl(self._create_acl(":.ctx1:", "SUBJECT", "PREFIXED", "READ")) + result = self.sr_client.get_schemas_ids_id( + self.schema_id, subject=":.ctx1:sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 200) + + @cluster(num_nodes=1) + def test_nonexistent_schema_id_returns_403_not_404(self): + """ + GET /schemas/ids/{id}?subject=sub1 returns 403 (not 404) for non-existent + schema ID when user lacks authorization. This prevents information leakage + about whether a schema ID exists. + """ + result = self.sr_client.get_schemas_ids_id( + 99999, subject="sub1", auth=self.user_auth + ) + self.assert_equal(result.status_code, 403)