Kafka Endless Rebalancing When Adding New Instance #4233

@SzymonZasada

Description

I'm experiencing an endless rebalancing loop whenever I add a new instance. The consumer group never stabilizes and keeps rebalancing continuously.
Only a single instance works, regardless of whether concurrency per instance is set anywhere from 1 to 10. Every additional instance beyond the first results in infinite rebalancing.

I poll 200 messages at a time. Processing a full batch takes about 50-60 seconds at most.
There are 20 topics, each with 30 partitions.
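To put the numbers above in context, here is a rough sketch of the time budget per poll. It takes the 200-record batch and the 60-second worst case from this description, and the `max.poll.interval.ms` of 150 000 ms from the config below. The class and method names (`PollBudget`, `perRecord`, `headroom`) are made up for illustration, and the calculation ignores any extra time spent in error-handler retries.

```java
import java.time.Duration;

final class PollBudget {

    /** Average time per record when a full batch takes batchTime. */
    static Duration perRecord(Duration batchTime, int maxPollRecords) {
        return batchTime.dividedBy(maxPollRecords);
    }

    /** Time left before max.poll.interval.ms expires; a negative value means the limit is exceeded. */
    static Duration headroom(Duration maxPollInterval, Duration batchTime) {
        return maxPollInterval.minus(batchTime);
    }

    public static void main(String[] args) {
        Duration worstCaseBatch = Duration.ofSeconds(60);      // upper end of the 50-60 s reported above
        Duration maxPollInterval = Duration.ofMillis(150_000); // POLL_TIMEOUT_MS from the config

        System.out.println("per record: " + perRecord(worstCaseBatch, 200).toMillis() + " ms");
        System.out.println("headroom:   " + headroom(maxPollInterval, worstCaseBatch).toSeconds() + " s");
    }
}
```

Under these assumptions a full batch leaves a positive margin before the poll interval expires, so the sketch only frames the numbers; it does not by itself explain the rebalancing.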

Environment:
Spring Boot 3.5.8 with Spring Kafka
30 partitions per topic
concurrency=10 per instance
Running in Docker with graceful shutdown working correctly
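For a sense of scale, the partition spread across consumers can be sketched as below. This assumes, hypothetically, that one listener container subscribes to all 20 topics in a single consumer group and that partitions end up evenly spread; neither is stated in this issue. `PartitionSpread` and its methods are illustrative names only.

```java
final class PartitionSpread {

    static int totalPartitions(int topics, int partitionsPerTopic) {
        return topics * partitionsPerTopic;
    }

    /** Number of consumers in the group: one per listener thread. */
    static int consumers(int instances, int concurrencyPerInstance) {
        return instances * concurrencyPerInstance;
    }

    /** Upper bound on partitions per consumer under an even spread (ceiling division). */
    static int maxPerConsumer(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers;
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(20, 30); // 20 topics, 30 partitions each
        for (int instances = 1; instances <= 3; instances++) {
            int members = consumers(instances, 10); // concurrency=10 per instance
            System.out.println(instances + " instance(s): " + members + " consumers, up to "
                    + maxPerConsumer(partitions, members) + " partitions each");
        }
    }
}
```

If each topic instead has its own listener with concurrency 10, the group sizes and per-consumer counts differ, so the output is only meaningful under the stated assumption.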

Errors:
Request joining group due to: group is already rebalancing

Kafka config:

`@EnableKafka
@Configuration
@Slf4j
public class KafkaConfig {

private static final int POLL_TIMEOUT_MS = 150_000;  // 2.5 min

@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

//producer
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    configProps.put(ProducerConfig.RETRIES_CONFIG, new DefaultKafkaConfig().getMaxRetries());
    configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
    configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");

    configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            LoggingProducerInterceptor.class.getName());

    return new DefaultKafkaProducerFactory<>(configProps);
}

//consumer
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);

    configProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");


    configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);
    configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);
    configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, POLL_TIMEOUT_MS);

    configProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
    configProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 90_000);

    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaMdcInterceptor kafkaMdcInterceptor) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    int maxRetries = new DefaultKafkaConfig().getMaxConsumerRetries();
    factory.setCommonErrorHandler(new LoggingErrorHandler(new FixedBackOff(500L, maxRetries - 1)));

    configureFactory(factory, kafkaMdcInterceptor);

    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryNoRetry(KafkaMdcInterceptor kafkaMdcInterceptor) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    // Without retry - important
    factory.setCommonErrorHandler(new LoggingErrorHandler(new FixedBackOff(0L, 0L)));

    configureFactory(factory, kafkaMdcInterceptor);

    return factory;
}


private void configureFactory(ConcurrentKafkaListenerContainerFactory<String, String> factory,
                              KafkaMdcInterceptor kafkaMdcInterceptor) {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setVirtualThreads(true);

    factory.getContainerProperties().setShutdownTimeout((long) POLL_TIMEOUT_MS);
    factory.getContainerProperties().setStopImmediate(false);


    factory.getContainerProperties().setListenerTaskExecutor(executor);
    factory.getContainerProperties().setDeliveryAttemptHeader(true);
    factory.setRecordInterceptor(kafkaMdcInterceptor);
}

}`
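As a quick cross-check of the consumer timeouts above, the sketch below applies the guidance from the Kafka consumer configuration docs: `heartbeat.interval.ms` must be lower than `session.timeout.ms` and is typically set to no more than one third of it. The `ConsumerTimeoutCheck` class is hypothetical and uses plain string keys rather than the Kafka client classes, so it runs standalone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ConsumerTimeoutCheck {

    /** Returns human-readable warnings; an empty list means both rules hold. */
    static List<String> check(Map<String, Integer> cfg) {
        List<String> warnings = new ArrayList<>();
        int session = cfg.get("session.timeout.ms");
        int heartbeat = cfg.get("heartbeat.interval.ms");

        if (heartbeat >= session) {
            warnings.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3L > session) {
            warnings.add("heartbeat.interval.ms is above one third of session.timeout.ms");
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Values copied from consumerFactory() in this issue.
        Map<String, Integer> cfg = Map.of(
                "session.timeout.ms", 10_000,
                "heartbeat.interval.ms", 3_000);
        System.out.println(check(cfg));
    }
}
```

With the values from this issue the check returns no warnings, which suggests the heartbeat and session settings are at least mutually consistent; it says nothing about broker-side limits or the rebalance protocol itself.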
