From fe9dc76ec1beedd248f1ba3f9bb3113d95b90886 Mon Sep 17 00:00:00 2001 From: MDzaja Date: Wed, 17 Jul 2024 23:20:21 +0200 Subject: [PATCH 1/2] Initial proposition to enable header modifications by custom serializer --- .../ui/serdes/ProducerRecordCreator.java | 19 +++++++++++++------ .../io/kafbat/ui/serdes/SerdeInstance.java | 18 ++++++++++++++---- serde-api/pom.xml | 8 ++++++++ .../java/io/kafbat/ui/serde/api/Serde.java | 5 +++++ 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java index 359c871d6..084606436 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java @@ -5,7 +5,7 @@ import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -20,18 +20,25 @@ public ProducerRecord create(String topic, @Nullable String key, @Nullable String value, @Nullable Map headers) { + + Headers kafkaHeaders = createHeaders(headers); + byte[] keyBytes = keySerializer.serialize(key, kafkaHeaders); + byte[] valueBytes = valuesSerializer.serialize(value, kafkaHeaders); + return new ProducerRecord<>( topic, partition, - key == null ? null : keySerializer.serialize(key), - value == null ? null : valuesSerializer.serialize(value), - headers == null ? null : createHeaders(headers) + keyBytes, + valueBytes, + kafkaHeaders ); } - private Iterable
createHeaders(Map clientHeaders) { + private Headers createHeaders(Map clientHeaders) { RecordHeaders headers = new RecordHeaders(); - clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes()))); + if (clientHeaders != null) { + clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes()))); + } return headers; } diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java index 7c1826257..f2526d698 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java @@ -3,6 +3,7 @@ import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serde.api.Serde; import java.io.Closeable; +import java.lang.reflect.InvocationTargetException; import java.util.Optional; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -10,6 +11,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cglib.proxy.Proxy; @Slf4j @RequiredArgsConstructor @@ -78,10 +80,18 @@ public boolean canDeserialize(String topic, Serde.Target type) { } public Serde.Serializer serializer(String topic, Serde.Target type) { - return wrapWithClassloader(() -> { - var serializer = serde.serializer(topic, type); - return input -> wrapWithClassloader(() -> serializer.serialize(input)); - }); + Serde.Serializer realSerializer = serde.serializer(topic, type); + return (Serde.Serializer) Proxy.newProxyInstance( + classLoader, + new Class[] { Serde.Serializer.class }, + (proxy, method, args) -> wrapWithClassloader(() -> { + try { + return method.invoke(realSerializer, args); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Error invoking serializer method", e.getCause()); + } + }) + ); } public Serde.Deserializer deserializer(String topic, Serde.Target type) { diff --git a/serde-api/pom.xml b/serde-api/pom.xml index 3c2e3b3a1..984c7a287 100644 --- a/serde-api/pom.xml +++ b/serde-api/pom.xml @@ -12,6 +12,14 @@ 17 + + + org.apache.kafka + kafka-clients + 3.5.2 + + + ossrh diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java index 74705b1b6..45a46814b 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java @@ -2,6 +2,7 @@ import java.io.Closeable; import java.util.Optional; +import org.apache.kafka.common.header.Headers; /** * Main interface of serialization/deserialization logic. @@ -96,6 +97,10 @@ interface Serializer { * @param input string entered by user into UI text field.
Note: this input is not formatted in any way. */ byte[] serialize(String input); + + default byte[] serialize(String input, Headers headers) { + return serialize(input); + } } /** From 2c5554660cdf47e267916064860e1618aeca65fa Mon Sep 17 00:00:00 2001 From: MDzaja Date: Sun, 28 Jul 2024 23:02:30 +0200 Subject: [PATCH 2/2] add comments, change variable names, optimize config --- .../java/io/kafbat/ui/serdes/ProducerRecordCreator.java | 6 ++---- api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java | 8 +++++--- serde-api/pom.xml | 8 +++++++- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java index 084606436..6f4db7a77 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java @@ -22,14 +22,12 @@ public ProducerRecord create(String topic, @Nullable Map headers) { Headers kafkaHeaders = createHeaders(headers); - byte[] keyBytes = keySerializer.serialize(key, kafkaHeaders); - byte[] valueBytes = valuesSerializer.serialize(value, kafkaHeaders); return new ProducerRecord<>( topic, partition, - keyBytes, - valueBytes, + key == null ? null : keySerializer.serialize(key, kafkaHeaders), + value == null ? null : valuesSerializer.serialize(value, kafkaHeaders), kafkaHeaders ); } diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java index f2526d698..791535081 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java @@ -80,13 +80,15 @@ public boolean canDeserialize(String topic, Serde.Target type) { } public Serde.Serializer serializer(String topic, Serde.Target type) { - Serde.Serializer realSerializer = serde.serializer(topic, type); + var serializer = serde.serializer(topic, type); + // Create a dynamic proxy instance for the Serde.Serializer interface return (Serde.Serializer) Proxy.newProxyInstance( classLoader, new Class[] { Serde.Serializer.class }, - (proxy, method, args) -> wrapWithClassloader(() -> { + (proxy, method, args) -> wrapWithClassloader(() -> { // Invocation handler to wrap method calls try { - return method.invoke(realSerializer, args); + // Invoke the actual serializer method with the provided arguments + return method.invoke(serializer, args); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException("Error invoking serializer method", e.getCause()); } diff --git a/serde-api/pom.xml b/serde-api/pom.xml index 984c7a287..7ae589f08 100644 --- a/serde-api/pom.xml +++ b/serde-api/pom.xml @@ -4,6 +4,12 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + kafbat-ui + io.kafbat.ui + 0.0.1-SNAPSHOT + + 4.0.0 jar @@ -16,7 +22,7 @@ org.apache.kafka kafka-clients - 3.5.2 + ${kafka-clients.version}