13
13
import io .kafbat .ui .exception .NotFoundException ;
14
14
import io .kafbat .ui .exception .ValidationException ;
15
15
import io .kafbat .ui .util .KafkaVersion ;
16
+ import io .kafbat .ui .util .MetadataVersion ;
16
17
import io .kafbat .ui .util .annotation .KafkaClientInternalsDependant ;
17
18
import java .io .Closeable ;
18
19
import java .time .Duration ;
49
50
import org .apache .kafka .clients .admin .DescribeClusterOptions ;
50
51
import org .apache .kafka .clients .admin .DescribeClusterResult ;
51
52
import org .apache .kafka .clients .admin .DescribeConfigsOptions ;
53
+ import org .apache .kafka .clients .admin .FeatureMetadata ;
54
+ import org .apache .kafka .clients .admin .FinalizedVersionRange ;
52
55
import org .apache .kafka .clients .admin .ListConsumerGroupOffsetsSpec ;
53
56
import org .apache .kafka .clients .admin .ListOffsetsResult ;
54
57
import org .apache .kafka .clients .admin .ListTopicsOptions ;
96
99
@ Slf4j
97
100
@ AllArgsConstructor
98
101
public class ReactiveAdminClient implements Closeable {
102
+ private static final String DEFAULT_UNKNOWN_VERSION = "Unknown" ;
99
103
100
104
public enum SupportedFeature {
101
105
INCREMENTAL_ALTER_CONFIGS (2.3f ),
@@ -114,8 +118,8 @@ public enum SupportedFeature {
114
118
this .predicate = (admin , ver ) -> Mono .just (ver != null && ver >= fromVersion );
115
119
}
116
120
117
- static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , String kafkaVersionStr ) {
118
- @ Nullable Float kafkaVersion = KafkaVersion . parse ( kafkaVersionStr ).orElse (null );
121
+ static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , Optional < String > kafkaVersionStr ) {
122
+ @ Nullable Float kafkaVersion = kafkaVersionStr . flatMap ( KafkaVersion :: parse ).orElse (null );
119
123
return Flux .fromArray (SupportedFeature .values ())
120
124
.flatMap (f -> f .predicate .apply (ac , kafkaVersion ).map (enabled -> Tuples .of (f , enabled )))
121
125
.filter (Tuple2 ::getT2 )
@@ -150,18 +154,28 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
150
154
.orElse (desc .getNodes ().iterator ().next ().id ());
151
155
return loadBrokersConfig (ac , List .of (targetNodeId ))
152
156
.map (map -> map .isEmpty () ? List .<ConfigEntry >of () : map .get (targetNodeId ))
153
- .flatMap (configs -> {
154
- String version = "1.0-UNKNOWN" ;
157
+ .zipWith (toMono (ac .describeFeatures ().featureMetadata ()))
158
+ .flatMap (tuple -> {
159
+ List <ConfigEntry > configs = tuple .getT1 ();
160
+ FeatureMetadata featureMetadata = tuple .getT2 ();
161
+ Optional <String > version = Optional .empty ();
155
162
boolean topicDeletionEnabled = true ;
156
163
for (ConfigEntry entry : configs ) {
157
164
if (entry .name ().contains ("inter.broker.protocol.version" )) {
158
- version = entry .value ();
165
+ version = Optional . of ( entry .value () );
159
166
}
160
167
if (entry .name ().equals ("delete.topic.enable" )) {
161
168
topicDeletionEnabled = Boolean .parseBoolean (entry .value ());
162
169
}
163
170
}
164
- final String finalVersion = version ;
171
+ if (version .isEmpty ()) {
172
+ FinalizedVersionRange metadataVersion =
173
+ featureMetadata .finalizedFeatures ().get ("metadata.version" );
174
+ if (metadataVersion != null ) {
175
+ version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel ());
176
+ }
177
+ }
178
+ final String finalVersion = version .orElse (DEFAULT_UNKNOWN_VERSION );
165
179
final boolean finalTopicDeletionEnabled = topicDeletionEnabled ;
166
180
return SupportedFeature .forVersion (ac , version )
167
181
.map (features -> new ConfigRelatedInfo (finalVersion , features , finalTopicDeletionEnabled ));
0 commit comments