Skip to content

Commit 2d54d51

Browse files
committed
Simplified examples and added tests
1 parent 9280991 commit 2d54d51

File tree

9 files changed

+372
-75
lines changed

9 files changed

+372
-75
lines changed

reset-handler/README.md

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
# Reset Handler
22
This demo is meant to show how to reset a tracking token.
33

4-
There are 2 ways of doing that, using Axon Server API or just Axon Framework.
4+
There are 3 ways of doing that, using Axon Server API, just Axon Framework or the Axon Server Connector.
5+
A more detailed description can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token).
56

6-
### Using Axon Server API
7+
### Using Axon Server REST API
78

89
When in a distributed environment, one can have several applications connected to Axon Server while sharing the same token store.
910
To be able to reset a token in this scenario, we have to ask Axon Server to pause every known instance of a given Processor Name to be able to reset it and start it back again.
1011

11-
> We recommend checking the [ServerEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java) for more information.
12+
> We recommend checking the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) for more information.
1213
1314
### Using Axon Framework
1415

1516
Axon Framework provides another easy way to do it using the `StreamingEventProcessor` methods, namely `shutDown`, `resetTokens` and `start`. When doing it through Axon Framework, the application instance doing the operation should be the one having the claim of the token.
1617

17-
> We recommend checking the [FrameworkEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java) for more information.
18-
18+
> We recommend checking the [FrameworkEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java) for more information.
19+
20+
## Using the Axon Server Connector
21+
The Axon Server Connector provides methods to pause and restart an event processor.
22+
This functionality can be combined to reset the event processor as shown in the other examples.
23+
24+
> We recommend checking the [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java) for more information.
25+
1926
### Running the application
2027
This is a Spring boot application, as such it can be ran as any other standard Spring Boot application. It has a simple `/event` endpoint where you can create new empty events. For resetting the token, it provides 2 reset endpoints:
2128
- `/server/reset/{processorName}`
@@ -27,6 +34,9 @@ Since Axon Server is a requirement for this sample, a `docker-compose` file is p
2734

2835
Also, if you are on Intellij, a `requests.http` file is provided to make it easy to call the endpoints.
2936

30-
Most of the logic for the Axon Server reset is on the [EventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java) class and the added javadoc should be enough to explain what it does.
37+
Most of the logic for the Axon Server reset via REST is on the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) class and the added javadoc should be enough to explain what it does.
38+
In the same way, details for the Axon Server reset via the Server Connector can be found in [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java).
3139

3240
For the Axon Framework version, we recommend checking the official [StreamingEventProcessor.java](https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/StreamingEventProcessor.java) documentation.
41+
42+
A general introduction, regardless of the method used, can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token).

reset-handler/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@
2929
<groupId>org.springframework.boot</groupId>
3030
<artifactId>spring-boot-starter-webflux</artifactId>
3131
</dependency>
32-
<!-- <dependency>-->
33-
<!-- <groupId>org.axonframework</groupId>-->
34-
<!-- <artifactId>axon-server-connector</artifactId>-->
35-
<!-- <version>4.6.0-SNAPSHOT</version>-->
36-
<!-- </dependency>-->
3732
<dependency>
3833
<groupId>org.springframework.boot</groupId>
3934
<artifactId>spring-boot-starter-test</artifactId>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.axoniq.config;
2+
3+
import io.axoniq.axonserver.connector.AxonServerConnectionFactory;
4+
import io.axoniq.axonserver.connector.admin.AdminChannel;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.stereotype.Component;
8+
9+
@Component
10+
public class ConfigBasedAdminChannel {
11+
12+
private final String contextName;
13+
private final String componentName;
14+
15+
public ConfigBasedAdminChannel(@Value("${axon.axonserver.context}") String contextName,
16+
@Value("${axon.axonserver.component-name}") String componentName){
17+
this.contextName = contextName;
18+
this.componentName = componentName;
19+
}
20+
21+
@Bean
22+
public AdminChannel adminChannel() {
23+
return AxonServerConnectionFactory.forClient(componentName)
24+
.build()
25+
.connect(contextName)
26+
.adminChannel();
27+
}
28+
}

reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
package io.axoniq.service;
22

3-
import io.axoniq.axonserver.connector.AxonServerConnectionFactory;
43
import io.axoniq.axonserver.connector.ResultStreamPublisher;
54
import io.axoniq.axonserver.connector.admin.AdminChannel;
65
import io.axoniq.axonserver.grpc.admin.Result;
76
import org.axonframework.config.Configuration;
87
import org.axonframework.eventhandling.StreamingEventProcessor;
9-
import org.springframework.beans.factory.annotation.Value;
108
import org.springframework.stereotype.Service;
119
import reactor.core.publisher.Flux;
1210
import reactor.core.publisher.Mono;
1311
import reactor.util.retry.Retry;
1412

