Skip to content

Commit b66fbd2

Browse files
author
Thomas Newman
committed
Added Azure Entra Auth support
1 parent 8c70126 commit b66fbd2

File tree

7 files changed

+183
-0
lines changed

7 files changed

+183
-0
lines changed

api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@
9494
<version>2.1.0</version>
9595
</dependency>
9696

97+
<dependency>
98+
<groupId>com.azure</groupId>
99+
<artifactId>azure-identity</artifactId>
100+
<version>1.13.0</version>
101+
</dependency>
102+
97103
<dependency>
98104
<groupId>org.apache.avro</groupId>
99105
<artifactId>avro</artifactId>
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import com.azure.core.credential.AccessToken;
6+
import com.azure.core.credential.TokenCredential;
7+
import com.azure.core.credential.TokenRequestContext;
8+
import com.azure.identity.DefaultAzureCredentialBuilder;
9+
import java.net.URI;
10+
import java.util.List;
11+
import java.util.Map;
12+
import javax.security.auth.callback.Callback;
13+
import javax.security.auth.callback.UnsupportedCallbackException;
14+
import javax.security.auth.login.AppConfigurationEntry;
15+
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
16+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class);
23+
24+
private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
25+
26+
static TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
27+
28+
private TokenRequestContext tokenRequestContext;
29+
30+
@Override
31+
public void configure(
32+
Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
33+
tokenRequestContext = buildTokenRequestContext(configs);
34+
}
35+
36+
private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
37+
URI uri = buildEventHubsServerUri(configs);
38+
String tokenAudience = buildTokenAudience(uri);
39+
40+
TokenRequestContext request = new TokenRequestContext();
41+
request.addScopes(tokenAudience);
42+
return request;
43+
}
44+
45+
private URI buildEventHubsServerUri(Map<String, ?> configs) {
46+
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);
47+
48+
if (null == bootstrapServers) {
49+
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
50+
LOGGER.error(message);
51+
throw new IllegalArgumentException(message);
52+
}
53+
54+
if (bootstrapServers.size() != 1) {
55+
final String message =
56+
BOOTSTRAP_SERVERS_CONFIG
57+
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
58+
LOGGER.error(message);
59+
throw new IllegalArgumentException(message);
60+
}
61+
62+
return URI.create("https://" + bootstrapServers.get(0));
63+
}
64+
65+
private String buildTokenAudience(URI uri) {
66+
return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
67+
}
68+
69+
@Override
70+
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
71+
for (Callback callback : callbacks) {
72+
if (callback instanceof OAuthBearerTokenCallback oauthCallback) {
73+
handleOAuthCallback(oauthCallback);
74+
} else {
75+
throw new UnsupportedCallbackException(callback);
76+
}
77+
}
78+
}
79+
80+
private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
81+
try {
82+
final AccessToken accessToken = tokenCredential.getTokenSync(tokenRequestContext);
83+
final AzureEntraOAuthBearerTokenImpl oauthBearerToken = new AzureEntraOAuthBearerTokenImpl(accessToken);
84+
oauthCallback.token(oauthBearerToken);
85+
} catch (final RuntimeException e) {
86+
final String message =
87+
"Failed to acquire Azure token for Event Hub Authentication. "
88+
+ "Please ensure valid Azure credentials are configured.";
89+
LOGGER.error(message, e);
90+
oauthCallback.error("invalid_grant", message, null);
91+
}
92+
}
93+
94+
public void close() {
95+
// NOOP
96+
}
97+
98+
void setTokenCredential(final TokenCredential tokenCredential) {
99+
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
100+
}
101+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import com.azure.core.credential.AccessToken;
4+
import com.nimbusds.jwt.JWTClaimsSet;
5+
import com.nimbusds.jwt.JWTParser;
6+
import java.text.ParseException;
7+
import java.util.Arrays;
8+
import java.util.Optional;
9+
import java.util.Set;
10+
import java.util.stream.Collectors;
11+
import org.apache.kafka.common.errors.SaslAuthenticationException;
12+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
13+
14+
public class AzureEntraOAuthBearerTokenImpl implements OAuthBearerToken {
15+
16+
private final AccessToken accessToken;
17+
18+
private final JWTClaimsSet claims;
19+
20+
public AzureEntraOAuthBearerTokenImpl(AccessToken accessToken) {
21+
this.accessToken = accessToken;
22+
23+
try {
24+
claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
25+
} catch (ParseException exception) {
26+
throw new SaslAuthenticationException("Unable to parse the access token", exception);
27+
}
28+
}
29+
30+
@Override
31+
public String value() {
32+
return accessToken.getToken();
33+
}
34+
35+
@Override
36+
public Long startTimeMs() {
37+
return claims.getIssueTime().getTime();
38+
}
39+
40+
@Override
41+
public long lifetimeMs() {
42+
return claims.getExpirationTime().getTime();
43+
}
44+
45+
@Override
46+
public Set<String> scope() {
47+
// Referring to
48+
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
49+
// scp
50+
// claim is a String which is presented as a space separated list.
51+
return Optional.ofNullable(claims.getClaim("scp"))
52+
.map(s -> Arrays.stream(((String) s).split(" ")).collect(Collectors.toSet()))
53+
.orElse(null);
54+
}
55+
56+
@Override
57+
public String principalName() {
58+
return (String) claims.getClaim("upn");
59+
}
60+
61+
public boolean isExpired() {
62+
return accessToken.isExpired();
63+
}
64+
}

