Skip to content

Commit b9a78ba

Browse files
ferblacaolegz
authored andcommitted
Add test to verify correct bean conditional resolution order in a multi-binder scenario
This change adds KafkaMultiBinderCustomConfigurationTests to verify that user configuration classes specified via spring.main.sources are properly loaded before binder configuration classes. This ensures that @ConditionalOnMissingBean annotations in the binder configurations correctly detect user-provided beans. The test confirms that when using KafkaBinderConfiguration with @ConditionalOnMissingBean(KafkaBinderMetrics.class), a custom KafkaBinderMetrics bean configured via spring.main.sources is correctly detected and used instead of the default implementation. Related to fix in gh-3114 Signed-off-by: ferblaca <fernandobc@ext.inditex.com> Resolves #3127
1 parent c9bdf21 commit b9a78ba

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2023-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.integration;
18+
19+
import java.lang.reflect.Field;
20+
import java.util.Map;
21+
22+
import io.micrometer.core.instrument.MeterRegistry;
23+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
28+
import org.springframework.boot.test.context.SpringBootTest;
29+
import org.springframework.cloud.stream.binder.Binder;
30+
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
31+
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
32+
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
33+
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
34+
import org.springframework.context.ConfigurableApplicationContext;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.kafka.test.context.EmbeddedKafka;
37+
import org.springframework.test.annotation.DirtiesContext;
38+
import org.springframework.util.ReflectionUtils;
39+
40+
import static org.assertj.core.api.Assertions.assertThat;
41+
42+
/**
43+
* Integration tests to verify that custom configurations defined in spring.main.sources
44+
* are properly loaded before the default binder configuration in multi-binder scenarios.
45+
*
46+
* @author Fernando Blanch
47+
* @since 4.1.0
48+
*/
49+
@SpringBootTest(classes = KafkaMultiBinderCustomConfigurationTests.class,
50+
webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
51+
"spring.cloud.stream.defaultBinder=kafka1",
52+
"spring.cloud.stream.binders.kafka1.type=kafka",
53+
"spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
54+
"spring.cloud.stream.binders.kafka2.type=kafka",
55+
"spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
56+
"spring.cloud.stream.binders.kafka2.environment.spring.main.sources=" +
57+
"org.springframework.cloud.stream.binder.kafka.integration.KafkaMultiBinderCustomConfigurationTests$CustomConfiguration"
58+
})
59+
@DirtiesContext
60+
@EnableAutoConfiguration
61+
@EmbeddedKafka(controlledShutdown = true)
62+
class KafkaMultiBinderCustomConfigurationTests {
63+
64+
@Autowired
65+
private DefaultBinderFactory binderFactory;
66+
67+
/**
68+
* Verifies that the custom user configuration is loaded from spring.main.sources
69+
*/
70+
@Test
71+
void binderKafka2UsesCustomConfigurationIsLoadedFromSpringMainSources() throws IllegalAccessException {
72+
// Force initialization of binders
73+
Binder<?, ?, ?> kafka2Binder = binderFactory.getBinder("kafka2", Object.class);
74+
assertThat(kafka2Binder).isNotNull();
75+
76+
// Get the kafka2 binder context
77+
ConfigurableApplicationContext kafka2Context = getBinderContext("kafka2");
78+
assertThat(kafka2Context).isNotNull();
79+
80+
// Verify that our custom bean is used instead of the default one
81+
KafkaBinderMetrics kafkaBinderMetrics = kafka2Context.getBean(KafkaBinderMetrics.class);
82+
assertThat(kafkaBinderMetrics).isInstanceOf(CustomKafkaBinderMetrics.class);
83+
}
84+
85+
/**
86+
* Verifies that the default configuration is used when no custom user configuration is provided
87+
*/
88+
@Test
89+
void binderKafka1UsesDefaultBeanFromKafkaBinderMetricsConfigurationWithMultiBinder() throws IllegalAccessException {
90+
// Force initialization of binders
91+
Binder<?, ?, ?> kafka1Binder = binderFactory.getBinder("kafka1", Object.class);
92+
assertThat(kafka1Binder).isNotNull();
93+
94+
ConfigurableApplicationContext kafka1Context = getBinderContext("kafka1");
95+
assertThat(kafka1Context).isNotNull();
96+
97+
// Verify that the metrics bean is from KafkaBinderMetricsConfigurationWithMultiBinder configuration
98+
// (not a CustomKafkaBinderMetrics instance)
99+
KafkaBinderMetrics kafka1BinderMetrics = kafka1Context.getBean(KafkaBinderMetrics.class);
100+
assertThat(kafka1BinderMetrics).isNotInstanceOf(CustomKafkaBinderMetrics.class);
101+
}
102+
103+
/**
104+
* Helper method to get the binder context from the binderInstanceCache field in DefaultBinderFactory
105+
*/
106+
private ConfigurableApplicationContext getBinderContext(String binderName) throws IllegalAccessException {
107+
Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache");
108+
assertThat(binderInstanceCacheField).isNotNull();
109+
ReflectionUtils.makeAccessible(binderInstanceCacheField);
110+
@SuppressWarnings("unchecked")
111+
Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>> binderInstanceCache =
112+
(Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>>) binderInstanceCacheField.get(this.binderFactory);
113+
return binderInstanceCache.get(binderName).getValue();
114+
}
115+
116+
/**
117+
* Custom configuration that provides a custom KafkaBinderMetrics
118+
*/
119+
static class CustomConfiguration {
120+
121+
@Bean
122+
MeterRegistry meterRegistry() {
123+
return new SimpleMeterRegistry();
124+
}
125+
126+
@Bean
127+
KafkaBinderMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
128+
KafkaBinderConfigurationProperties configurationProperties,
129+
MeterRegistry meterRegistry) {
130+
return new CustomKafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, meterRegistry);
131+
}
132+
133+
}
134+
135+
static class CustomKafkaBinderMetrics extends KafkaBinderMetrics {
136+
137+
CustomKafkaBinderMetrics(KafkaMessageChannelBinder binder,
138+
KafkaBinderConfigurationProperties binderConfigurationProperties,
139+
MeterRegistry meterRegistry) {
140+
super(binder, binderConfigurationProperties, null, meterRegistry);
141+
}
142+
143+
}
144+
145+
}

0 commit comments

Comments
 (0)