Skip to content

Commit 088208e

Browse files
committed
Merge remote-tracking branch 'origin/sprint-09' into feature/95-create-subscription-information-popup
Signed-off-by: Anna Haverkamp <anna.lucia.haverkamp@gmail.com> # Conflicts: # backend/src/main/java/de/amos/apachepulsarui/service/TopicService.java # backend/src/test/java/de/amos/apachepulsarui/service/TopicServiceTest.java # frontend/src/store/filterSlice.ts
2 parents 28ea420 + d948441 commit 088208e

23 files changed

+284
-188
lines changed

backend/src/main/java/de/amos/apachepulsarui/controller/MessageController.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88

99
import de.amos.apachepulsarui.dto.MessageDto;
1010
import de.amos.apachepulsarui.dto.MessagesDto;
11-
import de.amos.apachepulsarui.exception.BadRequestException;
1211
import de.amos.apachepulsarui.service.MessageService;
1312
import lombok.RequiredArgsConstructor;
1413
import org.springframework.http.HttpStatus;
15-
import org.springframework.http.MediaType;
1614
import org.springframework.http.ResponseEntity;
17-
import org.springframework.web.bind.annotation.*;
15+
import org.springframework.web.bind.annotation.GetMapping;
16+
import org.springframework.web.bind.annotation.RequestMapping;
17+
import org.springframework.web.bind.annotation.RequestParam;
18+
import org.springframework.web.bind.annotation.RestController;
1819

19-
import javax.validation.Valid;
2020
import java.util.List;
2121

2222
@RestController
@@ -27,21 +27,9 @@ public class MessageController {
2727
private final MessageService messageService;
2828

2929
@GetMapping
30-
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic, @RequestParam String subscription) {
31-
List<MessageDto> messageDtos = messageService.peekMessages(topic, subscription);
30+
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
31+
@RequestParam(required = false, defaultValue = "10") Integer numMessages) {
32+
List<MessageDto> messageDtos = messageService.getLatestMessagesOfTopic(topic, numMessages);
3233
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
3334
}
34-
35-
@PostMapping(
36-
value = "/send",
37-
consumes = {MediaType.APPLICATION_JSON_VALUE}
38-
)
39-
public ResponseEntity<Void> sendMessage(@RequestBody @Valid MessageDto messageDto) {
40-
if (messageService.inValidTopicName(messageDto)) {
41-
throw new BadRequestException.InvalidTopicName();
42-
}
43-
messageService.sendMessage(messageDto);
44-
return new ResponseEntity<>(HttpStatus.CREATED);
45-
}
46-
4735
}

backend/src/main/java/de/amos/apachepulsarui/dto/ProducerDto.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import lombok.Data;
1212
import org.apache.pulsar.common.policies.data.PublisherStats;
1313

