Skip to content

Commit 607f3eb

Browse files
authored
Merge pull request #146 from amosproj/sprint-09
Sprint-09
2 parents 607c8f6 + 9dbf156 commit 607f3eb

38 files changed

+1197
-360
lines changed

backend/src/main/java/de/amos/apachepulsarui/controller/MessageController.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88

99
import de.amos.apachepulsarui.dto.MessageDto;
1010
import de.amos.apachepulsarui.dto.MessagesDto;
11-
import de.amos.apachepulsarui.exception.BadRequestException;
1211
import de.amos.apachepulsarui.service.MessageService;
1312
import lombok.RequiredArgsConstructor;
1413
import org.springframework.http.HttpStatus;
15-
import org.springframework.http.MediaType;
1614
import org.springframework.http.ResponseEntity;
17-
import org.springframework.web.bind.annotation.*;
15+
import org.springframework.web.bind.annotation.GetMapping;
16+
import org.springframework.web.bind.annotation.RequestMapping;
17+
import org.springframework.web.bind.annotation.RequestParam;
18+
import org.springframework.web.bind.annotation.RestController;
1819

19-
import javax.validation.Valid;
2020
import java.util.List;
2121

2222
@RestController
@@ -27,21 +27,9 @@ public class MessageController {
2727
private final MessageService messageService;
2828

2929
@GetMapping
30-
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic, @RequestParam String subscription) {
31-
List<MessageDto> messageDtos = messageService.peekMessages(topic, subscription);
30+
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
31+
@RequestParam(required = false, defaultValue = "10") Integer numMessages) {
32+
List<MessageDto> messageDtos = messageService.getLatestMessagesOfTopic(topic, numMessages);
3233
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
3334
}
34-
35-
@PostMapping(
36-
value = "/send",
37-
consumes = {MediaType.APPLICATION_JSON_VALUE}
38-
)
39-
public ResponseEntity<Void> sendMessage(@RequestBody @Valid MessageDto messageDto) {
40-
if (messageService.inValidTopicName(messageDto)) {
41-
throw new BadRequestException.InvalidTopicName();
42-
}
43-
messageService.sendMessage(messageDto);
44-
return new ResponseEntity<>(HttpStatus.CREATED);
45-
}
46-
4735
}

