Skip to content

Commit d24f10f

Browse files
authored
Merge pull request #138 from amosproj/feature/add-schema-to-topic-details-endpoint
#114: Add Schema to Topic Details Endpoint
2 parents a0222d6 + 807bca1 commit d24f10f

File tree

9 files changed

+139
-70
lines changed

9 files changed

+139
-70
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: MIT
3+
* SPDX-FileCopyrightText: 2023 Jonas Arnhold <jonasarnhold@web.de>
4+
*/
5+
6+
package de.amos.apachepulsarui.dto;
7+
8+
import lombok.Builder;
9+
import lombok.Value;
10+
import org.apache.pulsar.common.schema.SchemaInfo;
11+
import org.apache.pulsar.common.schema.SchemaType;
12+
13+
import java.sql.Timestamp;
14+
import java.util.Map;
15+
16+
@Value
17+
@Builder
18+
public class SchemaInfoDto {
19+
20+
String name;
21+
long version;
22+
SchemaType type;
23+
Map<String, String> properties;
24+
String schemaDefinition;
25+
Timestamp timestamp;
26+
27+
public static SchemaInfoDto create(SchemaInfo schemaInfo, Long version) {
28+
return SchemaInfoDto.builder()
29+
.name(schemaInfo.getName())
30+
.version(version)
31+
.type(schemaInfo.getType())
32+
.properties(schemaInfo.getProperties())
33+
.schemaDefinition(schemaInfo.getSchemaDefinition())
34+
.timestamp(new Timestamp(schemaInfo.getTimestamp()))
35+
.build();
36+
}
37+
38+
}

backend/src/main/java/de/amos/apachepulsarui/dto/TopicDetailDto.java

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.apache.pulsar.common.policies.data.TopicStats;
1313

1414
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.Comparator;
1517
import java.util.List;
1618

