Skip to content

Commit 92d66bb

Browse files
committed
GH-3173 Fix merge mechanism for Kafka bootstrap-server properties
Resolves #3173
1 parent 44e8db0 commit 92d66bb

2 files changed

Lines changed: 40 additions & 2 deletions

File tree

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.core.io.Resource;
4747
import org.springframework.expression.Expression;
4848
import org.springframework.util.Assert;
49+
import org.springframework.util.CollectionUtils;
4950
import org.springframework.util.ObjectUtils;
5051
import org.springframework.util.StringUtils;
5152

@@ -401,7 +402,7 @@ public void setProducerProperties(Map<String, String> producerProperties) {
401402
*/
402403
public Map<String, Object> mergedConsumerConfiguration() {
403404
Map<String, Object> consumerConfiguration = new HashMap<>(this.kafkaProperties.buildConsumerProperties());
404-
if (this.kafkaConnectionDetails != null) {
405+
if (!consumerConfiguration.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) && this.kafkaConnectionDetails != null) {
405406
consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getConsumer().getBootstrapServers());
406407
}
407408
// Copy configured binder properties that apply to consumers
@@ -430,7 +431,7 @@ public Map<String, Object> mergedConsumerConfiguration() {
430431
*/
431432
public Map<String, Object> mergedProducerConfiguration() {
432433
Map<String, Object> producerConfiguration = new HashMap<>(this.kafkaProperties.buildProducerProperties());
433-
if (this.kafkaConnectionDetails != null) {
434+
if (!producerConfiguration.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) && this.kafkaConnectionDetails != null) {
434435
producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getProducer().getBootstrapServers());
435436
}
436437
// Copy configured binder properties that apply to producers

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.cloud.stream.binder.kafka.bootstrap;
1818

19+
import java.beans.BeanProperty;
20+
import java.util.function.Function;
21+
1922
import org.junit.jupiter.api.BeforeAll;
2023
import org.junit.jupiter.params.ParameterizedTest;
2124
import org.junit.jupiter.params.provider.ValueSource;
@@ -24,6 +27,7 @@
2427
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2528
import org.springframework.boot.builder.SpringApplicationBuilder;
2629
import org.springframework.context.ConfigurableApplicationContext;
30+
import org.springframework.context.annotation.Bean;
2731
import org.springframework.context.annotation.Configuration;
2832
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2933
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
@@ -35,6 +39,7 @@
3539
* @author Marius Bogoevici
3640
* @author Chris Bono
3741
* @author Soby Chacko
42+
* @author Oleg Zhurakousky
3843
*/
3944
@EmbeddedKafka(controlledShutdown = true)
4045
class KafkaBinderBootstrapTest {
@@ -78,6 +83,28 @@ void kafkaBinderWithCustomConfigCanStart(boolean excludeKafkaAutoConfig) {
7883

7984
}
8085

86+
@ParameterizedTest
87+
@ValueSource(booleans = { false, true })
88+
void validateDoNotOverrideBootstrap(boolean excludeKafkaAutoConfig) {
89+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(ApplicationConfiguration.class)
90+
.web(WebApplicationType.NONE).run(
91+
"--spring.kafka.bootstrap-servers=foo:8907", // something that would have fail
92+
"--spring.cloud.stream.bindings.uppercase-in-0.destination=inputTopic",
93+
"--spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup",
94+
"--spring.cloud.stream.bindings.uppercase-in-0.binder=kafka1",
95+
"--spring.cloud.stream.bindings.uppercase-out-0.destination=outputTopic",
96+
"--spring.cloud.stream.bindings.uppercase-out-0.binder=kafka2",
97+
"--spring.cloud.stream.binders.kafka1.type=kafka",
98+
"--spring.cloud.stream.binders.kafka2.type=kafka",
99+
"--spring.cloud.stream.binders.kafka1.environment"
100+
+ ".spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
101+
"--spring.cloud.stream.binders.kafka2.environment"
102+
+ ".spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
103+
excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off
104+
} // @checkstyle:on
105+
106+
}
107+
81108
private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) {
82109
return excludeKafkaAutoConfig ?
83110
"--spring.autoconfigure.exclude=org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration" : "a=a";
@@ -89,4 +116,14 @@ static class SimpleApplication {
89116

90117
}
91118

119+
@EnableAutoConfiguration
120+
@Configuration
121+
static class ApplicationConfiguration {
122+
123+
@Bean
124+
public Function<String, String> uppercase() {
125+
return v -> v;
126+
}
127+
}
128+
92129
}

0 commit comments

Comments
 (0)