Skip to content

Commit 951d727

Browse files
authored
Merge pull request #203 from amosproj/feature/deduplicate-messages
2 parents cb1bcec + 9549cf7 commit 951d727

File tree

5 files changed

+34
-22
lines changed

5 files changed

+34
-22
lines changed

backend/src/main/java/de/amos/apachepulsarui/controller/MessageController.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.springframework.web.bind.annotation.RestController;
1919

2020
import java.util.List;
21+
import java.util.Set;
2122

2223
@RestController
2324
@RequestMapping("/messages")
@@ -30,8 +31,10 @@ public class MessageController {
3031
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
3132
@RequestParam(required = false, defaultValue = "10") Integer numMessages,
3233
@RequestParam(required = false, defaultValue = "") List<String> producers,
33-
@RequestParam(required = false, defaultValue = "") List<String> subscriptions) {
34-
List<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
34+
@RequestParam(required = false, defaultValue = "") List<String> subscriptions)
35+
{
36+
Set<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
3537
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
3638
}
39+
3740
}

backend/src/main/java/de/amos/apachepulsarui/dto/MessagesDto.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
import lombok.AllArgsConstructor;
99
import lombok.Data;
1010

11-
import java.util.List;
11+
import java.util.Set;
1212

1313

1414
@Data
1515
@AllArgsConstructor
1616
public class MessagesDto {
1717

18-
private List<MessageDto> messages;
18+
private Set<MessageDto> messages;
1919

2020
}

backend/src/main/java/de/amos/apachepulsarui/service/MessageService.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Collections;
22+
import java.util.Comparator;
23+
import java.util.LinkedHashSet;
2224
import java.util.List;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
2327

2428
@Service
2529
@Slf4j
2630
@RequiredArgsConstructor
2731
public class MessageService {
2832
private final PulsarAdmin pulsarAdmin;
2933

30-
public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
31-
List<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
34+
public Set<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
35+
Set<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
3236
if (!producers.isEmpty()) {
3337
messageDtos = filterByProducers(messageDtos, producers);
3438
}
@@ -39,12 +43,14 @@ public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessa
3943
return messageDtos;
4044
}
4145