1719
@Data
@@ -20,50 +22,47 @@ public class TopicDetailDto {
2022

2123
private String name;
2224

23-
private String localName;
25+
private String localName;
2426

25-
private String namespace;
27+
private String namespace;
2628

27-
private String tenant;
29+
private String tenant;
2830

29-
private boolean isPersistent;
31+
private boolean isPersistent;
3032

31-
private String ownerBroker;
33+
private String ownerBroker;
3234

33-
private TopicStatsDto topicStatsDto;
35+
private TopicStatsDto topicStatsDto;
3436

35-
private long producedMessages;
37+
private List<SchemaInfoDto> schemaInfos = new ArrayList<>();
3638

37-
private long consumedMessages;
38-
39-
private List<MessageDto> messagesDto = new ArrayList<>();
40-
41-
42-
/**
43-
* Converts a valid complete topic name (like "persistent://eu-tenant/hr/fizzbuzz" into a {@link TopicDetailDto}.
44-
*/
45-
public static TopicDetailDto create(String completeTopicName, TopicStats topicStats, String ownerBroker) {
39+
public static TopicDetailDto create(
40+
String completeTopicName,
41+
TopicStats topicStats,
42+
String ownerBroker,
43+
List<SchemaInfoDto> schemaInfos
44+
) {
4645
TopicName topicName = TopicName.get(completeTopicName);
47-
TopicDetailDto topicDetailDto = new TopicDetailDto();
48-
topicDetailDto.name = topicName.toString();
49-
topicDetailDto.localName = topicName.getLocalName();
50-
topicDetailDto.namespace = topicName.getNamespacePortion();
51-
topicDetailDto.tenant = topicName.getTenant();
52-
topicDetailDto.isPersistent = topicName.isPersistent();
53-
topicDetailDto.ownerBroker = ownerBroker;
54-
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
55-
topicDetailDto.producedMessages = topicDetailDto.topicStatsDto.getProducedMesages();
56-
topicDetailDto.consumedMessages = topicDetailDto.topicStatsDto.getConsumedMessages();
57-
58-
return topicDetailDto;
46+
TopicDetailDto topicDetailDto = new TopicDetailDto();
47+
48+
topicDetailDto.name = topicName.toString();
49+
topicDetailDto.localName = topicName.getLocalName();
50+
topicDetailDto.namespace = topicName.getNamespacePortion();
51+
topicDetailDto.tenant = topicName.getTenant();
52+
topicDetailDto.isPersistent = topicName.isPersistent();
53+
topicDetailDto.ownerBroker = ownerBroker;
54+
55+
topicDetailDto.setTopicStatsDto(TopicStatsDto.createTopicStatsDto(topicStats));
56+
57+
// we want to put the latest schema version on top of the list
58+
if (!schemaInfos.isEmpty()) {
59+
List<SchemaInfoDto> schemaInfosSorted = schemaInfos.stream()
60+
.sorted(Comparator.comparing(SchemaInfoDto::getVersion, Collections.reverseOrder()))
61+
.toList();
62+
topicDetailDto.setSchemaInfos(schemaInfosSorted);
63+
}
64+
65+
return topicDetailDto;
5966
}
6067

61-
public static TopicDetailDto createTopicDtoWithMessages(String completeTopicName, TopicStats topicStats, String ownerBroker, List<MessageDto> messages) {
62-
TopicDetailDto topicDto = create(completeTopicName, topicStats, ownerBroker);
63-
topicDto.setMessagesDto(messages);
64-
return topicDto;
65-
}
66-
67-
68-
6968
}

backend/src/main/java/de/amos/apachepulsarui/service/ClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class ClusterService {
2121
private final PulsarAdmin pulsarAdmin;
2222
private final TenantService tenantService;
2323

24-
@Cacheable("cluster.all")
24+
@Cacheable("cluster.allNames")
2525
public List<String> getAllNames() {
2626
try {
2727
return pulsarAdmin.clusters().getClusters();

backend/src/main/java/de/amos/apachepulsarui/service/NamespaceService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.pulsar.client.admin.Namespaces;
1414
import org.apache.pulsar.client.admin.PulsarAdmin;
1515
import org.apache.pulsar.client.admin.PulsarAdminException;
16+
import org.springframework.cache.annotation.Cacheable;
1617
import org.springframework.stereotype.Service;
1718

1819
import java.util.List;
@@ -46,10 +47,12 @@ public List<NamespaceDto> getAllForTenants(List<String> tenants) {
4647
.toList();
4748
}
4849

50+
@Cacheable("namespace.detail")
4951
public NamespaceDetailDto getNamespaceDetails(String namespaceName) {
5052
return enrichWithNamespaceData(NamespaceDetailDto.fromString(namespaceName));
5153
}
5254

55+
@Cacheable("namespace.allNames")
5356
public List<String> getAllOfTenant(String tenantName) throws PulsarApiException {
5457
try {
5558
return pulsarAdmin.namespaces().getNamespaces(tenantName);

backend/src/main/java/de/amos/apachepulsarui/service/TenantService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class TenantService {
2121
private final PulsarAdmin pulsarAdmin;
2222
private final NamespaceService namespaceService;
2323

24+
@Cacheable("tenants.allNames")
2425
public List<String> getAllNames() throws PulsarApiException {
2526
try {
2627
return pulsarAdmin.tenants().getTenants();
@@ -29,7 +30,7 @@ public List<String> getAllNames() throws PulsarApiException {
2930
}
3031
}
3132

32-
@Cacheable("tenats.getAll")
33+
@Cacheable("tenants.allFiltered")
3334
public List<TenantDto> getAllFiltered(List<String> tenants) throws PulsarApiException {
3435
try {
3536
List<String> tenantNames = pulsarAdmin.tenants().getTenants();

backend/src/main/java/de/amos/apachepulsarui/service/TopicService.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import de.amos.apachepulsarui.dto.ConsumerDto;
1010
import de.amos.apachepulsarui.dto.MessageDto;
1111
import de.amos.apachepulsarui.dto.ProducerDto;
12+
import de.amos.apachepulsarui.dto.SchemaInfoDto;
1213
import de.amos.apachepulsarui.dto.SubscriptionDto;
1314
import de.amos.apachepulsarui.dto.TopicDetailDto;
1415
import de.amos.apachepulsarui.dto.TopicDto;
@@ -22,7 +23,7 @@
2223
import org.apache.pulsar.common.policies.data.ConsumerStats;
2324
import org.apache.pulsar.common.policies.data.PublisherStats;
2425
import org.apache.pulsar.common.policies.data.TopicStats;
25-
import org.springframework.cache.annotation.Cacheable;
26+
import org.apache.pulsar.common.schema.SchemaInfo;
2627
import org.springframework.stereotype.Service;
2728

2829
import java.util.List;
@@ -64,7 +65,6 @@ public List<TopicDto> getAllForNamespaces(List<String> namespaces) {
6465
* @param namespace The namespace you want to get a list of all topics for.
6566
* @return A list of topics (their fully qualified names).
6667
*/
67-
@Cacheable("topic.allNamesByNamespace")
6868
public List<String> getAllByNamespace(String namespace) {
6969
return getByNamespace(namespace);
7070
}
@@ -73,7 +73,7 @@ private List<String> getByNamespace(String namespace) throws PulsarApiException
7373
try {
7474
return pulsarAdmin.topics().getList(namespace);
7575
} catch (PulsarAdminException e) {
76-
throw new PulsarApiException("Could not fetch topics of namespace '%s'".formatted(namespace), e);
76+
throw new PulsarApiException("Could not fetch topics of namespace '%s'".formatted(namespace), e);
7777
}
7878
}
7979

@@ -91,18 +91,12 @@ public void createNewTopic(String topic) throws PulsarApiException {
9191
* additional metadata.
9292
*/
9393
public TopicDetailDto getTopicDetails(String topicName) throws PulsarApiException {
94-
try {
95-
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
96-
List<MessageDto> messages = subscriptions.stream().
97-
flatMap(sub -> messageService.peekMessages(topicName, sub).stream()).toList();
98-
return TopicDetailDto.createTopicDtoWithMessages(topicName,
99-
getTopicStats(topicName),
100-
getOwnerBroker(topicName),
101-
messages);
102-
103-
} catch (PulsarAdminException e) {
104-
throw new PulsarApiException("Could not fetch topic details for topic '%s'".formatted(topicName), e);
105-
}
94+
return TopicDetailDto.create(
95+
topicName,
96+
getTopicStats(topicName),
97+
getOwnerBroker(topicName),
98+
getSchemasOfTopic(topicName)
99+
);
106100
}
107101

108102
private TopicStats getTopicStats(String topicName) throws PulsarApiException {
@@ -121,6 +115,30 @@ private String getOwnerBroker(String topicName) throws PulsarApiException {
121115
}
122116
}
123117

118+
private List<SchemaInfoDto> getSchemasOfTopic(String topicName) {
119+
try {
120+
return pulsarAdmin.schemas().getAllSchemas(topicName).stream()
121+
.map(schemaInfo -> {
122+
Long versionBySchemaInfo = getVersionBySchemaInfo(topicName, schemaInfo);
123+
return SchemaInfoDto.create(schemaInfo, versionBySchemaInfo);
124+
})
125+
.toList();
126+
} catch (PulsarAdminException e) {
127+
throw new PulsarApiException("Could not fetch all schemas for topic %s".formatted(topicName), e);
128+
}
129+
}
130+
131+
private Long getVersionBySchemaInfo(String topicName, SchemaInfo schemaInfo) {
132+
try {
133+
return pulsarAdmin.schemas().getVersionBySchema(topicName, schemaInfo);
134+
} catch (PulsarAdminException e) {
135+
throw new PulsarApiException(
136+
"Could not fetch version by schema info %s for topic %s".formatted(schemaInfo.getName(), topicName),
137+
e
138+
);
139+
}
140+
}
141+
124142
public ProducerDto getProducerByTopic(String topic, String producer) {
125143
TopicStats topicStats = getTopicStats(topic);
126144
PublisherStats publisherStats = topicStats.getPublishers().stream()
@@ -129,7 +147,6 @@ public ProducerDto getProducerByTopic(String topic, String producer) {
129147
return create(publisherStats, getMessagesByProducer(topic, topicStats.getSubscriptions().keySet(), producer));
130148
}
131149

132-
133150
private List<MessageDto> getMessagesByProducer(String topic, Set<String> subscriptions, String producer) {
134151
return subscriptions.stream()
135152
.flatMap(s -> messageService.peekMessages(topic, s).stream())

backend/src/test/java/de/amos/apachepulsarui/controller/TopicControllerTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ void getAll_withTopics_returnsTopics() throws Exception {
154154
void getTopicDetails() throws Exception {
155155
String name = "grogu";
156156
String fullTopic = "persistent://public/default/grogu";
157-
TopicDetailDto topic = TopicDetailDto.create(name, topicStats, RandomString.make(1));
157+
TopicDetailDto topic = TopicDetailDto.create(
158+
name,
159+
topicStats,
160+
RandomString.make(1),
161+
List.of()
162+
);
158163

159164
when(topicService.getTopicDetails(fullTopic)).thenReturn(topic);
160165

backend/src/test/java/de/amos/apachepulsarui/service/TopicServiceIntegrationTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.nio.charset.StandardCharsets;
2121
import java.util.List;
22-
import java.util.concurrent.CompletableFuture;
2322

2423
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2524
public class TopicServiceIntegrationTest extends AbstractIntegrationTest {
@@ -84,25 +83,22 @@ void getTopicDetails_returnsMessageCount() throws Exception {
8483
topicService.createNewTopic(topicName);
8584
var message = "hello world".getBytes(StandardCharsets.UTF_8);
8685
try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
87-
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("TestSubscriber").subscribe()) {
88-
// receive needs to happen before send, but we don't want to block -> async
89-
// after the message was sent, we need to ensure it was received -> .get()
90-
CompletableFuture<Message<byte[]>> consume1 = consumer.receiveAsync();
91-
producer.sendAsync(message);
92-
consume1.get();
86+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("TestSubscriber").subscribe())
87+
{
88+
producer.send(message);
89+
producer.send(message);
90+
producer.send(message);
9391

94-
CompletableFuture<Message<byte[]>> consume2 = consumer.receiveAsync();
95-
producer.sendAsync(message);
96-
consume2.get();
92+
consumer.receive();
93+
consumer.receive();
9794

98-
producer.send(message);
9995
consumer.close();
10096
producer.close();
10197
}
10298
TopicDetailDto topic = topicService.getTopicDetails(topicName);
10399
// it seems there is no exactly once guarantees in pulsar which made the test flaky -> use greaterThan instead of equals
104-
Assertions.assertThat(topic.getProducedMessages()).isGreaterThanOrEqualTo(3);
105-
Assertions.assertThat(topic.getConsumedMessages()).isGreaterThanOrEqualTo(2);
100+
Assertions.assertThat(topic.getTopicStatsDto().getProducedMesages()).isGreaterThanOrEqualTo(3);
101+
Assertions.assertThat(topic.getTopicStatsDto().getConsumedMessages()).isGreaterThanOrEqualTo(2);
106102
}
107103

108104
@Test

backend/src/test/java/de/amos/apachepulsarui/service/TopicServiceTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.pulsar.client.admin.Lookup;
1111
import org.apache.pulsar.client.admin.PulsarAdmin;
1212
import org.apache.pulsar.client.admin.PulsarAdminException;
13+
import org.apache.pulsar.client.admin.Schemas;
1314
import org.apache.pulsar.client.admin.Topics;
1415
import org.apache.pulsar.common.naming.TopicName;
1516
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -36,6 +37,8 @@ class TopicServiceTest {
3637
@Mock
3738
private Topics topics;
3839
@Mock
40+
private Schemas schemas;
41+
@Mock
3942
private PulsarAdmin pulsarAdmin;
4043
@Mock
4144
private TopicStats topicStats;
@@ -87,6 +90,11 @@ private void whenOwnerBroker() throws PulsarAdminException {
8790
when(pulsarAdmin.lookups().lookupTopic(TOPIC_NAME)).thenReturn(BROKER);
8891
}
8992

93+
private void whenSchemas() throws PulsarAdminException {
94+
when(pulsarAdmin.schemas()).thenReturn(schemas);
95+
when(pulsarAdmin.schemas().getAllSchemas(TOPIC_NAME)).thenReturn(List.of());
96+
}
97+
9098
@Test
9199
void createNewTopic() throws PulsarAdminException {
92100
when(pulsarAdmin.topics()).thenReturn(topics);
@@ -97,15 +105,17 @@ void createNewTopic() throws PulsarAdminException {
97105
}
98106

99107
@Test
100-
void getTopicWithMessagesByName() throws PulsarAdminException {
108+
void getTopicDetails() throws PulsarAdminException {
101109
whenTopicStats();
102110
whenOwnerBroker();
111+
whenSchemas();
103112

104113
topicService.getTopicDetails(TOPIC_NAME);
105114

106115
topicDtoMockedStatic.verify(
107-
() -> TopicDetailDto.createTopicDtoWithMessages(TOPIC_NAME, topicStats, BROKER, List.of()),
116+
() -> TopicDetailDto.create(TOPIC_NAME, topicStats, BROKER, List.of()),
108117
times(1)
109118
);
110119
}
120+
111121
}

0 commit comments

Comments
 (0)