-
Notifications
You must be signed in to change notification settings - Fork 22
Description
I ran a local kafka 3.4.0 broker, and enabled mTLS with self signed certificates and using OPA policies as ACLs. I am using the example OPA policy posted in the project :
Policy
package kafka.authz
import future.keywords.in
default allow = false
allow {
inter_broker_communication
}
allow {
consume(input.action)
on_own_topic(input.action)
as_consumer
}
allow {
produce(input.action)
on_own_topic(input.action)
as_producer
}
allow {
create(input.action)
on_own_topic(input.action)
}
allow {
any_operation(input.action)
on_own_topic(input.action)
as_mgmt_user
}
allow {
input.action.operation == "READ"
input.action.resourcePattern.resourceType == "GROUP"
}
allow {
describe(input.action)
}
inter_broker_communication {
input.requestContext.principal.name == "ANONYMOUS"
}
inter_broker_communication {
input.requestContext.securityProtocol == "SSL"
input.requestContext.principal.principalType == "User"
username == "localhost"
}
consume(action) {
action.operation == "READ"
}
produce(action) {
action.operation == "WRITE"
}
create(action) {
action.operation == "CREATE"
}
describe(action) {
action.operation == "DESCRIBE"
}
any_operation(action) {
action.operation in ["READ", "WRITE", "CREATE", "ALTER", "DESCRIBE", "DELETE"]
}
as_consumer {
regex.match(".*-consumer", username)
}
as_producer {
regex.match(".*-producer", username)
}
as_mgmt_user {
regex.match(".*-mgmt", username)
}
on_own_topic(action) {
owner := trim(username, "-consumer")
regex.match(owner, action.resourcePattern.name)
}
on_own_topic(action) {
owner := trim(username, "-producer")
regex.match(owner, action.resourcePattern.name)
}
on_own_topic(action) {
owner := trim(username, "-mgmt")
regex.match(owner, action.resourcePattern.name)
}
username = cn_parts[0] {
name := input.requestContext.principal.name
startswith(name, "CN=")
parsed := parse_user(name)
cn_parts := split(parsed.CN, ".")
}
else = input.requestContext.principal.name {
true
}
parse_user(user) = {key: value |
parts := split(user, ",")
[key, value] := split(parts[_], "=")
}
Producer
On running a console producer to produce to the topic bob-topic , :
bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --producer.config bob.producer.config --topic bob-topic
I get the :
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
This is the request received by OPA from the Kafka Broker :
{"client_addr":"127.0.0.1:51806","level":"info","msg":"Received request.","req_body":"{"input":{"requestContext":{"clientAddress":"/127.0.0.1","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"3.4.0"},"connectionId":"127.0.0.1:9094-127.0.0.1:51818-0","header":{"name":{"clientId":"console-producer","correlationId":2,"requestApiKey":22,"requestApiVersion":4},"headerVersion":2},"listenerName":"SSL", "principal":{"principalType":"User", "name": "CN=bob-producer"},"securityProtocol": "SSL"},
"action":{"resourcePattern":{"resourceType":"TOPIC", "name":"" ,"patternType": "PREFIXED" ,"unknown":false}, "operation": "WRITE", "resourceReferenceCount":0, "logIfAllowed":true, "logIfDenied":true}}}" , "req_id":8,"req_method":"POST","req_params":{},"req_path":"/v1/data/kafka/authz/allow","time":"2023-03-30T11:31:25-07:00"}
Notice the
{"resourcePattern":{"resourceType":"TOPIC", "name":"" ,"patternType": "PREFIXED" ,"unknown":false}
which I believe is the cause of the error
If it was
{"resourcePattern":{"resourceType":"TOPIC", "name":"bob-topic" ,"patternType": "LITERAL" ,"unknown":false}
then OPA would have allowed the access.
Is this a bug or am I doing something wrong ?