Skip to content

Commit 4620409

Browse files
authored
Merge pull request #139 from amosproj/feature/#116-message-endpoint
Feature/#116 message endpoint
2 parents d24f10f + 85ea523 commit 4620409

File tree

11 files changed

+81
-116
lines changed

11 files changed

+81
-116
lines changed

backend/src/main/java/de/amos/apachepulsarui/controller/MessageController.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88

99
import de.amos.apachepulsarui.dto.MessageDto;
1010
import de.amos.apachepulsarui.dto.MessagesDto;
11-
import de.amos.apachepulsarui.exception.BadRequestException;
1211
import de.amos.apachepulsarui.service.MessageService;
1312
import lombok.RequiredArgsConstructor;
1413
import org.springframework.http.HttpStatus;
15-
import org.springframework.http.MediaType;
1614
import org.springframework.http.ResponseEntity;
17-
import org.springframework.web.bind.annotation.*;
15+
import org.springframework.web.bind.annotation.GetMapping;
16+
import org.springframework.web.bind.annotation.RequestMapping;
17+
import org.springframework.web.bind.annotation.RequestParam;
18+
import org.springframework.web.bind.annotation.RestController;
1819

19-
import javax.validation.Valid;
2020
import java.util.List;
2121

2222
@RestController
@@ -27,21 +27,9 @@ public class MessageController {
2727
private final MessageService messageService;
2828

2929
@GetMapping
30-
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic, @RequestParam String subscription) {
31-
List<MessageDto> messageDtos = messageService.peekMessages(topic, subscription);
30+
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
31+
@RequestParam(required = false, defaultValue = "10") Integer numMessages) {
32+
List<MessageDto> messageDtos = messageService.getLatestMessagesOfTopic(topic, numMessages);
3233
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
3334
}
34-
35-
@PostMapping(
36-
value = "/send",
37-
consumes = {MediaType.APPLICATION_JSON_VALUE}
38-
)
39-
public ResponseEntity<Void> sendMessage(@RequestBody @Valid MessageDto messageDto) {
40-
if (messageService.inValidTopicName(messageDto)) {
41-
throw new BadRequestException.InvalidTopicName();
42-
}
43-
messageService.sendMessage(messageDto);
44-
return new ResponseEntity<>(HttpStatus.CREATED);
45-
}
46-
4735
}

backend/src/main/java/de/amos/apachepulsarui/dto/ProducerDto.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import lombok.Data;
1212
import org.apache.pulsar.common.policies.data.PublisherStats;
1313

