Skip to content

Commit 6cb0767

Browse files
authored
feat: add RabbitPropsCustomizer and KafkaPropsCustomizer for flexible multi-domain configuration (#169)
* feat: enhance AsyncPropsDomain with customizer support and add tests * feat: add validation for customizer configuration in AsyncKafkaPropsDomain and AsyncPropsDomain * feat: add RabbitPropsCustomizer and KafkaPropsCustomizer for flexible multi-domain configuration
1 parent 608c54a commit 6cb0767

26 files changed

Lines changed: 1609 additions & 837 deletions

File tree

async/async-commons/async-commons.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ dependencies {
1010

1111
compileOnly 'io.projectreactor:reactor-core'
1212
api 'tools.jackson.core:jackson-databind'
13-
implementation 'commons-io:commons-io:2.21.0'
14-
implementation 'io.cloudevents:cloudevents-core:4.0.1'
13+
implementation 'commons-io:commons-io:2.22.0'
14+
implementation 'io.cloudevents:cloudevents-core:4.0.2'
1515

1616
testImplementation 'io.projectreactor:reactor-test'
1717
}

async/async-kafka/async-kafka.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ dependencies {
99
api project(':async-commons')
1010
api project(':cloudevents-json-jackson')
1111
api 'io.projectreactor.kafka:reactor-kafka:1.3.25'
12-
implementation 'io.cloudevents:cloudevents-core:4.0.1'
12+
implementation 'io.cloudevents:cloudevents-core:4.0.2'
1313
}

async/async-rabbit/async-rabbit.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ dependencies {
1515
api 'com.rabbitmq:amqp-client'
1616
api 'tools.jackson.core:jackson-databind'
1717

18-
implementation 'io.cloudevents:cloudevents-core:4.0.1'
18+
implementation 'io.cloudevents:cloudevents-core:4.0.2'
1919
testImplementation 'io.projectreactor:reactor-test'
2020
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@
99
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
1010
import org.reactivecommons.async.rabbit.InstanceIdentifier;
1111
import org.reactivecommons.async.rabbit.RabbitMessage;
12-
import reactor.rabbitmq.AcknowledgableDelivery;
13-
import reactor.rabbitmq.ConsumeOptions;
1412
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1513
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
16-
import reactor.rabbitmq.Receiver;
1714
import reactor.core.publisher.Flux;
1815
import reactor.core.publisher.Mono;
1916
import reactor.core.scheduler.Scheduler;
2017
import reactor.core.scheduler.Schedulers;
18+
import reactor.rabbitmq.AcknowledgableDelivery;
19+
import reactor.rabbitmq.ConsumeOptions;
20+
import reactor.rabbitmq.Receiver;
2121
import reactor.util.retry.Retry;
2222

2323
import java.time.Duration;
@@ -93,9 +93,18 @@ public void startListener() {
9393
log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
9494
}
9595

96-
ConsumeOptions consumeOptions = new ConsumeOptions();
97-
consumeOptions.qos(messageListener.prefetchCount());
98-
consumeOptions.consumerTag(InstanceIdentifier.getInstanceId(getKind()));
96+
ConsumeOptions consumeOptions = new ConsumeOptions()
97+
.qos(messageListener.prefetchCount())
98+
.consumerTag(InstanceIdentifier.getInstanceId(getKind()))
99+
.channelCallback(channel -> channel.addShutdownListener(cause -> {
100+
log.log(Level.WARNING, cause, () -> "Channel shutdown detected in queue " + queueName
101+
+ " channel open: " + channel.isOpen() +
102+
" connection open: " + channel.getConnection().isOpen());
103+
if (channel.getConnection().isOpen() && !channel.isOpen()) {
104+
log.warning("Recovering listener for queue: " + queueName);
105+
onTerminate();
106+
}
107+
}));
99108

100109
if (createTopology) {
101110
this.messageFlux = setUpBindings(messageListener.topologyCreator())

async/cloudevents-json-jackson/cloudevents-json-jackson.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ ext {
55

66
dependencies {
77
api 'tools.jackson.core:jackson-databind'
8-
implementation 'io.cloudevents:cloudevents-api:4.0.1'
9-
implementation 'io.cloudevents:cloudevents-core:4.0.1'
8+
implementation 'io.cloudevents:cloudevents-api:4.0.2'
9+
implementation 'io.cloudevents:cloudevents-core:4.0.2'
1010
}

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ buildscript {
1212

1313
plugins {
1414
id 'jacoco'
15-
id 'org.sonarqube' version '7.2.2.6593'
16-
id 'org.springframework.boot' version '4.0.4' apply false
15+
id 'org.sonarqube' version '7.3.0.8198'
16+
id 'org.springframework.boot' version '4.0.6' apply false
1717
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
18-
id 'co.com.bancolombia.cleanArchitecture' version '4.3.0'
18+
id 'co.com.bancolombia.cleanArchitecture' version '4.4.1'
1919
}
2020

2121
repositories {

0 commit comments

Comments
 (0)