Skip to content

Commit ca7aaff

Browse files
committed
SchemaRegistrySerde: Avro deserialization via topic name
Avro deserialization without magic byte using lookup by topic name as fallback like in serialization Closes provectus#4520
1 parent 83b5a60 commit ca7aaff

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public Serializer serializer(String topic, Target type) {
279279
@Override
280280
public Deserializer deserializer(String topic, Target type) {
281281
return (headers, data) -> {
282-
var schemaId = extractSchemaIdFromMsg(data);
282+
int schemaId = getSchemaIdFromMessageOrTopic(data, topic, type);
283283
SchemaType format = getMessageFormatBySchemaId(schemaId);
284284
MessageFormatter formatter = schemaRegistryFormatters.get(format);
285285
return new DeserializeResult(
@@ -293,22 +293,30 @@ public Deserializer deserializer(String topic, Target type) {
293293
};
294294
}
295295

296+
private int getSchemaIdFromMessageOrTopic(byte[] data, String topic, Target type) {
297+
return extractSchemaIdFromMsg(data).orElseGet(
298+
() -> {
299+
String subject = schemaSubject(topic, type);
300+
return getSchemaBySubject(subject)
301+
.map(SchemaMetadata::getId)
302+
.orElseThrow(() -> new ValidationException(
303+
String.format("No schema for subject '%s' found and no magic byte in avro data", subject)));
304+
}
305+
);
306+
}
307+
296308
private SchemaType getMessageFormatBySchemaId(int schemaId) {
297309
return getSchemaById(schemaId)
298310
.map(ParsedSchema::schemaType)
299311
.flatMap(SchemaType::fromString)
300312
.orElseThrow(() -> new ValidationException(String.format("Schema for id '%d' not found ", schemaId)));
301313
}
302314

303-
private int extractSchemaIdFromMsg(byte[] data) {
315+
private Optional<Integer> extractSchemaIdFromMsg(byte[] data) {
304316
ByteBuffer buffer = ByteBuffer.wrap(data);
305317
if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
306-
return buffer.getInt();
318+
return Optional.of(buffer.getInt());
307319
}
308-
throw new ValidationException(
309-
String.format(
310-
"Data doesn't contain magic byte and schema id prefix, so it can't be deserialized with %s serde",
311-
name())
312-
);
320+
return Optional.empty();
313321
}
314322
}

kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,39 @@ void deserializeReturnsJsonAvroMsgJsonRepresentation() throws RestClientExceptio
130130
.contains(Map.entry("schemaId", schemaId));
131131
}
132132

133+
@Test
134+
void deserializeReturnsJsonAvroMsgJsonRepresentationViaTopicNameOnly() throws RestClientException, IOException {
135+
AvroSchema schema = new AvroSchema(
136+
"{"
137+
+ " \"type\": \"record\","
138+
+ " \"name\": \"TestAvroRecord1\","
139+
+ " \"fields\": ["
140+
+ " {"
141+
+ " \"name\": \"field1\","
142+
+ " \"type\": \"string\""
143+
+ " },"
144+
+ " {"
145+
+ " \"name\": \"field2\","
146+
+ " \"type\": \"int\""
147+
+ " }"
148+
+ " ]"
149+
+ "}"
150+
);
151+
String jsonValue = "{ \"field1\":\"testStr\", \"field2\": 123 }";
152+
153+
String topic = "test";
154+
int schemaId = registryClient.register(topic + "-value", schema);
155+
156+
byte[] data = jsonToAvro(jsonValue, schema); // No magic byte no schema id registered
157+
var result = serde.deserializer(topic, Serde.Target.VALUE).deserialize(null, data);
158+
159+
assertJsonsEqual(jsonValue, result.getResult());
160+
assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON);
161+
assertThat(result.getAdditionalProperties())
162+
.contains(Map.entry("type", "AVRO"))
163+
.contains(Map.entry("schemaId", schemaId));
164+
}
165+
133166
@Nested
134167
class SerdeWithDisabledSubjectExistenceCheck {
135168

0 commit comments

Comments
 (0)