14-
import java.util.List;
15-
1614
@Data
1715
@AllArgsConstructor
1816
@Builder(access = AccessLevel.PRIVATE)
@@ -22,9 +20,8 @@ public class ProducerDto {
2220

2321
private String name;
2422

25-
private List<MessageDto> messagesDto;
26-
27-
private long amountOfMessages;
23+
//todo figure out how to get the amount of messages
24+
//private long amountOfMessages;
2825

2926
private String address;
3027

@@ -34,12 +31,11 @@ public class ProducerDto {
3431

3532
private String connectedSince;
3633

37-
public static ProducerDto create(PublisherStats publisherStats, List<MessageDto> messages) {
34+
public static ProducerDto create(PublisherStats publisherStats) {
3835
return ProducerDto.builder()
3936
.id(publisherStats.getProducerId())
4037
.name(publisherStats.getProducerName())
41-
.messagesDto(messages)
42-
.amountOfMessages(messages.size())
38+
//.amountOfMessages(numOfMsgs)
4339
.connectedSince(publisherStats.getConnectedSince())
4440
.address(publisherStats.getAddress())
4541
.averageMsgSize(publisherStats.getAverageMsgSize())

backend/src/main/java/de/amos/apachepulsarui/dto/SubscriptionDto.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ public class SubscriptionDto {
2424

2525
private List<String> inactiveConsumers;
2626

27-
private List<MessageDto> messages;
28-
2927
private long numberConsumers;
3028

3129
private double msgAckRate;
@@ -42,7 +40,7 @@ public class SubscriptionDto {
4240

4341
private String type;
4442

45-
public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<MessageDto> messages, String name) {
43+
public static SubscriptionDto create(SubscriptionStats subscriptionStats, String name) {
4644
List<String> consumers = getConsumers(subscriptionStats);
4745
String active = consumers.stream()
4846
.filter(c -> Objects.equals(c, subscriptionStats.getActiveConsumerName()))
@@ -53,7 +51,6 @@ public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<M
5351
.toList();
5452
return SubscriptionDto.builder()
5553
.name(name)
56-
.messages(messages)
5754
.activeConsumer(active)
5855
.inactiveConsumers(inactive)
5956
.msgAckRate(subscriptionStats.getMessageAckRate())

backend/src/main/java/de/amos/apachepulsarui/dto/TopicDetailDto.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.ArrayList;
1515
import java.util.Collections;
1616
import java.util.Comparator;
17-
import java.util.List;
17+
import java.util.List;
1818

1919
@Data
2020
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -64,5 +64,4 @@ public static TopicDetailDto create(
6464

6565
return topicDetailDto;
6666
}
67-
6867
}

backend/src/main/java/de/amos/apachepulsarui/service/MessageService.java

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,40 @@
1212
import lombok.extern.slf4j.Slf4j;
1313
import org.apache.pulsar.client.admin.PulsarAdmin;
1414
import org.apache.pulsar.client.admin.PulsarAdminException;
15-
import org.apache.pulsar.client.api.Producer;
16-
import org.apache.pulsar.client.api.PulsarClient;
17-
import org.apache.pulsar.client.api.PulsarClientException;
15+
import org.apache.pulsar.client.api.Message;
16+
import org.apache.pulsar.common.api.proto.CommandSubscribe;
1817
import org.apache.pulsar.common.naming.TopicName;
1918
import org.springframework.stereotype.Service;
2019

21-
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2221
import java.util.List;
2322

2423
@Service
2524
@Slf4j
2625
@RequiredArgsConstructor
2726
public class MessageService {
28-
29-
public static final int MAX_NUM_MESSAGES = 20;
30-
31-
private final PulsarClient pulsarClient;
3227
private final PulsarAdmin pulsarAdmin;
3328

34-
public List<MessageDto> peekMessages(String topic, String subscription) throws PulsarApiException {
29+
public List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
3530
var schema = getSchemaIfExists(topic);
3631
try {
37-
var messages = pulsarAdmin.topics().peekMessages(topic, subscription, MAX_NUM_MESSAGES);
32+
var messages = new ArrayList<Message<byte[]>>();
33+
34+
// ensure that we don't look up more messages than exist
35+
var producedMessagesUntilNow = pulsarAdmin.topics().getStats(topic).getMsgInCounter();
36+
var numLookUpMessages = Math.min(producedMessagesUntilNow, numMessages);
37+
38+
for (int i = 0; i < numLookUpMessages; i++) {
39+
messages.add(
40+
pulsarAdmin.topics()
41+
.examineMessage(topic, CommandSubscribe.InitialPosition.Latest.name(), i));
42+
}
3843
return messages.stream()
3944
.map(message -> MessageDto.fromExistingMessage(message, schema))
4045
.toList();
4146
} catch (PulsarAdminException e) {
4247
throw new PulsarApiException(
43-
"Could not peek messages for topic '%s' with subscription '%s'".formatted(topic, subscription),
48+
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),
4449
e
4550
);
4651
}
@@ -54,39 +59,7 @@ private String getSchemaIfExists(String topic) {
5459
}
5560
}
5661

57-
public void sendMessage(MessageDto messageDto) throws PulsarApiException {
58-
try {
59-
Producer<byte[]> producer = this.createProducerFor(messageDto.getTopic());
60-
producer.send(messageDto.getPayload().getBytes(StandardCharsets.UTF_8));
61-
producer.close();
62-
} catch (PulsarClientException e) {
63-
throw new PulsarApiException("Could not send messageDto to topic '%s'".formatted(messageDto.getTopic()), e);
64-
}
65-
}
66-
67-
public void sendMessages(List<MessageDto> messageDtos) throws PulsarApiException {
68-
try {
69-
Producer<byte[]> producer = this.createProducerFor(messageDtos.iterator().next().getTopic());
70-
for (MessageDto messageDto : messageDtos) {
71-
producer.send(messageDto.getPayload().getBytes(StandardCharsets.UTF_8));
72-
}
73-
producer.close();
74-
} catch (PulsarClientException e) {
75-
throw new PulsarApiException(
76-
"Could not list of %s messageDtos to topic '%s'".formatted(messageDtos.size(), messageDtos.iterator().next().getTopic()),
77-
e
78-
);
79-
}
80-
}
81-
8262
public boolean inValidTopicName(MessageDto messageDto) {
8363
return !TopicName.isValid(messageDto.getTopic());
8464
}
85-
86-
private Producer<byte []> createProducerFor(String topicName) throws PulsarClientException {
87-
return pulsarClient.newProducer()
88-
.topic(topicName)
89-
.create();
90-
}
91-
9265
}

backend/src/main/java/de/amos/apachepulsarui/service/TopicService.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Objects;
32-
import java.util.Set;
3332
import java.util.concurrent.ConcurrentHashMap;
3433
import java.util.function.Function;
3534
import java.util.function.Predicate;
@@ -87,7 +86,7 @@ public void createNewTopic(String topic) throws PulsarApiException {
8786

8887
/**
8988
* @param topicName The Name of the Topic you want to get detailed information about
90-
* @return A {@link TopicDetailDto}'s including {@link TopicStatsDto}, List of {@link MessageDto} and
89+
* @return A {@link TopicDetailDto}'s including {@link TopicStatsDto} and
9190
* additional metadata.
9291
*/
9392
public TopicDetailDto getTopicDetails(String topicName) throws PulsarApiException {
@@ -144,20 +143,23 @@ public ProducerDto getProducerByTopic(String topic, String producer) {
144143
PublisherStats publisherStats = topicStats.getPublishers().stream()
145144
.filter(ps -> Objects.equals(ps.getProducerName(), producer))
146145
.findFirst().orElseThrow();
147-
return create(publisherStats, getMessagesByProducer(topic, topicStats.getSubscriptions().keySet(), producer));
148-
}
149146

150-
private List<MessageDto> getMessagesByProducer(String topic, Set<String> subscriptions, String producer) {
151-
return subscriptions.stream()
152-
.flatMap(s -> messageService.peekMessages(topic, s).stream())
153-
.filter(distinctByKey(MessageDto::getMessageId))
154-
.filter(message -> Objects.equals(message.getProducer(), producer))
155-
.toList();
147+
return create(publisherStats);
156148
}
157149

150+
// Todo: will there be an extra call to get the producer messages?
151+
// if not needed, this can be thrown away, but I will leave it here until we know for sure
152+
153+
// private List<MessageDto> getMessagesByProducer(String topic, Set<String> subscriptions, String producer) {
154+
// return subscriptions.stream()
155+
// .flatMap(s -> messageService.peekMessages(topic, s).stream())
156+
// .filter(distinctByKey(MessageDto::getMessageId))
157+
// .filter(message -> Objects.equals(message.getProducer(), producer))
158+
// .toList();
159+
// }
160+
158161
public SubscriptionDto getSubscriptionByTopic(String topic, String subscription) {
159-
List<MessageDto> messages = messageService.peekMessages(topic, subscription);
160-
return SubscriptionDto.create(getTopicStats(topic).getSubscriptions().get(subscription), messages, subscription);
162+
return SubscriptionDto.create(getTopicStats(topic).getSubscriptions().get(subscription), subscription);
161163
}
162164

163165
//source: https://www.baeldung.com/java-streams-distinct-by

backend/src/test/java/de/amos/apachepulsarui/controller/MessageControllerTest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.springframework.http.MediaType;
1717
import org.springframework.test.web.servlet.MockMvc;
1818

19+
import java.util.ArrayList;
1920
import java.util.List;
2021

2122
import static org.hamcrest.Matchers.equalTo;
@@ -36,19 +37,36 @@ public class MessageControllerTest {
3637
@Test
3738
void getMessages_returnsMessages() throws Exception {
3839
List<MessageDto> messageDtos = List.of(
39-
aMessage("topic-1", "Nebuchadnezzar"),
40-
aMessage("topic-2", "Serenity")
40+
aMessage("persistent://public/default/spaceships", "Nebuchadnezzar"),
41+
aMessage("persistent://public/default/spaceships", "Serenity")
4142
);
42-
Mockito.when(messageService.peekMessages("spaceships", "nasa-subscription")).thenReturn(messageDtos);
43+
Mockito.when(messageService.getLatestMessagesOfTopic("persistent://public/default/spaceships", 5))
44+
.thenReturn(messageDtos);
4345

44-
mockMvc.perform(get("/messages?topic=spaceships&subscription=nasa-subscription")
46+
mockMvc.perform(get("/messages?topic=persistent://public/default/spaceships&numMessages=5")
4547
.contentType(MediaType.APPLICATION_JSON))
4648
.andExpect(status().isOk())
4749
.andExpect(jsonPath("$.messages", hasSize(2)))
4850
.andExpect(jsonPath("$.messages[0].payload", equalTo("Nebuchadnezzar")))
4951
.andExpect(jsonPath("$.messages[1].payload", equalTo("Serenity")));
5052
}
5153

54+
@Test
55+
void getMessages_withoutNumMessages_returns10Messages() throws Exception {
56+
var messageDtos = new ArrayList<MessageDto>();
57+
for (int i = 0; i < 10; i++) {
58+
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
59+
}
60+
61+
Mockito.when(messageService.getLatestMessagesOfTopic("persistent://public/default/test", 10))
62+
.thenReturn(messageDtos);
63+
64+
mockMvc.perform(get("/messages?topic=persistent://public/default/test")
65+
.contentType(MediaType.APPLICATION_JSON))
66+
.andExpect(status().isOk())
67+
.andExpect(jsonPath("$.messages", hasSize(10)));
68+
}
69+
5270
@NotNull
5371
private static MessageDto aMessage(String topic, String payload) {
5472
return MessageDto.create(topic, payload);

backend/src/test/java/de/amos/apachepulsarui/controller/TopicControllerTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ void getTopicDetails() throws Exception {
173173
void getSubscriptionByNameAndTopic() throws Exception {
174174
String subscription = "R2D2";
175175
String topic = "persistent://public/default/droide";
176-
List<MessageDto> messages = List.of();
177176

178-
SubscriptionDto subscriptionDto = SubscriptionDto.create(subscriptionStats, messages ,subscription);
177+
SubscriptionDto subscriptionDto = SubscriptionDto.create(subscriptionStats, subscription);
179178

180179

181180
when(topicService.getSubscriptionByTopic(topic, subscription)).thenReturn(subscriptionDto);
@@ -191,10 +190,9 @@ void getSubscriptionByNameAndTopic() throws Exception {
191190
void getProducerByNameAndTopic() throws Exception {
192191
String producer = "C3PO";
193192
String topic = "persistent://public/default/droide";
194-
List<MessageDto> messages = List.of();
195193
when(publisherStats.getProducerName()).thenReturn(producer);
196194

197-
ProducerDto dto = ProducerDto.create(publisherStats, messages);
195+
ProducerDto dto = ProducerDto.create(publisherStats);
198196

199197
when(topicService.getProducerByTopic(topic, producer)).thenReturn(dto);
200198

0 commit comments

Comments
 (0)