Skip to content

Commit 04d15b3

Browse files
Auth: Support Azure Entra (Event Hub with Kafka Protocol) (#530)
1 parent 273e64c commit 04d15b3

File tree

9 files changed

+421
-0
lines changed

9 files changed

+421
-0
lines changed

api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@
9494
<version>2.1.0</version>
9595
</dependency>
9696

97+
<dependency>
98+
<groupId>com.azure</groupId>
99+
<artifactId>azure-identity</artifactId>
100+
<version>1.13.0</version>
101+
</dependency>
102+
97103
<dependency>
98104
<groupId>org.apache.avro</groupId>
99105
<artifactId>avro</artifactId>
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.kafbat.ui.config.auth.azure;
2+
3+
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import com.azure.core.credential.TokenCredential;
6+
import com.azure.core.credential.TokenRequestContext;
7+
import com.azure.identity.DefaultAzureCredentialBuilder;
8+
import java.net.URI;
9+
import java.time.Duration;
10+
import java.util.List;
11+
import java.util.Map;
12+
import javax.security.auth.callback.Callback;
13+
import javax.security.auth.callback.UnsupportedCallbackException;
14+
import javax.security.auth.login.AppConfigurationEntry;
15+
import lombok.extern.slf4j.Slf4j;
16+
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
17+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
18+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
19+
20+
@Slf4j
21+
public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandler {
22+
23+
private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);
24+
25+
private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
26+
27+
private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
28+
29+
static TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
30+
31+
private TokenRequestContext tokenRequestContext;
32+
33+
@Override
34+
public void configure(
35+
Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
36+
tokenRequestContext = buildTokenRequestContext(configs);
37+
}
38+
39+
private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
40+
URI uri = buildEventHubsServerUri(configs);
41+
String tokenAudience = buildTokenAudience(uri);
42+
43+
TokenRequestContext request = new TokenRequestContext();
44+
request.addScopes(tokenAudience);
45+
return request;
46+
}
47+
48+
private URI buildEventHubsServerUri(Map<String, ?> configs) {
49+
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);
50+
51+
if (null == bootstrapServers) {
52+
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
53+
log.error(message);
54+
throw new IllegalArgumentException(message);
55+
}
56+
57+
if (bootstrapServers.size() != 1) {
58+
final String message =
59+
BOOTSTRAP_SERVERS_CONFIG
60+
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
61+
log.error(message);
62+
throw new IllegalArgumentException(message);
63+
}
64+
65+
return URI.create("https://" + bootstrapServers.get(0));
66+
}
67+
68+
private String buildTokenAudience(URI uri) {
69+
return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
70+
}
71+
72+
@Override
73+
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
74+
for (Callback callback : callbacks) {
75+
if (callback instanceof OAuthBearerTokenCallback oauthCallback) {
76+
handleOAuthCallback(oauthCallback);
77+
} else {
78+
throw new UnsupportedCallbackException(callback);
79+
}
80+
}
81+
}
82+
83+
private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
84+
try {
85+
final OAuthBearerToken token = tokenCredential
86+
.getToken(tokenRequestContext)
87+
.map(AzureEntraOAuthBearerToken::new)
88+
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
89+
.doOnError(e -> log.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
90+
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
91+
.block();
92+
93+
oauthCallback.token(token);
94+
} catch (final RuntimeException e) {
95+
final String message =
96+
"Failed to acquire Azure token for Event Hub Authentication. "
97+
+ "Please ensure valid Azure credentials are configured.";
98+
log.error(message, e);
99+
oauthCallback.error("invalid_grant", message, null);
100+
}
101+
}
102+
103+
public void close() {
104+
// NOOP
105+
}
106+
107+
void setTokenCredential(final TokenCredential tokenCredential) {
108+
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
109+
}
110+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.kafbat.ui.config.auth.azure;
2+
3+
import com.azure.core.credential.AccessToken;
4+
import com.nimbusds.jwt.JWTClaimsSet;
5+
import com.nimbusds.jwt.JWTParser;
6+
import java.text.ParseException;
7+
import java.util.Arrays;
8+
import java.util.Optional;
9+
import java.util.Set;
10+
import java.util.stream.Collectors;
11+
import org.apache.kafka.common.errors.SaslAuthenticationException;
12+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
13+
14+
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {
15+
16+
private final AccessToken accessToken;
17+
18+
private final JWTClaimsSet claims;
19+
20+
public AzureEntraOAuthBearerToken(AccessToken accessToken) {
21+
this.accessToken = accessToken;
22+
23+
try {
24+
claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
25+
} catch (ParseException exception) {
26+
throw new SaslAuthenticationException("Unable to parse the access token", exception);
27+
}
28+
}
29+
30+
@Override
31+
public String value() {
32+
return accessToken.getToken();
33+
}
34+
35+
@Override
36+
public Long startTimeMs() {
37+
return claims.getIssueTime().getTime();
38+
}
39+
40+
@Override
41+
public long lifetimeMs() {
42+
return claims.getExpirationTime().getTime();
43+
}
44+
45+
@Override
46+
public Set<String> scope() {
47+
// Referring to
48+
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
49+
// scp
50+
// claim is a String which is presented as a space separated list.
51+
return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet());
52+
}
53+
54+
@Override
55+
public String principalName() {
56+
return (String) claims.getClaim("upn");
57+
}
58+
59+
public boolean isExpired() {
60+
return accessToken.isExpired();
61+
}
62+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.kafbat.ui.config.auth.azure;
2+
3+
import static org.hamcrest.CoreMatchers.is;
4+
import static org.hamcrest.CoreMatchers.notNullValue;
5+
import static org.hamcrest.CoreMatchers.nullValue;
6+
import static org.hamcrest.MatcherAssert.assertThat;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertFalse;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
import static org.mockito.ArgumentMatchers.any;
11+
import static org.mockito.ArgumentMatchers.anyString;
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.times;
14+
import static org.mockito.Mockito.verify;
15+
import static org.mockito.Mockito.when;
16+
17+
import com.azure.core.credential.AccessToken;
18+
import com.azure.core.credential.TokenCredential;
19+
import com.azure.core.credential.TokenRequestContext;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import javax.security.auth.callback.Callback;
24+
import javax.security.auth.callback.UnsupportedCallbackException;
25+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
26+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.mockito.ArgumentCaptor;
31+
import org.mockito.Mock;
32+
import org.mockito.junit.jupiter.MockitoExtension;
33+
import reactor.core.publisher.Mono;
34+
35+
@ExtendWith(MockitoExtension.class)
36+
public class AzureEntraLoginCallbackHandlerTest {
37+
38+
// These are not real tokens. It was generated using fake values with an invalid signature,
39+
// so it is safe to store here.
40+
private static final String VALID_SAMPLE_TOKEN =
41+
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IjlHbW55RlBraGMzaE91UjIybXZTdmduTG83WSIsImtpZCI6IjlHbW55"
42+
+ "RlBraGMzaE91UjIybXZTdmduTG83WSJ9.eyJhdWQiOiJodHRwczovL3NhbXBsZS5zZXJ2aWNlYnVzLndpbmRvd3MubmV0IiwiaX"
43+
+ "NzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvc2FtcGxlLyIsImlhdCI6MTY5ODQxNTkxMiwibmJmIjoxNjk4NDE1OTEzLCJleH"
44+
+ "AiOjE2OTg0MTU5MTQsImFjciI6IjEiLCJhaW8iOiJzYW1wbGUtYWlvIiwiYW1yIjpbXSwiYXBwaWQiOiJzYW1wbGUtYXBwLWlkIi"
45+
+ "wiYXBwaWRhY3IiOiIwIiwiZmFtaWx5X25hbWUiOiJTYW1wbGUiLCJnaXZlbl9uYW1lIjoiU2FtcGxlIiwiZ3JvdXBzIjpbXSwiaX"
46+
+ "BhZGRyIjoiMTI3LjAuMC4xIiwibmFtZSI6IlNhbXBsZSBOYW1lIiwib2lkIjoic2FtcGxlLW9pZCIsIm9ucHJlbV9zaWQiOiJzYW"
47+
+ "1wbGUtb25wcmVtX3NpZCIsInB1aWQiOiJzYW1wbGUtcHVpZCIsInJoIjoic2FtcGxlLXJoIiwic2NwIjoiZXZlbnRfaHViIHN0b3"
48+
+ "JhZ2VfYWNjb3VudCIsInN1YiI6IlNhbXBsZSBTdWJqZWN0IiwidGlkIjoic2FtcGxlLXRpZCIsInVuaXF1ZV9uYW1lIjoic2FtcG"
49+
+ "xlQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJzYW1wbGVAbWljcm9zb2Z0LmNvbSIsInV0aSI6InNhbXBsZS11dGkiLCJ2ZXIiOiIxLj"
50+
+ "AiLCJ3aWRzIjpbXX0.DC_guYOsDlRc5GsXE39dn_zlBX54_Y8_mDTLXLgienl9dPMX5RE2X1QXGXA9ukZtptMzP_0wcoqDDjNrys"
51+
+ "GrNhztyeOr0YSeMMFq2NQ5vMBzLapwONwsnv55Hn0jOje9cqnMf43z1LHI6q6-rIIRz-SiTuoYUgOTxzFftpt-7FSqLjQpYEH7bL"
52+
+ "p-0yIU_aJUSb5HQTJbtYYOb54hsZ6VXpaiZ013qGtKODbHTG37kdoIw2MPn66CxanLZKeZM31IVxC-duAqxDgK4O2Ne6xRZRIPW1"
53+
+ "yt61QnZutWTJ4bAyhmplym3OWZ369cyiSJek0uyS5tibXeCYG4Kk8UQSFcsyfwgOsD0xvvcXcLexcUcEekoNBj6ixDhWssFzhC8T"
54+
+ "Npy8-QKNe_Tp6qHzJdI6OV71jpDkGvcmseLHC9GOxBWB0IdYbePTFK-rz2dkN3uMUiFwQJvEbORsq1IaQXj2esT0F7sMfqzWQF9h"
55+
+ "koVy4mJg_auvrZlnQkNPdLHfCacU33ZPwtuSS6b-0XolbxZ5DlJ4p1OJPeHl2xsi61qiHuCBsmnkLNtHmyxNTXGs7xc4dEQokaCK"
56+
+ "-FB_lzC3D4mkJMxKWopQGXnQtizaZjyclGpiUFs3mEauxC7RpsbanitxPFs7FK3mY0MQJk9JNVi1oM-8qfEp8nYT2DwFBhLcIp2z"
57+
+ "Q";
58+
59+
@Mock
60+
private OAuthBearerTokenCallback oauthBearerTokenCallBack;
61+
62+
@Mock
63+
private OAuthBearerToken oauthBearerToken;
64+
65+
@Mock
66+
private TokenCredential tokenCredential;
67+
68+
@Mock
69+
private AccessToken accessToken;
70+
71+
private AzureEntraLoginCallbackHandler azureEntraLoginCallbackHandler;
72+
73+
@BeforeEach
74+
public void beforeEach() {
75+
azureEntraLoginCallbackHandler = new AzureEntraLoginCallbackHandler();
76+
azureEntraLoginCallbackHandler.setTokenCredential(tokenCredential);
77+
}
78+
79+
@Test
80+
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
81+
throws UnsupportedCallbackException {
82+
final Map<String, Object> configs = new HashMap<>();
83+
configs.put(
84+
"bootstrap.servers",
85+
List.of("test-eh.servicebus.windows.net:9093"));
86+
87+
when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken));
88+
when(accessToken.getToken()).thenReturn(VALID_SAMPLE_TOKEN);
89+
90+
azureEntraLoginCallbackHandler.configure(configs, null, null);
91+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
92+
93+
final ArgumentCaptor<TokenRequestContext> contextCaptor =
94+
ArgumentCaptor.forClass(TokenRequestContext.class);
95+
final ArgumentCaptor<OAuthBearerToken> tokenCaptor =
96+
ArgumentCaptor.forClass(OAuthBearerToken.class);
97+
98+
verify(tokenCredential, times(1)).getToken(contextCaptor.capture());
99+
verify(oauthBearerTokenCallBack, times(0)).error(anyString(), anyString(), anyString());
100+
verify(oauthBearerTokenCallBack, times(1)).token(tokenCaptor.capture());
101+
102+
final TokenRequestContext tokenRequestContext = contextCaptor.getValue();
103+
assertThat(tokenRequestContext, is(notNullValue()));
104+
assertThat(
105+
tokenRequestContext.getScopes(),
106+
is(List.of("https://test-eh.servicebus.windows.net/.default")));
107+
assertThat(tokenRequestContext.getClaims(), is(nullValue()));
108+
assertThat(tokenRequestContext.getTenantId(), is(nullValue()));
109+
assertFalse(tokenRequestContext.isCaeEnabled());
110+
111+
assertThat(tokenCaptor.getValue(), is(notNullValue()));
112+
assertEquals(VALID_SAMPLE_TOKEN, tokenCaptor.getValue().value());
113+
}
114+
115+
@Test
116+
public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException {
117+
final Map<String, Object> configs = new HashMap<>();
118+
configs.put(
119+
"bootstrap.servers",
120+
List.of("test-eh.servicebus.windows.net:9093"));
121+
122+
when(tokenCredential.getToken(any(TokenRequestContext.class)))
123+
.thenThrow(new RuntimeException("failed to acquire token"));
124+
125+
azureEntraLoginCallbackHandler.configure(configs, null, null);
126+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
127+
128+
verify(oauthBearerTokenCallBack, times(1))
129+
.error(
130+
"invalid_grant",
131+
"Failed to acquire Azure token for Event Hub Authentication. "
132+
+ "Please ensure valid Azure credentials are configured.",
133+
null);
134+
verify(oauthBearerTokenCallBack, times(0)).token(any());
135+
}
136+
137+
@Test
138+
public void shouldThrowExceptionWithNullBootstrapServers() {
139+
final Map<String, Object> configs = new HashMap<>();
140+
141+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
142+
configs, null, null));
143+
}
144+
145+
@Test
146+
public void shouldThrowExceptionWithMultipleBootstrapServers() {
147+
final Map<String, Object> configs = new HashMap<>();
148+
configs.put("bootstrap.servers", List.of("server1", "server2"));
149+
150+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
151+
configs, null, null));
152+
}
153+
154+
@Test
155+
public void shouldThrowExceptionWithUnsupportedCallback() {
156+
assertThrows(UnsupportedCallbackException.class, () -> azureEntraLoginCallbackHandler.handle(
157+
new Callback[] {mock(Callback.class)}));
158+
}
159+
160+
@Test
161+
public void shouldDoNothingOnClose() {
162+
azureEntraLoginCallbackHandler.close();
163+
}
164+
165+
@Test
166+
public void shouldSupportDefaultConstructor() {
167+
new AzureEntraLoginCallbackHandler();
168+
}
169+
}

0 commit comments

Comments
 (0)