14-
import java.util.List;
15-
1614
@Data
1715
@AllArgsConstructor
1816
@Builder(access = AccessLevel.PRIVATE)
@@ -22,9 +20,8 @@ public class ProducerDto {
2220

2321
private String name;
2422

25-
private List<MessageDto> messagesDto;
26-
27-
private long amountOfMessages;
23+
//todo figure out how to get the amount of messages
24+
//private long amountOfMessages;
2825

2926
private String address;
3027

@@ -34,12 +31,11 @@ public class ProducerDto {
3431

3532
private String connectedSince;
3633

37-
public static ProducerDto create(PublisherStats publisherStats, List<MessageDto> messages) {
34+
public static ProducerDto create(PublisherStats publisherStats) {
3835
return ProducerDto.builder()
3936
.id(publisherStats.getProducerId())
4037
.name(publisherStats.getProducerName())
41-
.messagesDto(messages)
42-
.amountOfMessages(messages.size())
38+
//.amountOfMessages(numOfMsgs)
4339
.connectedSince(publisherStats.getConnectedSince())
4440
.address(publisherStats.getAddress())
4541
.averageMsgSize(publisherStats.getAverageMsgSize())
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: MIT
3+
* SPDX-FileCopyrightText: 2023 Jonas Arnhold <jonasarnhold@web.de>
4+
*/
5+
6+
package de.amos.apachepulsarui.dto;
7+
8+
import lombok.Builder;
9+
import lombok.Value;
10+
import org.apache.pulsar.common.schema.SchemaInfo;
11+
import org.apache.pulsar.common.schema.SchemaType;
12+
13+
import java.sql.Timestamp;
14+
import java.util.Map;
15+
16+
@Value
17+
@Builder
18+
public class SchemaInfoDto {
19+
20+
String name;
21+
long version;
22+
SchemaType type;
23+
Map<String, String> properties;
24+
String schemaDefinition;
25+
Timestamp timestamp;
26+
27+
public static SchemaInfoDto create(SchemaInfo schemaInfo, Long version) {
28+
return SchemaInfoDto.builder()
29+
.name(schemaInfo.getName())
30+
.version(version)
31+
.type(schemaInfo.getType())
32+
.properties(schemaInfo.getProperties())
33+
.schemaDefinition(schemaInfo.getSchemaDefinition())
34+
.timestamp(new Timestamp(schemaInfo.getTimestamp()))
35+
.build();
36+
}
37+
38+
}

backend/src/main/java/de/amos/apachepulsarui/dto/SubscriptionDto.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ public class SubscriptionDto {
2424

2525
private List<String> inactiveConsumers;
2626

27-
private List<MessageDto> messages;
28-
2927
private long numberConsumers;
3028

3129
private double msgAckRate;
@@ -42,7 +40,7 @@ public class SubscriptionDto {
4240

4341
private String type;
4442

45-
public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<MessageDto> messages, String name) {
43+
public static SubscriptionDto create(SubscriptionStats subscriptionStats, String name) {
4644
List<String> consumers = getConsumers(subscriptionStats);
4745
String active = consumers.stream()
4846
.filter(c -> Objects.equals(c, subscriptionStats.getActiveConsumerName()))
@@ -53,7 +51,6 @@ public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<M
5351
.toList();
5452
return SubscriptionDto.builder()
5553
.name(name)
56-
.messages(messages)
5754
.activeConsumer(active)
5855
.inactiveConsumers(inactive)
5956
.msgAckRate(subscriptionStats.getMessageAckRate())

backend/src/main/java/de/amos/apachepulsarui/dto/TopicDetailDto.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,56 @@
1212
import org.apache.pulsar.common.policies.data.TopicStats;
1313

1414
import java.util.ArrayList;
15-
import java.util.List;
15+
import java.util.Collections;
16+
import java.util.Comparator;
17+
import java.util.List;
1618

1719
@Data
1820
@NoArgsConstructor(access = AccessLevel.PRIVATE)
1921
public class TopicDetailDto {
2022

2123
private String name;
2224

23-
private String localName;
25+
private String localName;
2426

25-
private String namespace;
27+
private String namespace;
2628

27-
private String tenant;
29+
private String tenant;
2830

29-
private boolean isPersistent;
31+
private boolean isPersistent;
3032

31-
private String ownerBroker;
33+
private String ownerBroker;
3234

33-
private TopicStatsDto topicStatsDto;
35+
private TopicStatsDto topicStatsDto;
3436

35-
private long producedMessages;
37+
private List<SchemaInfoDto> schemaInfos = new ArrayList<>();
3638

37-
private long consumedMessages;
38-
39-
private List<MessageDto> messagesDto = new ArrayList<>();
40-
41-
42-
/**
43-
* Converts a valid complete topic name (like "persistent://eu-tenant/hr/fizzbuzz" into a {@link TopicDetailDto}.
44-
*/
45-
public static TopicDetailDto create(String completeTopicName, TopicStats topicStats, String ownerBroker) {
39+
public static TopicDetailDto create(
40+
String completeTopicName,
41+
TopicStats topicStats,
42+
String ownerBroker,
43+
List<SchemaInfoDto> schemaInfos
44+
) {
4645
TopicName topicName = TopicName.get(completeTopicName);
47-
TopicDetailDto topicDetailDto = new TopicDetailDto();
48-
topicDetailDto.name = topicName.toString();
49-
topicDetailDto.localName = topicName.getLocalName();
50-
topicDetailDto.namespace = topicName.getNamespacePortion();
51-
topicDetailDto.tenant = topicName.getTenant();
52-
topicDetailDto.isPersistent = topicName.isPersistent();
53-
topicDetailDto.ownerBroker = ownerBroker;
54-
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
55-
topicDetailDto.producedMessages = topicDetailDto.topicStatsDto.getProducedMesages();
56-
topicDetailDto.consumedMessages = topicDetailDto.topicStatsDto.getConsumedMessages();
57-
58-
return topicDetailDto;
46+
TopicDetailDto topicDetailDto = new TopicDetailDto();
47+
48+
topicDetailDto.name = topicName.toString();
49+
topicDetailDto.localName = topicName.getLocalName();
50+
topicDetailDto.namespace = topicName.getNamespacePortion();
51+
topicDetailDto.tenant = topicName.getTenant();
52+
topicDetailDto.isPersistent = topicName.isPersistent();
53+
topicDetailDto.ownerBroker = ownerBroker;
54+
55+
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
56+
57+
// we want to put the latest schema version on top of the list
58+
if (!schemaInfos.isEmpty()) {
59+
List<SchemaInfoDto> schemaInfosSorted = schemaInfos.stream()
60+
.sorted(Comparator.comparing(SchemaInfoDto::getVersion, Collections.reverseOrder()))
61+
.toList();
62+
topicDetailDto.setSchemaInfos(schemaInfosSorted);
63+
}
64+
65+
return topicDetailDto;
5966
}
60-
61-
public static TopicDetailDto createTopicDtoWithMessages(String completeTopicName, TopicStats topicStats, String ownerBroker, List<MessageDto> messages) {
62-
TopicDetailDto topicDto = create(completeTopicName, topicStats, ownerBroker);
63-
topicDto.setMessagesDto(messages);
64-
return topicDto;
65-
}
66-
67-
68-
6967
}

backend/src/main/java/de/amos/apachepulsarui/service/ClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class ClusterService {
2121
private final PulsarAdmin pulsarAdmin;
2222
private final TenantService tenantService;
2323

24-
@Cacheable("cluster.all")
24+
@Cacheable("cluster.allNames")
2525
public List<String> getAllNames() {
2626
try {
2727
return pulsarAdmin.clusters().getClusters();

backend/src/main/java/de/amos/apachepulsarui/service/MessageService.java

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,40 @@
1212
import lombok.extern.slf4j.Slf4j;
1313
import org.apache.pulsar.client.admin.PulsarAdmin;
1414
import org.apache.pulsar.client.admin.PulsarAdminException;
15-
import org.apache.pulsar.client.api.Producer;
16-
import org.apache.pulsar.client.api.PulsarClient;
17-
import org.apache.pulsar.client.api.PulsarClientException;
15+
import org.apache.pulsar.client.api.Message;
16+
import org.apache.pulsar.common.api.proto.CommandSubscribe;
1817
import org.apache.pulsar.common.naming.TopicName;
1918
import org.springframework.stereotype.Service;
2019

21-
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2221
import java.util.List;
2322

2423
@Service
2524
@Slf4j
2625
@RequiredArgsConstructor
2726
public class MessageService {
28-
29-
public static final int MAX_NUM_MESSAGES = 20;
30-
31-
private final PulsarClient pulsarClient;
3227
private final PulsarAdmin pulsarAdmin;
3328

34-
public List<MessageDto> peekMessages(String topic, String subscription) throws PulsarApiException {
29+
public List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
3530
var schema = getSchemaIfExists(topic);
3631
try {
37-
var messages = pulsarAdmin.topics().peekMessages(topic, subscription, MAX_NUM_MESSAGES);
32+
var messages = new ArrayList<Message<byte[]>>();
33+
34+
// ensure that we don't look up more messages than exist
35+
var producedMessagesUntilNow = pulsarAdmin.topics().getStats(topic).getMsgInCounter();
36+
var numLookUpMessages = Math.min(producedMessagesUntilNow, numMessages);
37+
38+
for (int i = 0; i < numLookUpMessages; i++) {
39+
messages.add(
40+
pulsarAdmin.topics()
41+
.examineMessage(topic, CommandSubscribe.InitialPosition.Latest.name(), i));
42+
}
3843
return messages.stream()
3944
.map(message -> MessageDto.fromExistingMessage(message, schema))
4045
.toList();
4146
} catch (PulsarAdminException e) {
4247
throw new PulsarApiException(
43-
"Could not peek messages for topic '%s' with subscription '%s'".formatted(topic, subscription),
48+
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),
4449
e
4550
);
4651
}
@@ -54,39 +59,7 @@ private String getSchemaIfExists(String topic) {
5459
}
5560
}
5661

57-
public void sendMessage(MessageDto messageDto) throws PulsarApiException {
58-
try {
59-
Producer<byte[]> producer = this.createProducerFor(messageDto.getTopic());
60-
producer.send(messageDto.getPayload().getBytes(StandardCharsets.UTF_8));
61-
producer.close();
62-
} catch (PulsarClientException e) {
63-
throw new PulsarApiException("Could not send messageDto to topic '%s'".formatted(messageDto.getTopic()), e);
64-
}
65-
}
66-
67-
public void sendMessages(List<MessageDto> messageDtos) throws PulsarApiException {
68-
try {
69-
Producer<byte[]> producer = this.createProducerFor(messageDtos.iterator().next().getTopic());
70-
for (MessageDto messageDto : messageDtos) {
71-
producer.send(messageDto.getPayload().getBytes(StandardCharsets.UTF_8));
72-
}
73-
producer.close();
74-
} catch (PulsarClientException e) {
75-
throw new PulsarApiException(
76-
"Could not list of %s messageDtos to topic '%s'".formatted(messageDtos.size(), messageDtos.iterator().next().getTopic()),
77-
e
78-
);
79-
}
80-
}
81-
8262
public boolean inValidTopicName(MessageDto messageDto) {
8363
return !TopicName.isValid(messageDto.getTopic());
8464
}
85-
86-
private Producer<byte []> createProducerFor(String topicName) throws PulsarClientException {
87-
return pulsarClient.newProducer()
88-
.topic(topicName)
89-
.create();
90-
}
91-
9265
}

backend/src/main/java/de/amos/apachepulsarui/service/NamespaceService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.pulsar.client.admin.Namespaces;
1414
import org.apache.pulsar.client.admin.PulsarAdmin;
1515
import org.apache.pulsar.client.admin.PulsarAdminException;
16+
import org.springframework.cache.annotation.Cacheable;
1617
import org.springframework.stereotype.Service;
1718

1819
import java.util.List;
@@ -46,10 +47,12 @@ public List<NamespaceDto> getAllForTenants(List<String> tenants) {
4647
.toList();
4748
}
4849

50+
@Cacheable("namespace.detail")
4951
public NamespaceDetailDto getNamespaceDetails(String namespaceName) {
5052
return enrichWithNamespaceData(NamespaceDetailDto.fromString(namespaceName));
5153
}
5254

55+
@Cacheable("namespace.allNames")
5356
public List<String> getAllOfTenant(String tenantName) throws PulsarApiException {
5457
try {
5558
return pulsarAdmin.namespaces().getNamespaces(tenantName);

backend/src/main/java/de/amos/apachepulsarui/service/TenantService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class TenantService {
2121
private final PulsarAdmin pulsarAdmin;
2222
private final NamespaceService namespaceService;
2323

24+
@Cacheable("tenants.allNames")
2425
public List<String> getAllNames() throws PulsarApiException {
2526
try {
2627
return pulsarAdmin.tenants().getTenants();
@@ -29,7 +30,7 @@ public List<String> getAllNames() throws PulsarApiException {
2930
}
3031
}
3132

32-
@Cacheable("tenats.getAll")
33+
@Cacheable("tenants.allFiltered")
3334
public List<TenantDto> getAllFiltered(List<String> tenants) throws PulsarApiException {
3435
try {
3536
List<String> tenantNames = pulsarAdmin.tenants().getTenants();

0 commit comments

Comments
 (0)