backend/src/main/java/de/amos/apachepulsarui/controller/TopicController.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,32 @@ public class TopicController {
3434
@GetMapping("/all")
3535
public ResponseEntity<TopicsDto> getAll(@RequestParam(required = false, defaultValue = "") List<String> tenants,
3636
@RequestParam(required = false, defaultValue = "") List<String> namespaces,
37-
@RequestParam(required = false, defaultValue = "") List<String> topics) {
37+
@RequestParam(required = false, defaultValue = "") List<String> topics,
38+
@RequestParam(required = false, defaultValue = "") String producer,
39+
@RequestParam(required = false, defaultValue = "") List<String> subscriptions) {
40+
41+
List<TopicDto> topicsToReturn;
42+
3843
if (!topics.isEmpty()) {
39-
return wrapInEntity(getAllForTopics(topics));
44+
topicsToReturn = getAllForTopics(topics);
4045
} else if (!namespaces.isEmpty()) {
41-
return wrapInEntity(getAllForNamespaces(namespaces));
46+
topicsToReturn = getAllForNamespaces(namespaces);
4247
} else {
43-
return wrapInEntity(getAllForTenants(tenants));
48+
topicsToReturn = getAllForTenants(tenants);
49+
}
50+
51+
if (!producer.isEmpty()) {
52+
topicsToReturn = topicService.getTopicForProducer(topicsToReturn, producer);
53+
}
54+
if (!subscriptions.isEmpty()) {
55+
topicsToReturn = topicService.getAllForSubscriptions(topicsToReturn, subscriptions);
4456
}
57+
return wrapInEntity(topicsToReturn);
4558
}
4659

4760
@GetMapping
4861
public ResponseEntity<TopicDetailDto> getTopicDetails(@RequestParam String name) {
49-
return new ResponseEntity<>(topicService.getTopicDetails(name), HttpStatus.OK);
62+
return new ResponseEntity<>(topicService.getTopicDetails(name), HttpStatus.OK);
5063
}
5164

5265
@GetMapping("/subscription/{subscription}")

backend/src/main/java/de/amos/apachepulsarui/dto/ProducerDto.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import lombok.Data;
1212
import org.apache.pulsar.common.policies.data.PublisherStats;
1313

14-
import java.util.List;
15-
1614
@Data
1715
@AllArgsConstructor
1816
@Builder(access = AccessLevel.PRIVATE)
@@ -22,9 +20,8 @@ public class ProducerDto {
2220

2321
private String name;
2422

25-
private List<MessageDto> messagesDto;
26-
27-
private long amountOfMessages;
23+
//todo figure out how to get the amount of messages
24+
//private long amountOfMessages;
2825

2926
private String address;
3027

@@ -34,12 +31,11 @@ public class ProducerDto {
3431

3532
private String connectedSince;
3633

37-
public static ProducerDto create(PublisherStats publisherStats, List<MessageDto> messages) {
34+
public static ProducerDto create(PublisherStats publisherStats) {
3835
return ProducerDto.builder()
3936
.id(publisherStats.getProducerId())
4037
.name(publisherStats.getProducerName())
41-
.messagesDto(messages)
42-
.amountOfMessages(messages.size())
38+
//.amountOfMessages(numOfMsgs)
4339
.connectedSince(publisherStats.getConnectedSince())
4440
.address(publisherStats.getAddress())
4541
.averageMsgSize(publisherStats.getAverageMsgSize())
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: MIT
3+
* SPDX-FileCopyrightText: 2023 Jonas Arnhold <jonasarnhold@web.de>
4+
*/
5+
6+
package de.amos.apachepulsarui.dto;
7+
8+
import lombok.Builder;
9+
import lombok.Value;
10+
import org.apache.pulsar.common.schema.SchemaInfo;
11+
import org.apache.pulsar.common.schema.SchemaType;
12+
13+
import java.sql.Timestamp;
14+
import java.util.Map;
15+
16+
@Value
17+
@Builder
18+
public class SchemaInfoDto {
19+
20+
String name;
21+
long version;
22+
SchemaType type;
23+
Map<String, String> properties;
24+
String schemaDefinition;
25+
Timestamp timestamp;
26+
27+
public static SchemaInfoDto create(SchemaInfo schemaInfo, Long version) {
28+
return SchemaInfoDto.builder()
29+
.name(schemaInfo.getName())
30+
.version(version)
31+
.type(schemaInfo.getType())
32+
.properties(schemaInfo.getProperties())
33+
.schemaDefinition(schemaInfo.getSchemaDefinition())
34+
.timestamp(new Timestamp(schemaInfo.getTimestamp()))
35+
.build();
36+
}
37+
38+
}

backend/src/main/java/de/amos/apachepulsarui/dto/SubscriptionDto.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ public class SubscriptionDto {
2424

2525
private List<String> inactiveConsumers;
2626

27-
private List<MessageDto> messages;
28-
2927
private long numberConsumers;
3028

3129
private double msgAckRate;
@@ -42,7 +40,7 @@ public class SubscriptionDto {
4240

4341
private String type;
4442

45-
public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<MessageDto> messages, String name) {
43+
public static SubscriptionDto create(SubscriptionStats subscriptionStats, String name) {
4644
List<String> consumers = getConsumers(subscriptionStats);
4745
String active = consumers.stream()
4846
.filter(c -> Objects.equals(c, subscriptionStats.getActiveConsumerName()))
@@ -53,7 +51,6 @@ public static SubscriptionDto create(SubscriptionStats subscriptionStats, List<M
5351
.toList();
5452
return SubscriptionDto.builder()
5553
.name(name)
56-
.messages(messages)
5754
.activeConsumer(active)
5855
.inactiveConsumers(inactive)
5956
.msgAckRate(subscriptionStats.getMessageAckRate())

backend/src/main/java/de/amos/apachepulsarui/dto/TopicDetailDto.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,56 @@
1212
import org.apache.pulsar.common.policies.data.TopicStats;
1313

1414
import java.util.ArrayList;
15-
import java.util.List;
15+
import java.util.Collections;
16+
import java.util.Comparator;
17+
import java.util.List;
1618

1719
@Data
1820
@NoArgsConstructor(access = AccessLevel.PRIVATE)
1921
public class TopicDetailDto {
2022

2123
private String name;
2224

23-
private String localName;
25+
private String localName;
2426

25-
private String namespace;
27+
private String namespace;
2628

27-
private String tenant;
29+
private String tenant;
2830

29-
private boolean isPersistent;
31+
private boolean isPersistent;
3032

31-
private String ownerBroker;
33+
private String ownerBroker;
3234

33-
private TopicStatsDto topicStatsDto;
35+
private TopicStatsDto topicStatsDto;
3436

35-
private long producedMessages;
37+
private List<SchemaInfoDto> schemaInfos = new ArrayList<>();
3638

37-
private long consumedMessages;
38-
39-
private List<MessageDto> messagesDto = new ArrayList<>();
40-
41-
42-
/**
43-
* Converts a valid complete topic name (like "persistent://eu-tenant/hr/fizzbuzz" into a {@link TopicDetailDto}.
44-
*/
45-
public static TopicDetailDto create(String completeTopicName, TopicStats topicStats, String ownerBroker) {
39+
public static TopicDetailDto create(
40+
String completeTopicName,
41+
TopicStats topicStats,
42+
String ownerBroker,
43+
List<SchemaInfoDto> schemaInfos
44+
) {
4645
TopicName topicName = TopicName.get(completeTopicName);
47-
TopicDetailDto topicDetailDto = new TopicDetailDto();
48-
topicDetailDto.name = topicName.toString();
49-
topicDetailDto.localName = topicName.getLocalName();
50-
topicDetailDto.namespace = topicName.getNamespacePortion();
51-
topicDetailDto.tenant = topicName.getTenant();
52-
topicDetailDto.isPersistent = topicName.isPersistent();
53-
topicDetailDto.ownerBroker = ownerBroker;
54-
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
55-
topicDetailDto.producedMessages = topicDetailDto.topicStatsDto.getProducedMesages();
56-
topicDetailDto.consumedMessages = topicDetailDto.topicStatsDto.getConsumedMessages();
57-
58-
return topicDetailDto;
46+
TopicDetailDto topicDetailDto = new TopicDetailDto();
47+
48+
topicDetailDto.name = topicName.toString();
49+
topicDetailDto.localName = topicName.getLocalName();
50+
topicDetailDto.namespace = topicName.getNamespacePortion();
51+
topicDetailDto.tenant = topicName.getTenant();
52+
topicDetailDto.isPersistent = topicName.isPersistent();
53+
topicDetailDto.ownerBroker = ownerBroker;
54+
55+
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
56+
57+
// we want to put the latest schema version on top of the list
58+
if (!schemaInfos.isEmpty()) {
59+
List<SchemaInfoDto> schemaInfosSorted = schemaInfos.stream()
60+
.sorted(Comparator.comparing(SchemaInfoDto::getVersion, Collections.reverseOrder()))
61+
.toList();
62+
topicDetailDto.setSchemaInfos(schemaInfosSorted);
63+
}
64+
65+
return topicDetailDto;
5966
}
60-
61-
public static TopicDetailDto createTopicDtoWithMessages(String completeTopicName, TopicStats topicStats, String ownerBroker, List<MessageDto> messages) {
62-
TopicDetailDto topicDto = create(completeTopicName, topicStats, ownerBroker);
63-
topicDto.setMessagesDto(messages);
64-
return topicDto;
65-
}
66-
67-
68-
6967
}

backend/src/main/java/de/amos/apachepulsarui/dto/TopicDto.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,48 @@
55

66
package de.amos.apachepulsarui.dto;
77

8-
import lombok.AccessLevel;
8+
import lombok.Builder;
99
import lombok.Data;
10-
import lombok.NoArgsConstructor;
1110
import org.apache.pulsar.common.naming.TopicName;
11+
import org.apache.pulsar.common.policies.data.PublisherStats;
12+
import org.apache.pulsar.common.policies.data.TopicStats;
13+
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.Set;
1217

1318
@Data
14-
@NoArgsConstructor(access = AccessLevel.PRIVATE)
19+
@Builder
1520
public class TopicDto {
1621

1722
private String name;
1823

19-
private String namespace;
24+
private String namespace;
25+
26+
private String tenant;
27+
28+
private Set<String> subscriptions;
2029

21-
private String tenant;
30+
private List<String> producers;
31+
32+
public static TopicDto create(String completeTopicName, TopicStats topicStats) {
33+
Set<String> subscriptions = topicStats.getSubscriptions().keySet();
34+
List<String> producers = getProducers(topicStats);
2235

23-
public static TopicDto create(String completeTopicName) {
2436
TopicName topicName = TopicName.get(completeTopicName);
25-
TopicDto topicDetailDto = new TopicDto();
26-
topicDetailDto.name = topicName.toString();
27-
topicDetailDto.namespace = topicName.getNamespace();
28-
topicDetailDto.tenant = topicName.getTenant();
37+
return TopicDto.builder()
38+
.name(topicName.toString())
39+
.namespace(topicName.getNamespace())
40+
.tenant(topicName.getTenant())
41+
.producers(producers)
42+
.subscriptions(subscriptions)
43+
.build();
44+
}
2945

30-
return topicDetailDto;
46+
private static List<String> getProducers(TopicStats topicStats) {
47+
List<PublisherStats> publisherStats = new ArrayList<>(topicStats.getPublishers());
48+
return publisherStats.stream()
49+
.map(PublisherStats::getProducerName)
50+
.toList();
3151
}
3252
}

backend/src/main/java/de/amos/apachepulsarui/dto/TopicStatsDto.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public static TopicStatsDto createTopicStatsDto(TopicStats topicStats) {
5353

5454
}
5555

56-
public static List<String> getProducers(TopicStats topicStats) {
56+
private static List<String> getProducers(TopicStats topicStats) {
5757
List<PublisherStats> publisherStats = new ArrayList<>(topicStats.getPublishers());
5858
return publisherStats.stream()
5959
.map(PublisherStats::getProducerName)

backend/src/main/java/de/amos/apachepulsarui/service/ClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class ClusterService {
2121
private final PulsarAdmin pulsarAdmin;
2222
private final TenantService tenantService;
2323

24-
@Cacheable("cluster.all")
24+
@Cacheable("cluster.allNames")
2525
public List<String> getAllNames() {
2626
try {
2727
return pulsarAdmin.clusters().getClusters();

0 commit comments

Comments
 (0)