@@ -118,8 +118,8 @@ public enum SupportedFeature {
118
118
this .predicate = (admin , ver ) -> Mono .just (ver != null && ver >= fromVersion );
119
119
}
120
120
121
- static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , String kafkaVersionStr ) {
122
- @ Nullable Float kafkaVersion = KafkaVersion . parse ( kafkaVersionStr ).orElse (null );
121
+ static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , Optional < String > kafkaVersionStr ) {
122
+ @ Nullable Float kafkaVersion = kafkaVersionStr . flatMap ( KafkaVersion :: parse ).orElse (null );
123
123
return Flux .fromArray (SupportedFeature .values ())
124
124
.flatMap (f -> f .predicate .apply (ac , kafkaVersion ).map (enabled -> Tuples .of (f , enabled )))
125
125
.filter (Tuple2 ::getT2 )
@@ -158,22 +158,24 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
158
158
.flatMap (tuple -> {
159
159
List <ConfigEntry > configs = tuple .getT1 ();
160
160
FeatureMetadata featureMetadata = tuple .getT2 ();
161
- String version = DEFAULT_UNKNOWN_VERSION ;
161
+ Optional < String > version = Optional . empty () ;
162
162
boolean topicDeletionEnabled = true ;
163
163
for (ConfigEntry entry : configs ) {
164
164
if (entry .name ().contains ("inter.broker.protocol.version" )) {
165
- version = entry .value ();
165
+ version = Optional . of ( entry .value () );
166
166
}
167
167
if (entry .name ().equals ("delete.topic.enable" )) {
168
168
topicDeletionEnabled = Boolean .parseBoolean (entry .value ());
169
169
}
170
170
}
171
- FinalizedVersionRange metadataVersion =
172
- featureMetadata .finalizedFeatures ().get ("metadata.version" );
173
- if (metadataVersion != null ) {
174
- version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel (), version );
171
+ if (version .isEmpty ()) {
172
+ FinalizedVersionRange metadataVersion =
173
+ featureMetadata .finalizedFeatures ().get ("metadata.version" );
174
+ if (metadataVersion != null ) {
175
+ version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel ());
176
+ }
175
177
}
176
- final String finalVersion = version ;
178
+ final String finalVersion = version . orElse ( DEFAULT_UNKNOWN_VERSION ) ;
177
179
final boolean finalTopicDeletionEnabled = topicDeletionEnabled ;
178
180
return SupportedFeature .forVersion (ac , version )
179
181
.map (features -> new ConfigRelatedInfo (finalVersion , features , finalTopicDeletionEnabled ));
0 commit comments