frontend/src/lib/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export const AUTH_OPTIONS = [
8484
{ value: 'Delegation tokens', label: 'Delegation tokens' },
8585
{ value: 'SASL/LDAP', label: 'SASL/LDAP' },
8686
{ value: 'SASL/AWS IAM', label: 'SASL/AWS IAM' },
87+
{ value: 'SASL/Azure Entra', label: 'SASL/Azure Entra' },
8788
{ value: 'mTLS', label: 'mTLS' },
8889
];
8990

frontend/src/widgets/ClusterConfigForm/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ const authPropsSchema = lazy((_, { parent }) => {
121121
return object({
122122
awsProfileName: string(),
123123
});
124+
case 'SASL/Azure Entra':
124125
case 'mTLS':
125126
default:
126127
return mixed().optional();
@@ -157,6 +158,7 @@ const authSchema = lazy((value) => {
157158
'SASL/SCRAM-512',
158159
'SASL/LDAP',
159160
'SASL/AWS IAM',
161+
'SASL/Azure Entra',
160162
].includes(v);
161163
},
162164
then: (schema) => schema.required('required field'),

frontend/src/widgets/ClusterConfigForm/utils/getJaasConfig.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const JAAS_CONFIGS = {
99
'org.apache.kafka.common.security.scram.ScramLoginModule',
1010
'SASL/LDAP': 'org.apache.kafka.common.security.plain.PlainLoginModule',
1111
'SASL/AWS IAM': 'software.amazon.msk.auth.iam.IAMLoginModule',
12+
'SASL/Azure Entra': 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule',
1213
};
1314

1415
type MethodName = keyof typeof JAAS_CONFIGS;

frontend/src/widgets/ClusterConfigForm/utils/transformFormDataToPayload.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,14 @@ export const transformFormDataToPayload = (data: ClusterConfigFormValues) => {
215215
}),
216216
};
217217
break;
218+
case 'SASL/Azure Entra':
219+
config.properties = {
220+
'security.protocol': securityProtocol,
221+
'sasl.mechanism': 'OAUTHBEARER',
222+
'sasl.client.callback.handler.class': 'io.kafbat.ui.sasl.azure.entra.AzureEntraLoginCallbackHandler',
223+
'sasl.jaas.config': getJaasConfig('SASL/Azure Entra', {}),
224+
};
225+
break;
218226
case 'mTLS':
219227
config.properties = {
220228
'security.protocol': 'SSL',

0 commit comments

Comments
 (0)