diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java index fb903322..62ec7479 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java @@ -12,7 +12,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.logging.Level; import java.util.stream.Stream; import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; @@ -45,35 +44,44 @@ public static HandlerResolver buildResolver(String domain, .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); - final ConcurrentMap> eventsToBind = getEventsToBind(domain, registries); + final ConcurrentMap> eventsToBind = getEventsToBind(domain, + registries); - final ConcurrentMap> eventHandlers = getEventHandlersWithDynamics(domain, registries); + final ConcurrentMap> eventHandlers = + getEventHandlersWithDynamics(domain, registries); - return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) { + return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, + commandHandlers) { @Override @SuppressWarnings("unchecked") public RegisteredCommandHandler getCommandHandler(String path) { final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, + Object.class); } }; } final ConcurrentMap> eventsToBind = getEventsToBind(domain, registries); - final ConcurrentMap> eventHandlers = getEventHandlersWithDynamics(domain, registries); + final ConcurrentMap> eventHandlers = + getEventHandlersWithDynamics(domain, registries); - return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), new ConcurrentHashMap<>()) { + return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), + new ConcurrentHashMap<>()) { @Override @SuppressWarnings("unchecked") public RegisteredCommandHandler getCommandHandler(String path) { final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, + Object.class); } }; } - private static ConcurrentMap> getEventHandlersWithDynamics(String domain, Map registries) { + private static ConcurrentMap> getEventHandlersWithDynamics(String domain, + Map registries) { // event handlers and dynamic handlers return registries .values().stream() @@ -81,7 +89,6 @@ public RegisteredCommandHandler getCommandHandler(String path) { if (r.getDomainEventListeners().containsKey(domain)) { return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r)); } - log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it"); return Stream.empty(); }) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), @@ -95,14 +102,14 @@ public RegisteredCommandHandler getCommandHandler(String path) { return Stream.of(); } - private static ConcurrentMap> getEventsToBind(String domain, Map registries) { + private static ConcurrentMap> getEventsToBind(String domain, Map registries) { return registries .values().stream() .flatMap(r -> { if (r.getDomainEventListeners().containsKey(domain)) { return r.getDomainEventListeners().get(domain).stream(); } - log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it"); return Stream.empty(); }) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/LoggerSubscriber.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/LoggerSubscriber.java index f7511312..0d2aff9c 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/LoggerSubscriber.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/LoggerSubscriber.java @@ -14,7 +14,7 @@ public class LoggerSubscriber extends BaseSubscriber { private final String flowName; private static final String ON_COMPLETE_MSG = "%s: ##On Complete Hook!!"; - private static final String ON_ERROR_MSG = "%s: ##On Error Hook!!"; + private static final String ON_ERROR_MSG = "%s: ##On Error Hook!! %s"; private static final String ON_CANCEL_MSG = "%s: ##On Cancel Hook!!"; private static final String ON_FINALLY_MSG = "%s: ##On Finally Hook! Signal type: %s"; @@ -29,7 +29,7 @@ protected void hookOnComplete() { @Override protected void hookOnError(Throwable throwable) { - log.log(Level.SEVERE, format(ON_ERROR_MSG), throwable); + log.log(Level.SEVERE, format(ON_ERROR_MSG, throwable.getMessage()), throwable); } @Override diff --git a/build.gradle b/build.gradle index 732ba3fb..8ebc5ddf 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ plugins { id 'org.sonarqube' version '6.0.1.5171' id 'org.springframework.boot' version '3.4.1' apply false id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' - id 'co.com.bancolombia.cleanArchitecture' version '3.20.7' + id 'co.com.bancolombia.cleanArchitecture' version '3.20.8' } repositories { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index df97d72b..e2847c82 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/main.gradle b/main.gradle index 86094ef5..5796af1d 100644 --- a/main.gradle +++ b/main.gradle @@ -176,5 +176,5 @@ tasks.register('generateMergedReport', JacocoReport) { } tasks.named('wrapper') { - gradleVersion = '8.11' + gradleVersion = '8.11.1' } \ No newline at end of file diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java index 539f7330..771b9e88 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java @@ -13,7 +13,7 @@ public interface BrokerProvider { DomainEventBus getDomainBus(); - DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver); + DirectAsyncGateway getDirectAsyncGateway(); void listenDomainEvents(HandlerResolver resolver); @@ -23,7 +23,7 @@ public interface BrokerProvider { void listenQueries(HandlerResolver resolver); - void listenReplies(HandlerResolver resolver); + void listenReplies(); Mono healthCheck(); } diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsConfig.java index 1f7c1986..1b612fdc 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsConfig.java @@ -1,19 +1,13 @@ package org.reactivecommons.async.starter.config; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; -import org.reactivecommons.async.api.DefaultCommandHandler; -import org.reactivecommons.async.api.DefaultQueryHandler; -import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.commons.HandlerResolver; -import org.reactivecommons.async.commons.HandlerResolverBuilder; import org.reactivecommons.async.commons.config.BrokerConfig; import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; -import org.reactivecommons.async.commons.ext.CustomReporter; -import org.reactivecommons.async.commons.ext.DefaultCustomReporter; import org.reactivecommons.async.commons.reply.ReactiveReplyRouter; import org.reactivecommons.async.starter.broker.BrokerProvider; import org.reactivecommons.async.starter.broker.BrokerProviderFactory; @@ -27,7 +21,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import reactor.core.publisher.Mono; import java.util.Map; @@ -66,24 +59,6 @@ public ConnectionManager buildConnectionManager(ApplicationContext context) { return connectionManager; } - @Bean - @SuppressWarnings({"rawtypes", "unchecked"}) - public DomainHandlers buildHandlers(ApplicationContext context, - HandlerRegistry primaryRegistry, DefaultCommandHandler commandHandler) { - DomainHandlers handlers = new DomainHandlers(); - final Map registries = context.getBeansOfType(HandlerRegistry.class); - if (!registries.containsValue(primaryRegistry)) { - registries.put("primaryHandlerRegistry", primaryRegistry); - } - final Map props = context.getBeansOfType(GenericAsyncPropsDomain.class); - props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> { - String domainName = (String) domain; - HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler); - handlers.add(domainName, resolver); - })); - return handlers; - } - @Bean @ConditionalOnMissingBean public BrokerConfig brokerConfig() { @@ -98,29 +73,8 @@ public ObjectMapperSupplier objectMapperSupplier() { @Bean @ConditionalOnMissingBean - public CustomReporter reactiveCommonsCustomErrorReporter() { - return new DefaultCustomReporter(); - } - - @Bean - @ConditionalOnMissingBean - @SuppressWarnings("rawtypes") - public DefaultQueryHandler defaultHandler() { - return (DefaultQueryHandler) command -> - Mono.error(new RuntimeException("No Handler Registered")); - } - - @Bean - @ConditionalOnMissingBean - @SuppressWarnings("rawtypes") - public DefaultCommandHandler defaultCommandHandler() { - return message -> Mono.error(new RuntimeException("No Handler Registered")); - } - - @Bean - @ConditionalOnMissingBean - public HandlerRegistry defaultHandlerRegistry() { - return HandlerRegistry.register(); + public ObjectMapper defaultReactiveCommonsObjectMapper(ObjectMapperSupplier supplier) { + return supplier.get(); } @Bean diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java new file mode 100644 index 00000000..96bdc138 --- /dev/null +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java @@ -0,0 +1,70 @@ +package org.reactivecommons.async.starter.config; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import org.reactivecommons.async.api.DefaultCommandHandler; +import org.reactivecommons.async.api.DefaultQueryHandler; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.commons.HandlerResolver; +import org.reactivecommons.async.commons.HandlerResolverBuilder; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.commons.ext.DefaultCustomReporter; +import org.reactivecommons.async.starter.props.GenericAsyncPropsDomain; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Mono; + +import java.util.Map; + +@Log +@Configuration +@RequiredArgsConstructor +public class ReactiveCommonsListenersConfig { + + @Bean + @SuppressWarnings({"rawtypes", "unchecked"}) + public DomainHandlers buildHandlers(ApplicationContext context, + HandlerRegistry primaryRegistry, DefaultCommandHandler commandHandler) { + DomainHandlers handlers = new DomainHandlers(); + final Map registries = context.getBeansOfType(HandlerRegistry.class); + if (!registries.containsValue(primaryRegistry)) { + registries.put("primaryHandlerRegistry", primaryRegistry); + } + final Map props = context.getBeansOfType(GenericAsyncPropsDomain.class); + props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> { + String domainName = (String) domain; + HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler); + handlers.add(domainName, resolver); + })); + return handlers; + } + + @Bean + @ConditionalOnMissingBean + public CustomReporter reactiveCommonsCustomErrorReporter() { + return new DefaultCustomReporter(); + } + + @Bean + @ConditionalOnMissingBean + @SuppressWarnings("rawtypes") + public DefaultQueryHandler defaultHandler() { + return (DefaultQueryHandler) command -> + Mono.error(new RuntimeException("No Handler Registered")); + } + + @Bean + @ConditionalOnMissingBean + @SuppressWarnings("rawtypes") + public DefaultCommandHandler defaultCommandHandler() { + return message -> Mono.error(new RuntimeException("No Handler Registered")); + } + + @Bean + @ConditionalOnMissingBean + public HandlerRegistry defaultHandlerRegistry() { + return HandlerRegistry.register(); + } +} diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/CommandsListenerConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/CommandsListenerConfig.java index 7690d131..80f1dbc6 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/CommandsListenerConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/CommandsListenerConfig.java @@ -2,15 +2,16 @@ import org.reactivecommons.async.commons.HandlerResolver; -import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.broker.BrokerProvider; +import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.config.DomainHandlers; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration -@Import(ReactiveCommonsConfig.class) +@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class}) public class CommandsListenerConfig extends AbstractListenerConfig { public CommandsListenerConfig(ConnectionManager manager, DomainHandlers handlers) { diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/EventsListenerConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/EventsListenerConfig.java index be7d075e..6e465f84 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/EventsListenerConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/EventsListenerConfig.java @@ -6,11 +6,12 @@ import org.reactivecommons.async.starter.broker.BrokerProvider; import org.reactivecommons.async.starter.config.DomainHandlers; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration -@Import(ReactiveCommonsConfig.class) +@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class}) public class EventsListenerConfig extends AbstractListenerConfig { public EventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) { diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/NotificationEventsListenerConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/NotificationEventsListenerConfig.java index 647f7994..cae527fe 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/NotificationEventsListenerConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/NotificationEventsListenerConfig.java @@ -6,11 +6,12 @@ import org.reactivecommons.async.starter.broker.BrokerProvider; import org.reactivecommons.async.starter.config.DomainHandlers; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration -@Import(ReactiveCommonsConfig.class) +@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class}) public class NotificationEventsListenerConfig extends AbstractListenerConfig { public NotificationEventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) { diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/QueriesListenerConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/QueriesListenerConfig.java index 9710c770..35ed0f21 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/QueriesListenerConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/QueriesListenerConfig.java @@ -2,15 +2,16 @@ import org.reactivecommons.async.commons.HandlerResolver; -import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.broker.BrokerProvider; +import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.config.DomainHandlers; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration -@Import(ReactiveCommonsConfig.class) +@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class}) public class QueriesListenerConfig extends AbstractListenerConfig { public QueriesListenerConfig(ConnectionManager manager, DomainHandlers handlers) { diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfig.java index df6efea3..e1654baf 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfig.java @@ -4,7 +4,6 @@ import lombok.extern.java.Log; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.starter.config.ConnectionManager; -import org.reactivecommons.async.starter.config.DomainHandlers; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -20,10 +19,10 @@ public class DirectAsyncGatewayConfig { @Bean - public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager, DomainHandlers handlers) { + public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager) { ConcurrentMap directAsyncGateways = new ConcurrentHashMap<>(); manager.forDomain((domain, provider) -> directAsyncGateways.put(domain, - provider.getDirectAsyncGateway(handlers.get(domain)))); + provider.getDirectAsyncGateway())); return new GenericDirectAsyncGateway(directAsyncGateways); } } diff --git a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/mybroker/MyBrokerProvider.java b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/mybroker/MyBrokerProvider.java index f9c0365a..57f27e66 100644 --- a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/mybroker/MyBrokerProvider.java +++ b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/mybroker/MyBrokerProvider.java @@ -27,7 +27,7 @@ public DomainEventBus getDomainBus() { } @Override - public DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver) { + public DirectAsyncGateway getDirectAsyncGateway() { return null; } @@ -52,7 +52,7 @@ public void listenQueries(HandlerResolver resolver) { } @Override - public void listenReplies(HandlerResolver resolver) { + public void listenReplies() { // for testing purposes } diff --git a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfigTest.java b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfigTest.java index 6a02949c..506f619d 100644 --- a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfigTest.java +++ b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfigTest.java @@ -9,10 +9,8 @@ import org.reactivecommons.async.commons.HandlerResolver; import org.reactivecommons.async.starter.broker.BrokerProvider; import org.reactivecommons.async.starter.config.ConnectionManager; -import org.reactivecommons.async.starter.config.DomainHandlers; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,14 +38,11 @@ void setUp() { @Test void shouldCreateAllDomainEventBuses() { // Arrange - when(brokerProvider.getDirectAsyncGateway(any())).thenReturn(domainEventBus); - DomainHandlers handlers = new DomainHandlers(); - handlers.add("domain", resolver); - handlers.add("domain2", resolver); + when(brokerProvider.getDirectAsyncGateway()).thenReturn(domainEventBus); // Act - DirectAsyncGateway genericDomainEventBus = directAsyncGatewayConfig.genericDirectAsyncGateway(manager, handlers); + DirectAsyncGateway genericDomainEventBus = directAsyncGatewayConfig.genericDirectAsyncGateway(manager); // Assert assertNotNull(genericDomainEventBus); - verify(brokerProvider, times(2)).getDirectAsyncGateway(resolver); + verify(brokerProvider, times(2)).getDirectAsyncGateway(); } } diff --git a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/KafkaBrokerProvider.java b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/KafkaBrokerProvider.java index ea3b6732..f5d04458 100644 --- a/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/KafkaBrokerProvider.java +++ b/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/KafkaBrokerProvider.java @@ -46,7 +46,7 @@ public DomainEventBus getDomainBus() { } @Override - public DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver) { + public DirectAsyncGateway getDirectAsyncGateway() { return new KafkaDirectAsyncGateway(); } @@ -95,7 +95,7 @@ public void listenQueries(HandlerResolver resolver) { } @Override - public void listenReplies(HandlerResolver resolver) { + public void listenReplies() { // May be implemented in the future } diff --git a/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/kafka/KafkaBrokerProviderTest.java b/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/kafka/KafkaBrokerProviderTest.java index 0db6bd4c..0f6f6680 100644 --- a/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/kafka/KafkaBrokerProviderTest.java +++ b/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/kafka/KafkaBrokerProviderTest.java @@ -99,7 +99,7 @@ void shouldCreateDomainEventBus() { @Test void shouldCreateDirectAsyncGateway() { // Act - DirectAsyncGateway domainBus = brokerProvider.getDirectAsyncGateway(handlerResolver); + DirectAsyncGateway domainBus = brokerProvider.getDirectAsyncGateway(); // Assert assertThat(domainBus).isExactlyInstanceOf(KafkaDirectAsyncGateway.class); } diff --git a/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/KafkaConfigTest.java b/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/KafkaConfigTest.java index d9fcac5b..9fb2ddb4 100644 --- a/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/KafkaConfigTest.java +++ b/starters/async-kafka-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/KafkaConfigTest.java @@ -7,6 +7,7 @@ import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter; import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.reactivecommons.async.starter.impl.kafka.RCKafkaConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -17,7 +18,9 @@ RCKafkaConfig.class, AsyncKafkaPropsDomain.class, KafkaBrokerProviderFactory.class, - ReactiveCommonsConfig.class}) + ReactiveCommonsConfig.class, + ReactiveCommonsListenersConfig.class +}) class KafkaConfigTest { @Autowired private KafkaJacksonMessageConverter converter; diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java index b116ced6..3abf1e90 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java @@ -53,12 +53,12 @@ public DomainEventBus getDomainBus() { } @Override - public DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver) { + public DirectAsyncGateway getDirectAsyncGateway() { String exchangeName = props.getBrokerConfigProps().getDirectMessagesExchangeName(); if (props.getCreateTopology()) { sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("direct")).subscribe(); } - listenReplies(resolver); + listenReplies(); return new RabbitDirectAsyncGateway(config, router, sender, exchangeName, converter, meterRegistry); } @@ -141,7 +141,7 @@ public void listenQueries(HandlerResolver resolver) { } @Override - public void listenReplies(HandlerResolver resolver) { + public void listenReplies() { if (props.isListenReplies()) { final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java index c97c51f5..18be2b17 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java @@ -150,8 +150,8 @@ private static Mono createConnectionMono(ConnectionFactory factory, String connectionType) { return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)) .doOnError(err -> - log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host" + - factory.getHost() + ". Starting retry process...", err) + log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" + + factory.getHost() + "'. Starting retry process...", err) ) .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL)) .maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL))) diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java index cb25344f..5ea55767 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java @@ -112,7 +112,7 @@ void shouldCreateDirectAsyncGateway() { when(listener.getReceiver()).thenReturn(receiver); when(receiver.consumeAutoAck(any(String.class))).thenReturn(Flux.never()); // Act - DirectAsyncGateway domainBus = brokerProvider.getDirectAsyncGateway(handlerResolver); + DirectAsyncGateway domainBus = brokerProvider.getDirectAsyncGateway(); // Assert assertThat(domainBus).isExactlyInstanceOf(RabbitDirectAsyncGateway.class); } diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/RabbitMQConfigTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/RabbitMQConfigTest.java index 240fe4b7..f811c68d 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/RabbitMQConfigTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/rabbit/RabbitMQConfigTest.java @@ -6,6 +6,7 @@ import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter; import org.reactivecommons.async.starter.config.ConnectionManager; import org.reactivecommons.async.starter.config.ReactiveCommonsConfig; +import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -15,7 +16,9 @@ RabbitMQConfig.class, AsyncPropsDomain.class, RabbitMQBrokerProviderFactory.class, - ReactiveCommonsConfig.class}) + ReactiveCommonsConfig.class, + ReactiveCommonsListenersConfig.class +}) class RabbitMQConfigTest { @Autowired private RabbitJacksonMessageConverter converter;