Skip to content

Commit de1622d

Browse files
authored
Add KafkaListener support for shared consumer containers
- Add AbstractShareKafkaListenerContainerFactory base class for share consumer factories - Add ShareKafkaListenerContainerFactory concrete implementation - Add ShareRecordMessagingMessageListenerAdapter for share consumer message handling - Modify MethodKafkaListenerEndpoint to create appropriate listener adapters based on container type - Add integration tests for ShareKafkaListener functionality Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent fbb6dac commit de1622d

File tree

4 files changed

+355
-22
lines changed

4 files changed

+355
-22
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
import java.util.regex.Pattern;
22+
23+
import org.jspecify.annotations.Nullable;
24+
25+
import org.springframework.context.ApplicationContext;
26+
import org.springframework.context.ApplicationContextAware;
27+
import org.springframework.context.ApplicationEventPublisher;
28+
import org.springframework.context.ApplicationEventPublisherAware;
29+
import org.springframework.kafka.core.ShareConsumerFactory;
30+
import org.springframework.kafka.listener.ContainerProperties;
31+
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
32+
import org.springframework.kafka.support.JavaUtils;
33+
import org.springframework.kafka.support.TopicPartitionOffset;
34+
import org.springframework.util.Assert;
35+
36+
/**
37+
* A {@link KafkaListenerContainerFactory} implementation to create {@link ShareKafkaMessageListenerContainer}
38+
* instances for Kafka's share consumer model.
39+
* <p>
40+
* This factory provides common configuration and lifecycle management for share consumer containers.
41+
* It handles the creation of containers based on endpoints, topics, or patterns, and applies common
42+
* configuration properties to the created containers.
43+
* <p>
44+
* The share consumer model enables cooperative rebalancing, allowing consumers to maintain ownership of
45+
* some partitions while relinquishing others during rebalances, which can reduce disruption compared to
46+
* the classic consumer model.
47+
*
48+
* @param <K> the key type
49+
* @param <V> the value type
50+
*
51+
* @author Soby Chacko
52+
* @since 4.0
53+
*/
54+
public class ShareKafkaListenerContainerFactory<K, V>
55+
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>, ApplicationEventPublisherAware, ApplicationContextAware {
56+
57+
private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
58+
59+
private @Nullable Boolean autoStartup;
60+
61+
private @Nullable Integer phase;
62+
63+
private @Nullable ApplicationEventPublisher applicationEventPublisher;
64+
65+
private @Nullable ApplicationContext applicationContext;
66+
67+
/**
68+
* Construct an instance with the provided consumer factory.
69+
* @param shareConsumerFactory the share consumer factory
70+
*/
71+
public ShareKafkaListenerContainerFactory(ShareConsumerFactory<K, V> shareConsumerFactory) {
72+
this.shareConsumerFactory = shareConsumerFactory;
73+
}
74+
75+
@Override
76+
public void setApplicationContext(ApplicationContext applicationContext) {
77+
this.applicationContext = applicationContext;
78+
}
79+
80+
/**
81+
* Set whether containers created by this factory should auto-start.
82+
* @param autoStartup true to auto-start
83+
*/
84+
public void setAutoStartup(Boolean autoStartup) {
85+
this.autoStartup = autoStartup;
86+
}
87+
88+
/**
89+
* Set the phase in which containers created by this factory should start and stop.
90+
* @param phase the phase
91+
*/
92+
public void setPhase(Integer phase) {
93+
this.phase = phase;
94+
}
95+
96+
@Override
97+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
98+
this.applicationEventPublisher = applicationEventPublisher;
99+
}
100+
101+
@Override
102+
@SuppressWarnings({"unchecked", "rawtypes"})
103+
public ShareKafkaMessageListenerContainer<K, V> createListenerContainer(KafkaListenerEndpoint endpoint) {
104+
ShareKafkaMessageListenerContainer<K, V> instance = createContainerInstance(endpoint);
105+
JavaUtils.INSTANCE
106+
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
107+
if (endpoint instanceof AbstractKafkaListenerEndpoint abstractKafkaListenerEndpoint) {
108+
configureEndpoint(abstractKafkaListenerEndpoint);
109+
}
110+
// TODO: No message converter for queue at the moment
111+
endpoint.setupListenerContainer(instance, null);
112+
initializeContainer(instance, endpoint);
113+
return instance;
114+
}
115+
116+
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> endpoint) {
117+
// Minimal configuration; can add more properties later
118+
}
119+
120+
/**
121+
* Initialize the provided container with common configuration properties.
122+
* @param instance the container instance
123+
* @param endpoint the endpoint
124+
*/
125+
protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
126+
ContainerProperties properties = instance.getContainerProperties();
127+
Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup;
128+
JavaUtils.INSTANCE
129+
.acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup)
130+
.acceptIfNotNull(this.phase, instance::setPhase)
131+
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
132+
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
133+
.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
134+
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId)
135+
.acceptIfNotNull(endpoint.getConsumerProperties(), properties::setKafkaConsumerProperties);
136+
}
137+
138+
@Override
139+
public ShareKafkaMessageListenerContainer<K, V> createContainer(TopicPartitionOffset... topicPartitions) {
140+
throw new UnsupportedOperationException("ShareConsumer does not support explicit partition assignment");
141+
}
142+
143+
@Override
144+
public ShareKafkaMessageListenerContainer<K, V> createContainer(String... topics) {
145+
return createContainerInstance(new KafkaListenerEndpointAdapter() {
146+
147+
@Override
148+
public Collection<String> getTopics() {
149+
return Arrays.asList(topics);
150+
}
151+
});
152+
}
153+
154+
@Override
155+
public ShareKafkaMessageListenerContainer<K, V> createContainer(Pattern topicPattern) {
156+
throw new UnsupportedOperationException("ShareConsumer does not support topic patterns");
157+
}
158+
159+
/**
160+
* Create a container instance for the provided endpoint.
161+
* @param endpoint the endpoint
162+
* @return the container instance
163+
*/
164+
protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
165+
Collection<String> topics = endpoint.getTopics();
166+
Assert.state(topics != null, "'topics' must not be null");
167+
return new ShareKafkaMessageListenerContainer<>(this.shareConsumerFactory,
168+
new ContainerProperties(topics.toArray(new String[0])));
169+
}
170+
171+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
5858
*/
5959
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
6060

