Skip to content

Commit 64b3ee4

Browse files
author
Thomas Newman
committed
Added LoginCallbackHandler and tests
1 parent b66fbd2 commit 64b3ee4

File tree

3 files changed

+243
-3
lines changed

3 files changed

+243
-3
lines changed

api/src/main/java/io/kafbat/ui/sasl/azure/entra/AzureEntraLoginCallbackHandler.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
import com.azure.core.credential.TokenRequestContext;
88
import com.azure.identity.DefaultAzureCredentialBuilder;
99
import java.net.URI;
10+
import java.time.Duration;
1011
import java.util.List;
1112
import java.util.Map;
1213
import javax.security.auth.callback.Callback;
1314
import javax.security.auth.callback.UnsupportedCallbackException;
1415
import javax.security.auth.login.AppConfigurationEntry;
1516
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
17+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
1618
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
@@ -21,6 +23,10 @@ public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandl
2123

2224
private static final Logger LOGGER = LoggerFactory.getLogger(AzureEntraLoginCallbackHandler.class);
2325

26+
private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(10);
27+
28+
private static final int ACCESS_TOKEN_REQUEST_MAX_RETRIES = 6;
29+
2430
private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
2531

2632
static TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
@@ -79,9 +85,15 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
7985

8086
private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
8187
try {
82-
final AccessToken accessToken = tokenCredential.getTokenSync(tokenRequestContext);
83-
final AzureEntraOAuthBearerTokenImpl oauthBearerToken = new AzureEntraOAuthBearerTokenImpl(accessToken);
84-
oauthCallback.token(oauthBearerToken);
88+
final OAuthBearerToken token = tokenCredential
89+
.getToken(tokenRequestContext)
90+
.map(AzureEntraOAuthBearerTokenImpl::new)
91+
.timeout(ACCESS_TOKEN_REQUEST_BLOCK_TIME)
92+
.doOnError(e -> LOGGER.warn("Failed to acquire Azure token for Event Hub Authentication. Retrying.", e))
93+
.retry(ACCESS_TOKEN_REQUEST_MAX_RETRIES)
94+
.block();
95+
96+
oauthCallback.token(token);
8597
} catch (final RuntimeException e) {
8698
final String message =
8799
"Failed to acquire Azure token for Event Hub Authentication. "
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import static org.hamcrest.CoreMatchers.is;
4+
import static org.hamcrest.CoreMatchers.notNullValue;
5+
import static org.hamcrest.CoreMatchers.nullValue;
6+
import static org.hamcrest.MatcherAssert.assertThat;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertFalse;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
import static org.mockito.ArgumentMatchers.any;
11+
import static org.mockito.ArgumentMatchers.anyString;
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.times;
14+
import static org.mockito.Mockito.verify;
15+
import static org.mockito.Mockito.when;
16+
17+
import com.azure.core.credential.AccessToken;
18+
import com.azure.core.credential.TokenCredential;
19+
import com.azure.core.credential.TokenRequestContext;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import javax.security.auth.callback.Callback;
24+
import javax.security.auth.callback.UnsupportedCallbackException;
25+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
26+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.mockito.ArgumentCaptor;
31+
import org.mockito.Mock;
32+
import org.mockito.junit.jupiter.MockitoExtension;
33+
import reactor.core.publisher.Mono;
34+
35+
@ExtendWith(MockitoExtension.class)
36+
public class AzureEntraLoginCallbackHandlerTest {
37+
38+
// These are not real tokens. It was generated using fake values with an invalid signature,
39+
// so it is safe to store here.
40+
private static final String VALID_SAMPLE_TOKEN =
41+
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IjlHbW55RlBraGMzaE91UjIybXZTdmduTG83WSIsImtpZCI6IjlHbW55"
42+
+ "RlBraGMzaE91UjIybXZTdmduTG83WSJ9.eyJhdWQiOiJodHRwczovL3NhbXBsZS5zZXJ2aWNlYnVzLndpbmRvd3MubmV0IiwiaX"
43+
+ "NzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvc2FtcGxlLyIsImlhdCI6MTY5ODQxNTkxMiwibmJmIjoxNjk4NDE1OTEzLCJleH"
44+
+ "AiOjE2OTg0MTU5MTQsImFjciI6IjEiLCJhaW8iOiJzYW1wbGUtYWlvIiwiYW1yIjpbXSwiYXBwaWQiOiJzYW1wbGUtYXBwLWlkIi"
45+
+ "wiYXBwaWRhY3IiOiIwIiwiZmFtaWx5X25hbWUiOiJTYW1wbGUiLCJnaXZlbl9uYW1lIjoiU2FtcGxlIiwiZ3JvdXBzIjpbXSwiaX"
46+
+ "BhZGRyIjoiMTI3LjAuMC4xIiwibmFtZSI6IlNhbXBsZSBOYW1lIiwib2lkIjoic2FtcGxlLW9pZCIsIm9ucHJlbV9zaWQiOiJzYW"
47+
+ "1wbGUtb25wcmVtX3NpZCIsInB1aWQiOiJzYW1wbGUtcHVpZCIsInJoIjoic2FtcGxlLXJoIiwic2NwIjoiZXZlbnRfaHViIHN0b3"
48+
+ "JhZ2VfYWNjb3VudCIsInN1YiI6IlNhbXBsZSBTdWJqZWN0IiwidGlkIjoic2FtcGxlLXRpZCIsInVuaXF1ZV9uYW1lIjoic2FtcG"
49+
+ "xlQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJzYW1wbGVAbWljcm9zb2Z0LmNvbSIsInV0aSI6InNhbXBsZS11dGkiLCJ2ZXIiOiIxLj"
50+
+ "AiLCJ3aWRzIjpbXX0.DC_guYOsDlRc5GsXE39dn_zlBX54_Y8_mDTLXLgienl9dPMX5RE2X1QXGXA9ukZtptMzP_0wcoqDDjNrys"
51+
+ "GrNhztyeOr0YSeMMFq2NQ5vMBzLapwONwsnv55Hn0jOje9cqnMf43z1LHI6q6-rIIRz-SiTuoYUgOTxzFftpt-7FSqLjQpYEH7bL"
52+
+ "p-0yIU_aJUSb5HQTJbtYYOb54hsZ6VXpaiZ013qGtKODbHTG37kdoIw2MPn66CxanLZKeZM31IVxC-duAqxDgK4O2Ne6xRZRIPW1"
53+
+ "yt61QnZutWTJ4bAyhmplym3OWZ369cyiSJek0uyS5tibXeCYG4Kk8UQSFcsyfwgOsD0xvvcXcLexcUcEekoNBj6ixDhWssFzhC8T"
54+
+ "Npy8-QKNe_Tp6qHzJdI6OV71jpDkGvcmseLHC9GOxBWB0IdYbePTFK-rz2dkN3uMUiFwQJvEbORsq1IaQXj2esT0F7sMfqzWQF9h"
55+
+ "koVy4mJg_auvrZlnQkNPdLHfCacU33ZPwtuSS6b-0XolbxZ5DlJ4p1OJPeHl2xsi61qiHuCBsmnkLNtHmyxNTXGs7xc4dEQokaCK"
56+
+ "-FB_lzC3D4mkJMxKWopQGXnQtizaZjyclGpiUFs3mEauxC7RpsbanitxPFs7FK3mY0MQJk9JNVi1oM-8qfEp8nYT2DwFBhLcIp2z"
57+
+ "Q";
58+
59+
@Mock
60+
private OAuthBearerTokenCallback oauthBearerTokenCallBack;
61+
62+
@Mock
63+
private OAuthBearerToken oauthBearerToken;
64+
65+
@Mock
66+
private TokenCredential tokenCredential;
67+
68+
@Mock
69+
private AccessToken accessToken;
70+
71+
private AzureEntraLoginCallbackHandler azureEntraLoginCallbackHandler;
72+
73+
@BeforeEach
74+
public void beforeEach() {
75+
azureEntraLoginCallbackHandler = new AzureEntraLoginCallbackHandler();
76+
azureEntraLoginCallbackHandler.setTokenCredential(tokenCredential);
77+
}
78+
79+
@Test
80+
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
81+
throws UnsupportedCallbackException {
82+
final Map<String, Object> configs = new HashMap<>();
83+
configs.put(
84+
"bootstrap.servers",
85+
List.of("test-eh.servicebus.windows.net:9093"));
86+
87+
when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken));
88+
when(accessToken.getToken()).thenReturn(VALID_SAMPLE_TOKEN);
89+
90+
azureEntraLoginCallbackHandler.configure(configs, null, null);
91+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
92+
93+
final ArgumentCaptor<TokenRequestContext> contextCaptor =
94+
ArgumentCaptor.forClass(TokenRequestContext.class);
95+
final ArgumentCaptor<OAuthBearerToken> tokenCaptor =
96+
ArgumentCaptor.forClass(OAuthBearerToken.class);
97+
98+
verify(tokenCredential, times(1)).getToken(contextCaptor.capture());
99+
verify(oauthBearerTokenCallBack, times(0)).error(anyString(), anyString(), anyString());
100+
verify(oauthBearerTokenCallBack, times(1)).token(tokenCaptor.capture());
101+
102+
final TokenRequestContext tokenRequestContext = contextCaptor.getValue();
103+
assertThat(tokenRequestContext, is(notNullValue()));
104+
assertThat(
105+
tokenRequestContext.getScopes(),
106+
is(List.of("https://test-eh.servicebus.windows.net/.default")));
107+
assertThat(tokenRequestContext.getClaims(), is(nullValue()));
108+
assertThat(tokenRequestContext.getTenantId(), is(nullValue()));
109+
assertFalse(tokenRequestContext.isCaeEnabled());
110+
111+
assertThat(tokenCaptor.getValue(), is(notNullValue()));
112+
assertEquals(VALID_SAMPLE_TOKEN, tokenCaptor.getValue().value());
113+
}
114+
115+
@Test
116+
public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException {
117+
final Map<String, Object> configs = new HashMap<>();
118+
configs.put(
119+
"bootstrap.servers",
120+
List.of("test-eh.servicebus.windows.net:9093"));
121+
122+
when(tokenCredential.getToken(any(TokenRequestContext.class)))
123+
.thenThrow(new RuntimeException("failed to acquire token"));
124+
125+
azureEntraLoginCallbackHandler.configure(configs, null, null);
126+
azureEntraLoginCallbackHandler.handle(new Callback[] {oauthBearerTokenCallBack});
127+
128+
verify(oauthBearerTokenCallBack, times(1))
129+
.error(
130+
"invalid_grant",
131+
"Failed to acquire Azure token for Event Hub Authentication. "
132+
+ "Please ensure valid Azure credentials are configured.",
133+
null);
134+
verify(oauthBearerTokenCallBack, times(0)).token(any());
135+
}
136+
137+
@Test
138+
public void shouldThrowExceptionWithNullBootstrapServers() {
139+
final Map<String, Object> configs = new HashMap<>();
140+
141+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
142+
configs, null, null));
143+
}
144+
145+
@Test
146+
public void shouldThrowExceptionWithMultipleBootstrapServers() {
147+
final Map<String, Object> configs = new HashMap<>();
148+
configs.put("bootstrap.servers", List.of("server1", "server2"));
149+
150+
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
151+
configs, null, null));
152+
}
153+
154+
@Test
155+
public void shouldThrowExceptionWithUnsupportedCallback() {
156+
assertThrows(UnsupportedCallbackException.class, () -> azureEntraLoginCallbackHandler.handle(
157+
new Callback[] {mock(Callback.class)}));
158+
}
159+
160+
@Test
161+
public void shouldDoNothingOnClose() {
162+
azureEntraLoginCallbackHandler.close();
163+
}
164+
165+
@Test
166+
public void shouldSupportDefaultConstructor() {
167+
new AzureEntraLoginCallbackHandler();
168+
}
169+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.kafbat.ui.sasl.azure.entra;
2+
3+
import static org.hamcrest.CoreMatchers.is;
4+
import static org.hamcrest.CoreMatchers.notNullValue;
5+
import static org.hamcrest.MatcherAssert.assertThat;
6+
import static org.junit.jupiter.api.Assertions.assertThrows;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
import com.azure.core.credential.AccessToken;
10+
import java.time.OffsetDateTime;
11+
import java.util.Set;
12+
import org.apache.kafka.common.errors.SaslAuthenticationException;
13+
import org.junit.jupiter.api.Test;
14+
15+
public class AzureEntraOAuthBearerTokenImplTest {
16+
17+
// These are not real tokens. It was generated using fake values with an invalid signature,
18+
// so it is safe to store here.
19+
private static final String VALID_SAMPLE_TOKEN =
20+
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IjlHbW55RlBraGMzaE91UjIybXZTdmduTG83WSIsImtpZCI6IjlHbW55"
21+
+ "RlBraGMzaE91UjIybXZTdmduTG83WSJ9.eyJhdWQiOiJodHRwczovL3NhbXBsZS5zZXJ2aWNlYnVzLndpbmRvd3MubmV0IiwiaX"
22+
+ "NzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvc2FtcGxlLyIsImlhdCI6MTY5ODQxNTkxMiwibmJmIjoxNjk4NDE1OTEzLCJleH"
23+
+ "AiOjE2OTg0MTU5MTQsImFjciI6IjEiLCJhaW8iOiJzYW1wbGUtYWlvIiwiYW1yIjpbXSwiYXBwaWQiOiJzYW1wbGUtYXBwLWlkIi"
24+
+ "wiYXBwaWRhY3IiOiIwIiwiZmFtaWx5X25hbWUiOiJTYW1wbGUiLCJnaXZlbl9uYW1lIjoiU2FtcGxlIiwiZ3JvdXBzIjpbXSwiaX"
25+
+ "BhZGRyIjoiMTI3LjAuMC4xIiwibmFtZSI6IlNhbXBsZSBOYW1lIiwib2lkIjoic2FtcGxlLW9pZCIsIm9ucHJlbV9zaWQiOiJzYW"
26+
+ "1wbGUtb25wcmVtX3NpZCIsInB1aWQiOiJzYW1wbGUtcHVpZCIsInJoIjoic2FtcGxlLXJoIiwic2NwIjoiZXZlbnRfaHViIHN0b3"
27+
+ "JhZ2VfYWNjb3VudCIsInN1YiI6IlNhbXBsZSBTdWJqZWN0IiwidGlkIjoic2FtcGxlLXRpZCIsInVuaXF1ZV9uYW1lIjoic2FtcG"
28+
+ "xlQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJzYW1wbGVAbWljcm9zb2Z0LmNvbSIsInV0aSI6InNhbXBsZS11dGkiLCJ2ZXIiOiIxLj"
29+
+ "AiLCJ3aWRzIjpbXX0.DC_guYOsDlRc5GsXE39dn_zlBX54_Y8_mDTLXLgienl9dPMX5RE2X1QXGXA9ukZtptMzP_0wcoqDDjNrys"
30+
+ "GrNhztyeOr0YSeMMFq2NQ5vMBzLapwONwsnv55Hn0jOje9cqnMf43z1LHI6q6-rIIRz-SiTuoYUgOTxzFftpt-7FSqLjQpYEH7bL"
31+
+ "p-0yIU_aJUSb5HQTJbtYYOb54hsZ6VXpaiZ013qGtKODbHTG37kdoIw2MPn66CxanLZKeZM31IVxC-duAqxDgK4O2Ne6xRZRIPW1"
32+
+ "yt61QnZutWTJ4bAyhmplym3OWZ369cyiSJek0uyS5tibXeCYG4Kk8UQSFcsyfwgOsD0xvvcXcLexcUcEekoNBj6ixDhWssFzhC8T"
33+
+ "Npy8-QKNe_Tp6qHzJdI6OV71jpDkGvcmseLHC9GOxBWB0IdYbePTFK-rz2dkN3uMUiFwQJvEbORsq1IaQXj2esT0F7sMfqzWQF9h"
34+
+ "koVy4mJg_auvrZlnQkNPdLHfCacU33ZPwtuSS6b-0XolbxZ5DlJ4p1OJPeHl2xsi61qiHuCBsmnkLNtHmyxNTXGs7xc4dEQokaCK"
35+
+ "-FB_lzC3D4mkJMxKWopQGXnQtizaZjyclGpiUFs3mEauxC7RpsbanitxPFs7FK3mY0MQJk9JNVi1oM-8qfEp8nYT2DwFBhLcIp2z"
36+
+ "Q";
37+
38+
@Test
39+
void constructorShouldParseToken() {
40+
final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN);
41+
42+
final AzureEntraOAuthBearerTokenImpl azureOAuthBearerToken =
43+
new AzureEntraOAuthBearerTokenImpl(accessToken);
44+
45+
assertThat(azureOAuthBearerToken, is(notNullValue()));
46+
assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN));
47+
assertThat(azureOAuthBearerToken.startTimeMs(), is(1698415912000L));
48+
assertThat(azureOAuthBearerToken.lifetimeMs(), is(1698415914000L));
49+
assertThat(azureOAuthBearerToken.scope(), is(Set.of("event_hub", "storage_account")));
50+
assertThat(azureOAuthBearerToken.principalName(), is("sample@microsoft.com"));
51+
assertTrue(azureOAuthBearerToken.isExpired());
52+
}
53+
54+
@Test
55+
void constructorShouldRejectInvalidToken() {
56+
assertThrows(SaslAuthenticationException.class, () -> new AzureEntraOAuthBearerTokenImpl(
57+
new AccessToken("invalid", OffsetDateTime.MIN)));
58+
}
59+
}

0 commit comments

Comments
 (0)