From 90afffb2735d218f74c888f5cb50a285ffcd6ccb Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 3 Jan 2025 14:43:28 -0500 Subject: [PATCH 1/2] GH-3062: Fix `KafkaBinderMetrics` for resource leaks Fixes: https://github.com/spring-cloud/spring-cloud-stream/issues/3062 The `KafkaBinderMetrics` creates `KafkaConsumer` instances and schedule the fix rate task for them, but never closes them even when the `scheduler` is shut downed * Implement a `Lifecycle` contract in the `KafkaBinderMetrics` and call `close()` from the `stop()` to satisfy CRaC resource management expectations. * Also close all the `KafkaConsumer` instances from the `metadataConsumers` **Cherry-pick to `4.1.x`** --- .../binder/kafka/KafkaBinderMetrics.java | 63 ++++++++++++++++--- .../binder/kafka/KafkaBinderMetricsTest.java | 8 ++- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index c565634929..34f4b071bd 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +20,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.ToDoubleFunction; @@ -50,10 +49,12 @@ import org.springframework.cloud.stream.binder.kafka.common.TopicInformation; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.context.ApplicationListener; +import org.springframework.context.Lifecycle; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.lang.Nullable; import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; /** * Metrics for Kafka binder. @@ -72,7 +73,7 @@ * @author Omer Celik */ public class KafkaBinderMetrics - implements MeterBinder, ApplicationListener, AutoCloseable { + implements MeterBinder, ApplicationListener, AutoCloseable, Lifecycle { private static final int DEFAULT_TIMEOUT = 5; @@ -101,6 +102,8 @@ public class KafkaBinderMetrics private final ReentrantLock consumerFactoryLock = new ReentrantLock(); + private final AtomicBoolean running = new AtomicBoolean(); + public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory defaultConsumerFactory, @@ -125,14 +128,14 @@ public void setTimeout(int timeout) { @Override public void bindTo(MeterRegistry registry) { - /** + /* * We can't just replace one scheduler with another. * Before and even after the old one is gathered by GC, it's threads still exist, consume memory and CPU resources to switch contexts. * Theoretically, as a result of processing n topics, there will be about (1+n)*n/2 threads simultaneously at the same time. */ if (this.scheduler != null) { LOG.info("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor) scheduler).getPoolSize() + " threads"); - this.scheduler.shutdown(); + this.scheduler.shutdownNow(); } this.scheduler = Executors.newScheduledThreadPool(this.binder.getTopicsInUse().size()); @@ -278,10 +281,50 @@ public void onApplicationEvent(BindingCreatedEvent event) { } @Override - public void close() throws Exception { - if (this.meterRegistry != null) { - this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove); + public void close() { + if (this.scheduler != null) { + this.consumerFactoryLock.lock(); + try { + if (this.meterRegistry != null) { + this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove); + } + this.scheduler.shutdownNow(); + try { + this.scheduler.awaitTermination( + binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(), + TimeUnit.SECONDS); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + ReflectionUtils.rethrowRuntimeException(ex); + } + } + finally { + this.scheduler = null; + this.metadataConsumers.values().forEach(Consumer::close); + this.metadataConsumers.clear(); + this.consumerFactoryLock.unlock(); + } } - Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown); } + + @Override + public void start() { + this.running.set(true); + // Nothing else to do here. The 'bindTo()' is called from the 'onApplicationEvent()', + // which, in turn, is emitted from the 'AbstractBindingLifecycle.start()' logic. + } + + @Override + public void stop() { + if (this.running.compareAndSet(true, false)) { + close(); + } + } + + @Override + public boolean isRunning() { + return this.running.get(); + } + } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java index 4559a25f43..2828bbaeaf 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,7 @@ * @author Tomek Szmytka * @author Nico Heller * @author Kurt Hong + * @author Artem Bilan */ class KafkaBinderMetricsTest { @@ -346,10 +347,11 @@ public void usesBeginningOffsetIfNoCommittedOffsetFound() { } @Test - public void shouldShutdownSchedulerOnClose() throws Exception { + public void shouldShutdownSchedulerOnClose() { metrics.bindTo(meterRegistry); + assertThat(metrics.scheduler).isNotNull(); metrics.close(); - assertThat(metrics.scheduler.isShutdown()).isTrue(); + assertThat(metrics.scheduler).isNull(); } @Test From f259c557045c91abf813214f91a089d0a885a825 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 7 Jan 2025 14:22:23 -0500 Subject: [PATCH 2/2] * Implement `SmartLifecycle` contract on the `DefaultBinderFactory`. When CRaC checkpoint is requested, the binder contexts must be stopped as well, which includes all their managed beans, respectively. --- .../stream/binder/DefaultBinderFactory.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java index c33b4abba7..6a797914d0 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; @@ -47,6 +48,7 @@ import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.convert.converter.Converter; @@ -78,7 +80,7 @@ * @author Byungjun You * @author Omer Celik */ -public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware { +public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware, SmartLifecycle { protected final Log logger = LogFactory.getLog(getClass()); @@ -94,6 +96,8 @@ public class DefaultBinderFactory implements BinderFactory, DisposableBean, Appl private final BinderCustomizer binderCustomizer; + private final AtomicBoolean running = new AtomicBoolean(); + private volatile ConfigurableApplicationContext context; private Collection listeners; @@ -144,6 +148,27 @@ public void destroy() { this.defaultBinderForBindingTargetType.clear(); } + @Override + public void start() { + // This is essentially used when CRaC checkpoint is restored + if (this.running.compareAndSet(false, true)) { + this.binderInstanceCache.values().stream().map(Entry::getValue).forEach(ConfigurableApplicationContext::start); + } + } + + @Override + public void stop() { + // Makes sense for CRaC checkpoint + if (this.running.compareAndSet(true, false)) { + this.binderInstanceCache.values().stream().map(Entry::getValue).forEach(ConfigurableApplicationContext::stop); + } + } + + @Override + public boolean isRunning() { + return this.running.get(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public Binder getBinder(String name, Class bindingTargetType) {