Skip to content

Commit ed0b77d

Browse files
committed
Support version getter from metadata.version
1 parent 9ea1a4e commit ed0b77d

File tree

2 files changed

+62
-2
lines changed

2 files changed

+62
-2
lines changed

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kafbat.ui.exception.NotFoundException;
1414
import io.kafbat.ui.exception.ValidationException;
1515
import io.kafbat.ui.util.KafkaVersion;
16+
import io.kafbat.ui.util.MetadataVersion;
1617
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
1718
import java.io.Closeable;
1819
import java.time.Duration;
@@ -49,6 +50,8 @@
4950
import org.apache.kafka.clients.admin.DescribeClusterOptions;
5051
import org.apache.kafka.clients.admin.DescribeClusterResult;
5152
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
53+
import org.apache.kafka.clients.admin.FeatureMetadata;
54+
import org.apache.kafka.clients.admin.FinalizedVersionRange;
5255
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
5356
import org.apache.kafka.clients.admin.ListOffsetsResult;
5457
import org.apache.kafka.clients.admin.ListTopicsOptions;
@@ -96,6 +99,7 @@
9699
@Slf4j
97100
@AllArgsConstructor
98101
public class ReactiveAdminClient implements Closeable {
102+
private final static String DEFAULT_UNKNOWN_VERSION = "Unknown";
99103

100104
public enum SupportedFeature {
101105
INCREMENTAL_ALTER_CONFIGS(2.3f),
@@ -150,8 +154,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
150154
.orElse(desc.getNodes().iterator().next().id());
151155
return loadBrokersConfig(ac, List.of(targetNodeId))
152156
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
153-
.flatMap(configs -> {
154-
String version = "1.0-UNKNOWN";
157+
.zipWith(toMono(ac.describeFeatures().featureMetadata()))
158+
.flatMap(tuple -> {
159+
List<ConfigEntry> configs = tuple.getT1();
160+
FeatureMetadata featureMetadata = tuple.getT2();
161+
String version = DEFAULT_UNKNOWN_VERSION;
155162
boolean topicDeletionEnabled = true;
156163
for (ConfigEntry entry : configs) {
157164
if (entry.name().contains("inter.broker.protocol.version")) {
@@ -161,6 +168,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
161168
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
162169
}
163170
}
171+
FinalizedVersionRange metadataVersion =
172+
featureMetadata.finalizedFeatures().get("metadata.version");
173+
if (metadataVersion != null) {
174+
version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel(), version);
175+
}
164176
final String finalVersion = version;
165177
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
166178
return SupportedFeature.forVersion(ac, version)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kafbat.ui.util;
2+
3+
import java.util.Arrays;
4+
5+
public enum MetadataVersion {
6+
IBP_3_0_IV1(1, "3.0-IV1"),
7+
IBP_3_1_IV0(2, "3.1-IV0"),
8+
IBP_3_2_IV0(3, "3.2-IV0"),
9+
IBP_3_3_IV0(4, "3.3-IV0"),
10+
IBP_3_3_IV1(5, "3.3-IV1"),
11+
IBP_3_3_IV2(6, "3.3-IV2"),
12+
IBP_3_3_IV3(7, "3.3-IV3"),
13+
IBP_3_4_IV0(8, "3.4-IV0"),
14+
IBP_3_5_IV0(9, "3.5-IV0"),
15+
IBP_3_5_IV1(10, "3.5-IV1"),
16+
IBP_3_5_IV2(11, "3.5-IV2"),
17+
IBP_3_6_IV0(12, "3.6-IV0"),
18+
IBP_3_6_IV1(13, "3.6-IV1"),
19+
IBP_3_6_IV2(14, "3.6-IV2"),
20+
IBP_3_7_IV0(15, "3.7-IV0"),
21+
IBP_3_7_IV1(16, "3.7-IV1"),
22+
IBP_3_7_IV2(17, "3.7-IV2"),
23+
IBP_3_7_IV3(18, "3.7-IV3"),
24+
IBP_3_7_IV4(19, "3.7-IV4"),
25+
IBP_3_8_IV0(20, "3.8-IV0"),
26+
IBP_3_9_IV0(21, "3.9-IV0"),
27+
IBP_4_0_IV0(22, "4.0-IV0"),
28+
IBP_4_0_IV1(23, "4.0-IV1"),
29+
IBP_4_0_IV2(24, "4.0-IV2"),
30+
IBP_4_0_IV3(25, "4.0-IV3"),
31+
IBP_4_1_IV0(26, "4.1-IV0");
32+
33+
private final int featureLevel;
34+
private final String release;
35+
36+
MetadataVersion(int featureLevel, String release) {
37+
this.featureLevel = featureLevel;
38+
this.release = release;
39+
}
40+
41+
public static String findVersion(int featureLevel, String defaultValue) {
42+
return Arrays.stream(values())
43+
.filter(v -> v.featureLevel == featureLevel)
44+
.findFirst().map(v -> v.release)
45+
.orElse(defaultValue);
46+
}
47+
48+
}

0 commit comments

Comments
 (0)