Skip to content

Commit 6af2523

Browse files
committed
fix(citrus-spring-boot-simulator): process messages in fifo manner
this commit introduces a nice link between a scenario and its messages. an object is being attached to the `TestContext` which is unique per scenario execution. both the received `Message` as well as the response `Future` are contained in it.
1 parent aa44a5c commit 6af2523

16 files changed

+232
-166
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
*
2727
* @see SimulatorEndpointAdapter
2828
*/
29-
public class SimulationFailedUnexpectedlyException extends DefaultMessage {
29+
public class SimulationFailedUnexpectedlyExceptionMessage extends DefaultMessage {
3030

31-
public static final String EXCEPTION_TYPE = SimulationFailedUnexpectedlyException.class.getSimpleName() + ":Exception";
31+
public static final String EXCEPTION_TYPE = SimulationFailedUnexpectedlyExceptionMessage.class.getSimpleName() + ":Exception";
3232

33-
public SimulationFailedUnexpectedlyException(Throwable e) {
33+
public SimulationFailedUnexpectedlyExceptionMessage(Throwable e) {
3434
super(e);
3535
}
3636

simulator-spring-boot/src/main/java/org/citrusframework/simulator/endpoint/SimulatorEndpointAdapter.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import static java.util.Collections.emptyList;
4040
import static java.util.Objects.nonNull;
4141
import static java.util.concurrent.TimeUnit.MILLISECONDS;
42-
import static org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyException.EXCEPTION_TYPE;
42+
import static org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyExceptionMessage.EXCEPTION_TYPE;
4343
import static org.citrusframework.util.StringUtils.hasText;
4444

4545
public class SimulatorEndpointAdapter extends RequestDispatchingEndpointAdapter {
@@ -79,7 +79,7 @@ protected Message handleMessageInternal(Message message) {
7979

8080
private Message handleMessageWithCorrelation(Message request, CorrelationHandler handler) {
8181
CompletableFuture<Message> responseFuture = new CompletableFuture<>();
82-
handler.getScenarioEndpoint().add(request, responseFuture);
82+
// handler.getScenarioEndpoint().add(request, responseFuture);
8383

8484
return awaitResponseOrThrowException(responseFuture, handler.getScenarioEndpoint().getName());
8585
}
@@ -98,10 +98,13 @@ public Message dispatchMessage(Message message, String mappingName) {
9898
scenario.getScenarioEndpoint().setName(scenarioName);
9999

100100
CompletableFuture<Message> responseFuture = new CompletableFuture<>();
101-
scenario.getScenarioEndpoint().add(message, responseFuture);
102-
103101
try {
104-
scenarioExecutorService.run(scenario, scenarioName, emptyList());
102+
scenarioExecutorService.run(
103+
scenario,
104+
scenarioName,
105+
emptyList(),
106+
new ScenarioExecutorService.ExecutionRequestAndResponse(message, responseFuture)
107+
);
105108
} catch (Exception e) {
106109
throw getResponseStatusException(e);
107110
}

simulator-spring-boot/src/main/java/org/citrusframework/simulator/scenario/ScenarioEndpoint.java

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,52 +16,46 @@
1616

1717
package org.citrusframework.simulator.scenario;
1818

19+
import com.google.common.annotations.VisibleForTesting;
20+
import jakarta.annotation.Nonnull;
21+
import lombok.extern.slf4j.Slf4j;
1922
import org.citrusframework.context.TestContext;
2023
import org.citrusframework.endpoint.AbstractEndpoint;
2124
import org.citrusframework.message.Message;
2225
import org.citrusframework.messaging.Consumer;
2326
import org.citrusframework.messaging.Producer;
2427
import org.citrusframework.simulator.endpoint.EndpointMessageHandler;
25-
import org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyException;
28+
import org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyExceptionMessage;
2629
import org.citrusframework.simulator.exception.SimulatorException;
27-
28-
import java.util.Stack;
29-
import java.util.concurrent.CompletableFuture;
30-
import java.util.concurrent.LinkedBlockingQueue;
30+
import org.citrusframework.simulator.service.ScenarioExecutorService;
3131

3232
import static java.lang.Thread.currentThread;
3333
import static java.util.Objects.isNull;
34-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
34+
import static java.util.Objects.nonNull;
35+
import static org.citrusframework.simulator.service.runner.DefaultScenarioExecutorService.REQUEST_RESPONSE_MAPPING_VARIABLE_NAME;
3536

37+
@Slf4j
3638
public class ScenarioEndpoint extends AbstractEndpoint implements Producer, Consumer {
3739

38-
/**
39-
* Internal im memory message channel
40-
*/
41-
private final LinkedBlockingQueue<Message> channel = new LinkedBlockingQueue<>();
42-
43-
/**
44-
* Stack of response futures to complete
45-
*/
46-
private final Stack<CompletableFuture<Message>> responseFutures = new Stack<>();
47-
48-
/**
49-
* Default constructor using endpoint configuration.
50-
*
51-
* @param endpointConfiguration
52-
*/
53-
public ScenarioEndpoint(ScenarioEndpointConfiguration endpointConfiguration) {
54-
super(endpointConfiguration);
40+
@VisibleForTesting
41+
static final String NO_REQUEST_RESPONSE_MAPPING_IN_TEST_CONTEXT_MESSAGE = "No request-response mapping found in test context! This may happen if you're using the deprecated `ScenarioEndpoint#fail(Throwable)` API.";
42+
43+
@VisibleForTesting
44+
static final String NO_RESPONSE_FUTURE_IN_TEST_CONTEXT_MESSAGE = "Failed to match response futures to test context! This may happen if you're using the deprecated `ScenarioEndpoint#fail(Throwable)` API.";
45+
46+
private static @Nonnull ScenarioExecutorService.ExecutionRequestAndResponse getExecutionRequestAndResponse(TestContext testContext) {
47+
var requestResponseMapping = testContext.getVariables().get(REQUEST_RESPONSE_MAPPING_VARIABLE_NAME);
48+
49+
if (nonNull(requestResponseMapping) &&
50+
requestResponseMapping instanceof ScenarioExecutorService.ExecutionRequestAndResponse executionRequestAndResponse) {
51+
return executionRequestAndResponse;
52+
}
53+
54+
throw new SimulatorException(NO_REQUEST_RESPONSE_MAPPING_IN_TEST_CONTEXT_MESSAGE);
5555
}
5656

57-
/**
58-
* Adds new message for direct message consumption.
59-
*
60-
* @param request
61-
*/
62-
public void add(Message request, CompletableFuture<Message> future) {
63-
responseFutures.push(future);
64-
channel.add(request);
57+
public ScenarioEndpoint(ScenarioEndpointConfiguration endpointConfiguration) {
58+
super(endpointConfiguration);
6559
}
6660

6761
@Override
@@ -81,38 +75,43 @@ public Message receive(TestContext context) {
8175

8276
@Override
8377
public Message receive(TestContext context, long timeout) {
84-
try {
85-
Message message = channel.poll(timeout, MILLISECONDS);
78+
var message = pollMessageForExecution(context, timeout);
79+
messageReceived(message, context);
80+
81+
return message;
82+
}
8683

87-
if (isNull(message)) {
88-
throw new SimulatorException("Failed to receive scenario inbound message");
89-
}
84+
@Override
85+
public void send(Message message, TestContext testContext) {
86+
messageSent(message, testContext);
87+
completeNextResponseFuture(message, testContext);
88+
}
9089

91-
messageReceived(message, context);
90+
void fail(Throwable e, TestContext testContext) {
91+
completeNextResponseFuture(new SimulationFailedUnexpectedlyExceptionMessage(e), testContext);
92+
}
9293

93-
return message;
94+
private Message pollMessageForExecution(TestContext testContext, long timeout) {
95+
try {
96+
return receiveNextMessageFromChannel(testContext);
9497
} catch (InterruptedException e) {
9598
currentThread().interrupt();
9699
throw new SimulatorException(e);
97100
}
98101
}
99102

100-
@Override
101-
public void send(Message message, TestContext context) {
102-
messageSent(message, context);
103-
completeNextResponseFuture(message);
103+
private Message receiveNextMessageFromChannel(TestContext testContext) throws InterruptedException {
104+
return getExecutionRequestAndResponse(testContext).requestMessage();
104105
}
105106

106-
void fail(Throwable e) {
107-
completeNextResponseFuture(new SimulationFailedUnexpectedlyException(e));
108-
}
107+
private void completeNextResponseFuture(Message message, TestContext testContext) {
108+
var responseFuture = getExecutionRequestAndResponse(testContext).responseFuture();
109109

110-
private void completeNextResponseFuture(Message message) {
111-
if (responseFutures.isEmpty()) {
112-
throw new SimulatorException("Failed to process scenario response message - missing response consumer!");
113-
} else {
114-
responseFutures.pop().complete(message);
110+
if (isNull(responseFuture)) {
111+
throw new SimulatorException(NO_RESPONSE_FUTURE_IN_TEST_CONTEXT_MESSAGE);
115112
}
113+
114+
responseFuture.complete(message);
116115
}
117116

118117
private void messageSent(Message message, TestContext context) {

simulator-spring-boot/src/main/java/org/citrusframework/simulator/scenario/SimulatorScenario.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ default String getName() {
5151
default Void registerException(Throwable e) {
5252
if (nonNull(getTestCaseRunner()) && getTestCaseRunner() instanceof DefaultTestCaseRunner defaultTestCaseRunner) {
5353
defaultTestCaseRunner.getContext().addException(new CitrusRuntimeException(e));
54+
getScenarioEndpoint().fail(e, defaultTestCaseRunner.getContext());
5455
}
5556

56-
getScenarioEndpoint().fail(e);
57-
5857
return null;
5958
}
6059

simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/ScenarioExecutorService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package org.citrusframework.simulator.service;
1818

1919
import jakarta.annotation.Nullable;
20+
import org.citrusframework.message.Message;
2021
import org.citrusframework.simulator.model.ScenarioParameter;
2122
import org.citrusframework.simulator.scenario.SimulatorScenario;
2223

2324
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
2426

2527
/**
2628
* Service capable of executing test executables. It takes care on setting up the executable before execution. The given
@@ -40,7 +42,7 @@ public interface ScenarioExecutorService {
4042
* @param scenarioParameters the list of parameters to pass to the scenario when starting
4143
* @return the scenario execution id
4244
*/
43-
Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters);
45+
Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse);
4446

4547
/**
4648
* Starts a new scenario instance using the collection of supplied parameters.
@@ -50,5 +52,10 @@ public interface ScenarioExecutorService {
5052
* @param scenarioParameters the list of parameters to pass to the scenario when starting
5153
* @return the scenario execution id
5254
*/
53-
Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters);
55+
Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse);
56+
57+
record ExecutionRequestAndResponse(@Nullable Message requestMessage, @Nullable CompletableFuture<Message> responseFuture) {
58+
59+
public static ExecutionRequestAndResponse NOOP_EXECUTION = new ExecutionRequestAndResponse(null, null);
60+
}
5461
}

simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/runner/AsyncScenarioExecutorService.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.stereotype.Service;
3333

3434
import java.util.List;
35+
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.ExecutorService;
3637

3738
import static java.util.concurrent.CompletableFuture.runAsync;
@@ -101,30 +102,30 @@ public void onApplicationEvent(ContextClosedEvent event) {
101102
}
102103

103104
/**
104-
* Overrides the {@link DefaultScenarioExecutorService#startScenario(Long, String, SimulatorScenario, List)} method
105+
* Overrides the {@link DefaultScenarioExecutorService#startScenario(Long, String, SimulatorScenario, List, ExecutionRequestAndResponse)} method
105106
* to execute the scenario asynchronously using the executor service.
106107
*
107-
* @param executionId the unique identifier for the scenario execution
108-
* @param name the name of the scenario to start
109-
* @param scenario the scenario instance to execute
110-
* @param scenarioParameters the list of parameters to pass to the scenario when starting
108+
* @param executionId the unique identifier for the scenario execution
109+
* @param name the name of the scenario to start
110+
* @param scenario the scenario instance to execute
111+
* @param scenarioParameters the list of parameters to pass to the scenario when starting
111112
*/
112113
@Override
113-
public void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
114-
startScenarioAsync(executionId, name, scenario, scenarioParameters);
114+
public void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
115+
startScenarioAsync(executionId, name, scenario, scenarioParameters, executionRequestAndResponse);
115116
}
116117

117118
/**
118119
* Submits the scenario execution task to the executor service for asynchronous execution.
119120
*
120-
* @param executionId the unique identifier for the scenario execution
121-
* @param name the name of the scenario to start
122-
* @param scenario the scenario instance to execute
123-
* @param scenarioParameters the list of parameters to pass to the scenario when starting
121+
* @param executionId the unique identifier for the scenario execution
122+
* @param name the name of the scenario to start
123+
* @param scenario the scenario instance to execute
124+
* @param scenarioParameters the list of parameters to pass to the scenario when starting
124125
*/
125-
private void startScenarioAsync(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
126-
runAsync(() -> super.startScenario(executionId, name, scenario, scenarioParameters), executorService)
127-
.exceptionally(scenario::registerException);
126+
private void startScenarioAsync(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
127+
runAsync(() -> super.startScenario(executionId, name, scenario, scenarioParameters, executionRequestAndResponse), executorService)
128+
.exceptionally(scenario::registerException);
128129
}
129130

130131
private void shutdownExecutor() {

simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/runner/DefaultScenarioExecutorService.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class DefaultScenarioExecutorService implements ScenarioExecutorService {
6464

6565
private static final Logger logger = LoggerFactory.getLogger(DefaultScenarioExecutorService.class);
6666

67+
public static final String REQUEST_RESPONSE_MAPPING_VARIABLE_NAME = "scenario-execution-message-to-response-future";
68+
6769
private final ApplicationContext applicationContext;
6870
private final Citrus citrus;
6971
private final ScenarioExecutionService scenarioExecutionService;
@@ -79,41 +81,44 @@ public DefaultScenarioExecutorService(ApplicationContext applicationContext, Cit
7981
* the provided parameters. This method serves as an entry point for scenario execution, handling the entire
8082
* lifecycle from scenario lookup to execution completion.
8183
*
82-
* @param name the name of the scenario to execute, used to look up the corresponding {@link SimulatorScenario} bean
83-
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
84+
* @param name the name of the scenario to execute, used to look up the corresponding {@link SimulatorScenario} bean
85+
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
86+
* @param executionRequestAndResponse mapping of request and response message
8487
* @return the unique identifier of the scenario execution, used for tracking and management purposes
8588
*/
8689
@Override
87-
public final Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters) {
88-
return run(applicationContext.getBean(name, SimulatorScenario.class), name, scenarioParameters);
90+
public final Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
91+
return run(applicationContext.getBean(name, SimulatorScenario.class), name, scenarioParameters, executionRequestAndResponse);
8992
}
9093

9194
/**
9295
* Executes the given {@link SimulatorScenario} with the provided name and parameters. This method orchestrates
9396
* the scenario execution process, including pre-execution preparation, scenario execution, and post-execution
9497
* cleanup, ensuring a consistent execution environment for each scenario.
9598
*
96-
* @param scenario the {@link SimulatorScenario} to execute
97-
* @param name the name of the scenario, used for logging and tracking purposes
98-
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
99+
* @param scenario the {@link SimulatorScenario} to execute
100+
* @param name the name of the scenario, used for logging and tracking purposes
101+
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
102+
* @param executionRequestAndResponse mapping of request and response message
99103
* @return the unique identifier of the scenario execution
100104
*/
101105
@Override
102-
public final Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters) {
106+
public final Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
103107
ScenarioExecution scenarioExecution = scenarioExecutionService.createAndSaveExecutionScenario(name, scenarioParameters);
104108

105109
prepareBeforeExecution(scenario);
106110

107-
startScenario(scenarioExecution.getExecutionId(), name, scenario, scenarioParameters);
111+
startScenario(scenarioExecution.getExecutionId(), name, scenario, scenarioParameters, executionRequestAndResponse);
108112

109113
return scenarioExecution.getExecutionId();
110114
}
111115

112-
protected void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
116+
protected void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
113117
logger.info("Starting scenario : {}", name);
114118

115-
var context = createTestContext();
116-
createAndRunScenarioRunner(context, executionId, name, scenario, scenarioParameters);
119+
var testContext = createTestContext();
120+
testContext.setVariable(REQUEST_RESPONSE_MAPPING_VARIABLE_NAME, executionRequestAndResponse);
121+
createAndRunScenarioRunner(testContext, executionId, name, scenario, scenarioParameters);
117122

118123
logger.debug("Scenario completed: {}", name);
119124
}

simulator-spring-boot/src/main/java/org/citrusframework/simulator/web/rest/ScenarioResource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static java.net.URLDecoder.decode;
5050
import static java.nio.charset.StandardCharsets.UTF_8;
5151
import static java.util.Comparator.comparing;
52+
import static org.citrusframework.simulator.service.ScenarioExecutorService.ExecutionRequestAndResponse.NOOP_EXECUTION;
5253
import static org.citrusframework.simulator.web.rest.ScenarioResource.Scenario.ScenarioType.MESSAGE_TRIGGERED;
5354
import static org.citrusframework.simulator.web.rest.ScenarioResource.Scenario.ScenarioType.STARTER;
5455
import static org.citrusframework.simulator.web.util.PaginationUtil.createPage;
@@ -140,7 +141,7 @@ public Collection<ScenarioParameterDTO> getScenarioParameters(@PathVariable("sce
140141
@PostMapping("scenarios/{scenarioName}/launch")
141142
public Long launchScenario(@NotEmpty @PathVariable("scenarioName") String scenarioName, @RequestBody(required = false) List<ScenarioParameterDTO> scenarioParameters) {
142143
logger.debug("REST request to launch Scenario '{}' with Parameters: {}", scenarioName, scenarioParameters);
143-
return scenarioExecutorService.run(scenarioName, scenarioParameters.stream().map(scenarioParameterMapper::toEntity).toList());
144+
return scenarioExecutorService.run(scenarioName, scenarioParameters.stream().map(scenarioParameterMapper::toEntity).toList(), NOOP_EXECUTION);
144145
}
145146

146147
public record Scenario(String name, ScenarioResource.Scenario.ScenarioType type) {

0 commit comments

Comments
 (0)