1513
import java.time.Duration;
16-
import java.util.function.Supplier;
1714

1815
/**
1916
* @author Sara Pellegrini
@@ -22,16 +19,12 @@
2219
@Service
2320
public class ServerConnectorEventProcessorService implements EventProcessorService {
2421

25-
private final Supplier<String> contextSupplier;
26-
private final Supplier<String> componentSupplier;
2722
private final Configuration configuration;
23+
private final AdminChannel adminChannel;
2824

29-
public ServerConnectorEventProcessorService(@Value("${axon.axonserver.context}") String context,
30-
@Value("${axon.axonserver.component-name}") String component,
31-
Configuration configuration) {
25+
public ServerConnectorEventProcessorService(Configuration configuration, AdminChannel adminChannel) {
3226
this.configuration = configuration;
33-
this.contextSupplier = () -> context;
34-
this.componentSupplier = () -> component;
27+
this.adminChannel = adminChannel;
3528
}
3629

3730
@Override
@@ -58,15 +51,15 @@ private Mono<Void> resetTokens(StreamingEventProcessor eventProcessor) {
5851
return Mono.fromRunnable(eventProcessor::resetTokens);
5952
}
6053

61-
private Mono<Void> start(String eventProcessorName, String tokenStoreIdentifier) {
62-
return Mono.fromFuture(() -> adminChannel().startEventProcessor(eventProcessorName, tokenStoreIdentifier))
54+
protected Mono<Void> start(String eventProcessorName, String tokenStoreIdentifier) {
55+
return Mono.fromFuture(() -> adminChannel.startEventProcessor(eventProcessorName, tokenStoreIdentifier))
6356
.filter(Result.SUCCESS::equals)
6457
.switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, true))
6558
.then();
6659
}
6760

68-
private Mono<Void> pause(String eventProcessorName, String tokenStoreIdentifier) {
69-
return Mono.fromFuture(() -> adminChannel().pauseEventProcessor(eventProcessorName, tokenStoreIdentifier))
61+
protected Mono<Void> pause(String eventProcessorName, String tokenStoreIdentifier) {
62+
return Mono.fromFuture(() -> adminChannel.pauseEventProcessor(eventProcessorName, tokenStoreIdentifier))
7063
.filter(Result.SUCCESS::equals)
7164
.switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, false))
7265
.then();
@@ -78,8 +71,8 @@ private Mono<Void> pause(String eventProcessorName, String tokenStoreIdentifier)
7871
For older clients, we use this method to poll the status of the event processors, to ensure they have started
7972
or stopped. In case you are using a client >= 4.6, this is no longer needed.
8073
*/
81-
private Mono<Result> awaitForStatus(String eventProcessorName, String tokenStoreIdentifier, boolean running) {
82-
return Flux.from(new ResultStreamPublisher<>(() -> adminChannel().eventProcessors()))
74+
protected Mono<Result> awaitForStatus(String eventProcessorName, String tokenStoreIdentifier, boolean running) {
75+
return Flux.from(new ResultStreamPublisher<>(adminChannel::eventProcessors))
8376
.filter(eventProcessor -> eventProcessor.getIdentifier().getProcessorName()
8477
.equals(eventProcessorName))
8578
.filter(eventProcessor -> eventProcessor.getIdentifier().getTokenStoreIdentifier()
@@ -93,12 +86,7 @@ private Mono<Result> awaitForStatus(String eventProcessorName, String tokenStore
9386
.thenReturn(Result.SUCCESS);
9487
}
9588

96-
private AdminChannel adminChannel() {
97-
AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient(componentSupplier.get())
98-
.build();
99-
return connectionFactory.connect(contextSupplier.get())
100-
.adminChannel();
101-
}
89+
10290

10391
private String tokenStoreId(String processorName) {
10492
StreamingEventProcessor eventProcessor = eventProcessor(processorName);
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.axoniq;
2+
3+
import io.axoniq.MyFakeProjection;
4+
import io.axoniq.service.FrameworkEventProcessorService;
5+
import io.axoniq.service.RestEventProcessorService;
6+
import io.axoniq.service.ServerConnectorEventProcessorService;
7+
import org.axonframework.eventhandling.gateway.EventGateway;
8+
import org.junit.ClassRule;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.mockito.Mockito;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.boot.test.context.SpringBootTest;
14+
import org.springframework.boot.test.mock.mockito.MockBean;
15+
import org.testcontainers.containers.DockerComposeContainer;
16+
import org.testcontainers.containers.wait.strategy.Wait;
17+
import org.testcontainers.junit.jupiter.Container;
18+
import org.testcontainers.junit.jupiter.Testcontainers;
19+
20+
import java.io.File;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.*;
25+
26+
@Testcontainers
27+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
28+
public class ResetServiceIntegrationTest {
29+
30+
@Autowired
31+
RestEventProcessorService restEventProcessorService;
32+
@Autowired
33+
FrameworkEventProcessorService frameworkEventProcessorService;
34+
@Autowired
35+
ServerConnectorEventProcessorService serverConnectorEventProcessorService;
36+
@Autowired
37+
EventGateway eventGateway;
38+
@MockBean
39+
private MyFakeProjection projection;
40+
41+
private static final String EVENT_PROCESSOR_NAME="io.axoniq";
42+
43+
@ClassRule
44+
@Container
45+
public static DockerComposeContainer environment =
46+
new DockerComposeContainer(new File("src/test/resources/compose-test.yml"))
47+
.withExposedService("axonserver", 8024, Wait.forListeningPort())
48+
.withExposedService("axonserver", 8124, Wait.forListeningPort())
49+
.waitingFor("axonserver", Wait.forLogMessage(".*Started AxonServer in .*",1));
50+
51+
52+
@BeforeEach
53+
void prepare(){
54+
createEvents();
55+
Mockito.clearInvocations(projection);
56+
}
57+
58+
@Test
59+
void verifyResetEventProcessorByFramework(){
60+
frameworkEventProcessorService.reset(EVENT_PROCESSOR_NAME).block();
61+
waitForAS();
62+
verify(projection, atLeast(10)).on(any());
63+
}
64+
65+
@Test
66+
void verifyResetEventProcessorByRest(){
67+
restEventProcessorService.reset(EVENT_PROCESSOR_NAME).block();
68+
waitForAS();
69+
verify(projection, atLeast(10)).on(any());
70+
}
71+
72+
@Test
73+
void verifyResetEventProcessorByServer(){
74+
serverConnectorEventProcessorService.reset(EVENT_PROCESSOR_NAME).block();
75+
waitForAS();
76+
verify(projection, atLeast(10)).on(any());
77+
}
78+
79+
80+
void createEvents(){
81+
for(int i = 0 ; i < 10; i++){
82+
eventGateway.publish(new Object());
83+
}
84+
waitForAS();
85+
}
86+
87+
private void waitForAS() {
88+
try {
89+
TimeUnit.SECONDS.sleep(2);
90+
} catch (InterruptedException e) {
91+
e.printStackTrace();
92+
}
93+
}
94+
95+
}

reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java renamed to reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,27 @@
22

33
import io.axoniq.MyFakeProjection;
44
import org.junit.ClassRule;
5-
import org.junit.jupiter.api.BeforeAll;
65
import org.junit.jupiter.api.Test;
76
import org.mockito.Mockito;
87
import org.springframework.beans.factory.annotation.Autowired;
98
import org.springframework.boot.test.context.SpringBootTest;
109
import org.springframework.boot.test.mock.mockito.MockBean;
1110
import org.springframework.boot.web.server.LocalServerPort;
1211
import org.springframework.test.web.reactive.server.WebTestClient;
13-
import org.springframework.web.reactive.function.client.WebClient;
1412
import org.testcontainers.containers.DockerComposeContainer;
1513
import org.testcontainers.containers.wait.strategy.Wait;
1614
import org.testcontainers.junit.jupiter.Container;
1715
import org.testcontainers.junit.jupiter.Testcontainers;
18-
import reactor.core.publisher.Mono;
19-
import reactor.test.StepVerifier;
2016

2117
import java.io.File;
22-
import java.time.Duration;
2318
import java.util.concurrent.TimeUnit;
2419

2520
import static org.mockito.ArgumentMatchers.any;
2621
import static org.mockito.Mockito.*;
2722

2823
@Testcontainers
2924
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
30-
public class ResetIntegrationTest {
25+
public class ResetE2ETest {
3126

3227
final static int PORT_A = 8024;
3328
final static int PORT_B = 8124;
@@ -60,20 +55,20 @@ void testEventsAreCreated(){
6055
.exchange()
6156
.expectStatus()
6257
.isOk();
63-
58+
waitForAS();
6459
verify(projection, times(1)).on(any());
6560
}
6661

6762

6863
@Test
69-
void testResetFrameworkWorks(){
64+
void testResetWorks(){
7065
prepareIntegrationTest();
7166

7267
createEvents();
7368

7469
verifyResetEventProcessorByMethod("framework");
7570
verifyResetEventProcessorByMethod("rest");
76-
// verifyResetEventProcessorByMethod("server");
71+
verifyResetEventProcessorByMethod("server");
7772

7873
}
7974

0 commit comments

Comments
 (0)