61+
/**
62+
* The share consumer factory used to create consumer instances.
63+
*/
6164
protected final ShareConsumerFactory<K, V> shareConsumerFactory;
6265

6366
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -86,7 +89,7 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
8689
* @param containerProperties the properties.
8790
*/
8891
@SuppressWarnings("unchecked")
89-
protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
92+
protected AbstractShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
9093
ContainerProperties containerProperties) {
9194
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
9295
Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null");

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-present the original author or authors.
2+
* Copyright 2025-present the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.CountDownLatch;
2424

2525
import org.apache.kafka.clients.consumer.AcknowledgeType;
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2627
import org.apache.kafka.clients.consumer.ShareConsumer;
2728
import org.apache.kafka.common.Metric;
2829
import org.apache.kafka.common.MetricName;
@@ -43,8 +44,6 @@
4344
* This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}.
4445
* It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop
4546
* with per-record dispatch and acknowledgement.
46-
* <p>
47-
* Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id.
4847
*
4948
* @param <K> the key type
5049
* @param <V> the value type
@@ -79,14 +78,6 @@ public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? supe
7978
Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
8079
}
8180

82-
/**
83-
* Set the {@code client.id} to use for the consumer.
84-
* @param clientId the client id to set
85-
*/
86-
public void setClientId(String clientId) {
87-
this.clientId = clientId;
88-
}
89-
9081
/**
9182
* Get the {@code client.id} for the consumer.
9283
* @return the client id, or null if not set
@@ -96,6 +87,14 @@ public String getClientId() {
9687
return this.clientId;
9788
}
9889

90+
/**
91+
* Set the {@code client.id} to use for the consumer.
92+
* @param clientId the client id to set
93+
*/
94+
public void setClientId(String clientId) {
95+
this.clientId = clientId;
96+
}
97+
9998
@Override
10099
public boolean isInExpectedState() {
101100
return isRunning();
@@ -152,7 +151,7 @@ private void publishConsumerStartedEvent() {
152151
}
153152

154153
/**
155-
* The inner share consumer thread: polls for records and dispatches to the listener.
154+
* The inner share consumer thread that polls for records and dispatches to the listener.
156155
*/
157156
private class ShareListenerConsumer implements Runnable {
158157

@@ -168,12 +167,11 @@ private class ShareListenerConsumer implements Runnable {
168167

169168
ShareListenerConsumer(GenericMessageListener<?> listener) {
170169
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
171-
ShareKafkaMessageListenerContainer.this.getGroupId(),
172-
ShareKafkaMessageListenerContainer.this.getClientId());
170+
ShareKafkaMessageListenerContainer.this.getGroupId(),
171+
ShareKafkaMessageListenerContainer.this.getClientId());
173172

174173
this.genericListener = listener;
175174
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
176-
// Subscribe to topics, just like in the test
177175
ContainerProperties containerProperties = getContainerProperties();
178176
this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
179177
}
@@ -184,6 +182,7 @@ String getClientId() {
184182
}
185183

186184
@Override
185+
@SuppressWarnings({"unchecked", "rawtypes"})
187186
public void run() {
188187
initialize();
189188
Throwable exitThrowable = null;
@@ -192,9 +191,14 @@ public void run() {
192191
var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT));
193192
if (records != null && records.count() > 0) {
194193
for (var record : records) {
195-
@SuppressWarnings("unchecked")
196-
GenericMessageListener<Object> listener = (GenericMessageListener<Object>) this.genericListener;
197-
listener.onMessage(record);
194+
if (this.genericListener instanceof AcknowledgingConsumerAwareMessageListener ackListener) {
195+
ackListener.onMessage(record, null, null);
196+
}
197+
else {
198+
GenericMessageListener<ConsumerRecord<K, V>> listener =
199+
(GenericMessageListener<ConsumerRecord<K, V>>) this.genericListener;
200+
listener.onMessage(record);
201+
}
198202
// Temporarily auto-acknowledge and commit.
199203
// We will refactor it later on to support more production-like scenarios.
200204
this.consumer.acknowledge(record, AcknowledgeType.ACCEPT);
@@ -235,9 +239,11 @@ private void wrapUp() {
235239
@Override
236240
public String toString() {
237241
return "ShareKafkaMessageListenerContainer.ShareListenerConsumer ["
238-
+ "consumerGroupId=" + this.consumerGroupId
239-
+ ", clientId=" + this.clientId
240-
+ "]";
242+
+ "consumerGroupId=" + this.consumerGroupId
243+
+ ", clientId=" + this.clientId
244+
+ "]";
241245
}
246+
242247
}
248+
243249
}

0 commit comments

Comments
 (0)