42-
private List<MessageDto> filterBySubscription(List<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
46+
private Set<MessageDto> filterBySubscription(Set<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
4347
List<String> messageIds = subscriptions.stream()
4448
.flatMap(s -> peekMessageIds(topic, s, numMessages).stream())
4549
.toList();
4650

47-
return messageDtos.stream().filter(m -> messageIds.contains(m.getMessageId())).toList();
51+
return messageDtos.stream()
52+
.filter(m -> messageIds.contains(m.getMessageId()))
53+
.collect(Collectors.toCollection(LinkedHashSet::new));
4854
}
4955

5056
private List<String> peekMessageIds(String topic, String subscription, Integer numMessages) {
@@ -60,14 +66,13 @@ private List<String> peekMessageIds(String topic, String subscription, Integer n
6066
}
6167

6268

63-
private List<MessageDto> filterByProducers(List<MessageDto> messageDtos, List<String> producers) {
69+
private Set<MessageDto> filterByProducers(Set<MessageDto> messageDtos, List<String> producers) {
6470
return messageDtos.stream()
6571
.filter(m -> producers.contains(m.getProducer()))
66-
.toList();
67-
72+
.collect(Collectors.toCollection(LinkedHashSet::new));
6873
}
6974

70-
private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
75+
private Set<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
7176
var schema = getSchemaIfExists(topic);
7277
try {
7378
var messages = new ArrayList<Message<byte[]>>();
@@ -83,7 +88,10 @@ private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessa
8388
}
8489
return messages.stream()
8590
.map(message -> MessageDto.fromExistingMessage(message, schema))
86-
.toList();
91+
// latest message first in set
92+
.sorted(Comparator.comparing(MessageDto::getPublishTime, Comparator.reverseOrder()))
93+
// linked to keep the order!
94+
.collect(Collectors.toCollection(LinkedHashSet::new));
8795
} catch (PulsarAdminException e) {
8896
throw new PulsarApiException(
8997
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),

backend/src/test/java/de/amos/apachepulsarui/controller/MessageControllerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
import org.springframework.http.MediaType;
1717
import org.springframework.test.web.servlet.MockMvc;
1818

19-
import java.util.ArrayList;
19+
import java.util.HashSet;
2020
import java.util.List;
21+
import java.util.Set;
2122

2223
import static java.util.Collections.emptyList;
2324
import static org.hamcrest.Matchers.equalTo;
@@ -37,7 +38,7 @@ public class MessageControllerTest {
3738

3839
@Test
3940
void getMessages_returnsMessages() throws Exception {
40-
List<MessageDto> messageDtos = List.of(
41+
Set<MessageDto> messageDtos = Set.of(
4142
aMessage("persistent://public/default/spaceships", "Nebuchadnezzar"),
4243
aMessage("persistent://public/default/spaceships", "Serenity")
4344
);
@@ -54,7 +55,7 @@ void getMessages_returnsMessages() throws Exception {
5455

5556
@Test
5657
void getMessages_withoutNumMessages_returns10Messages() throws Exception {
57-
var messageDtos = new ArrayList<MessageDto>();
58+
HashSet<MessageDto> messageDtos = new HashSet<>();
5859
for (int i = 0; i < 10; i++) {
5960
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
6061
}
@@ -70,7 +71,7 @@ void getMessages_withoutNumMessages_returns10Messages() throws Exception {
7071

7172
@Test
7273
void getMessages_withProducer_returns10Messages() throws Exception {
73-
var messageDtos = new ArrayList<MessageDto>();
74+
HashSet<MessageDto> messageDtos = new HashSet<>();
7475
for (int i = 0; i < 10; i++) {
7576
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
7677
}
@@ -86,7 +87,7 @@ void getMessages_withProducer_returns10Messages() throws Exception {
8687

8788
@Test
8889
void getMessages_withSubscription_returns10Messages() throws Exception {
89-
var messageDtos = new ArrayList<MessageDto>();
90+
HashSet<MessageDto> messageDtos = new HashSet<>();
9091
for (int i = 0; i < 10; i++) {
9192
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
9293
}

backend/src/test/java/de/amos/apachepulsarui/service/MessageServiceIntegrationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void getNumberOfLatestMessagesFromTopic_returnsMessages() throws Exception {
6565
}
6666
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), emptyList());
6767

68-
MessageDto messageReceived = messages.get(0);
68+
MessageDto messageReceived = messages.iterator().next();
6969
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
7070
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
7171
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
@@ -91,7 +91,7 @@ void getNumberOfLatestMessagesFromTopicFilteredByProducer_returnsMessages() thro
9191
}
9292
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, List.of(producerName), emptyList());
9393

94-
MessageDto messageReceived = messages.get(0);
94+
MessageDto messageReceived = messages.iterator().next();
9595
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
9696
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
9797
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
@@ -155,7 +155,7 @@ void getNumberOfLatestMessagesFromTopicFilteredBySubscription_returnsMessages()
155155
}
156156
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), List.of(subscriptionName));
157157

158-
MessageDto messageReceived = messages.get(0);
158+
MessageDto messageReceived = messages.iterator().next();
159159
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
160160
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
161161
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
@@ -179,7 +179,7 @@ void getNumberOfLatestMessagesFromTopic_forMessageWithSchema_returnsSchema() thr
179179
}
180180
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), emptyList());
181181

182-
MessageDto messageReceived = messages.get(0);
182+
MessageDto messageReceived = messages.iterator().next();
183183
assertThat(messageReceived.getSchema()).isEqualTo(schema.getSchemaInfo().getSchemaDefinition());
184184
}
185185

0 commit comments

Comments
 (0)