Skip to content

Commit 570433b

Browse files
committed
BE: Closes #71 Messages: Show headers duplicates
1 parent 9ea1a4e commit 570433b

File tree

15 files changed

+176
-47
lines changed

15 files changed

+176
-47
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.cel.common.CelValidationResult;
1515
import dev.cel.common.types.CelType;
1616
import dev.cel.common.types.CelTypeProvider;
17+
import dev.cel.common.types.ListType;
1718
import dev.cel.common.types.MapType;
1819
import dev.cel.common.types.SimpleType;
1920
import dev.cel.common.types.StructType;
@@ -67,9 +68,14 @@ private static boolean headersContains(TopicMessageDTO msg, String searchString)
6768
}
6869

6970
for (final var entry : headers.entrySet()) {
70-
if (StringUtils.contains(entry.getKey(), searchString) || StringUtils.contains(entry.getValue(), searchString)) {
71+
if (StringUtils.contains(entry.getKey(), searchString)) {
7172
return true;
7273
}
74+
for (final var value : entry.getValue()) {
75+
if (StringUtils.contains(value, searchString)) {
76+
return true;
77+
}
78+
}
7379
}
7480

7581
return false;
@@ -143,7 +149,7 @@ private static CelCompiler createCompiler() {
143149
"timestampMs", SimpleType.INT,
144150
"keyAsText", SimpleType.STRING,
145151
"valueAsText", SimpleType.STRING,
146-
"headers", MapType.create(SimpleType.STRING, SimpleType.STRING),
152+
"headers", MapType.create(SimpleType.STRING, ListType.create(SimpleType.STRING)),
147153
"key", SimpleType.DYN,
148154
"value", SimpleType.DYN
149155
);

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import java.time.Instant;
77
import java.time.OffsetDateTime;
88
import java.time.ZoneId;
9+
import java.util.ArrayList;
910
import java.util.Arrays;
1011
import java.util.HashMap;
12+
import java.util.List;
1113
import java.util.Map;
1214
import java.util.function.UnaryOperator;
1315
import lombok.RequiredArgsConstructor;
@@ -63,14 +65,14 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType)
6365
}
6466

6567
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
66-
Map<String, String> headers = new HashMap<>();
68+
Map<String, List<String>> headersMap = new HashMap<>();
6769
rec.headers().iterator()
68-
.forEachRemaining(header ->
69-
headers.put(
70-
header.key(),
71-
header.value() != null ? new String(header.value()) : null
72-
));
73-
message.setHeaders(headers);
70+
.forEachRemaining(header -> {
71+
String key = header.key();
72+
String value = header.value() != null ? new String(header.value()) : null;
73+
headersMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
74+
});
75+
message.setHeaders(headersMap);
7476
}
7577

7678
private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {

api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.serdes;
22

33
import io.kafbat.ui.serde.api.Serde;
4+
import java.util.List;
45
import java.util.Map;
56
import javax.annotation.Nullable;
67
import lombok.RequiredArgsConstructor;
@@ -19,7 +20,7 @@ public ProducerRecord<byte[], byte[]> create(String topic,
1920
@Nullable Integer partition,
2021
@Nullable String key,
2122
@Nullable String value,
22-
@Nullable Map<String, String> headers) {
23+
@Nullable Map<String, List<String>> headers) {
2324
return new ProducerRecord<>(
2425
topic,
2526
partition,
@@ -29,10 +30,14 @@ public ProducerRecord<byte[], byte[]> create(String topic,
2930
);
3031
}
3132

32-
private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
33+
private Iterable<Header> createHeaders(Map<String, List<String>> clientHeaders) {
3334
RecordHeaders headers = new RecordHeaders();
34-
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes())));
35+
clientHeaders.forEach((k, values) -> values.forEach(v -> headers.add(createRecord(k, v))));
3536
return headers;
3637
}
3738

39+
private RecordHeader createRecord(String key, String value) {
40+
return new RecordHeader(key, value == null ? null : value.getBytes());
41+
}
42+
3843
}

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,17 @@ void returnsTrueWhenStringContainedInKeyOrContentOrHeadersOrInAllThree() {
4242
);
4343

4444
assertTrue(
45-
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", "value")))
45+
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", List.of("value"))))
4646
);
4747

4848
assertTrue(
49-
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", "some abC")))
49+
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", List.of("some abC"))))
50+
);
51+
52+
assertTrue(
53+
filter.test(msg().key("dfg")
54+
.content("does-not-contain")
55+
.headers(Map.of("x1", List.of("does-not-contain", "some abC"))))
5056
);
5157
}
5258

@@ -65,7 +71,7 @@ void returnsFalseOtherwise() {
6571
);
6672

6773
assertFalse(
68-
filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", "value")))
74+
filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", List.of("value"))))
6975
);
7076

7177
}
@@ -97,12 +103,12 @@ void canCheckOffset() {
97103

98104
@Test
99105
void canCheckHeaders() {
100-
var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == 'v1'");
101-
assertTrue(f.test(msg().headers(Map.of("k1", "v1", "k2", "v2"))));
102-
assertFalse(f.test(msg().headers(Map.of("k1", "unexpected", "k2", "v2"))));
106+
var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == ['v1']");
107+
assertTrue(f.test(msg().headers(Map.of("k1", List.of("v1"), "k2", List.of("v2")))));
108+
assertFalse(f.test(msg().headers(Map.of("k1", List.of("unexpected"), "k2", List.of("v2")))));
103109

104-
f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == 'v2'");
105-
assertTrue(f.test(msg().headers(Map.of("k2", "v2"))));
110+
f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == ['v2']");
111+
assertTrue(f.test(msg().headers(Map.of("k2", List.of("v2")))));
106112

107113
f = celScriptFilter("!has(record.headers) || record.headers.size() == 0");
108114
assertTrue(f.test(msg().headers(Map.of())));

api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package io.kafbat.ui.serdes;
22

33
import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
46
import static org.mockito.Mockito.any;
57
import static org.mockito.Mockito.mock;
68
import static org.mockito.Mockito.verify;
9+
import static org.mockito.Mockito.when;
710

811
import io.kafbat.ui.model.TopicMessageDTO;
912
import io.kafbat.ui.serde.api.DeserializeResult;
1013
import io.kafbat.ui.serde.api.Serde;
14+
import java.util.List;
1115
import java.util.Map;
1216
import java.util.function.UnaryOperator;
1317
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,9 +26,50 @@ void dataMaskingAppliedOnDeserializedMessage() {
2226
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
2327

2428
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
25-
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
29+
recordDeser.deserialize(record());
2630

2731
verify(maskerMock).apply(any(TopicMessageDTO.class));
2832
}
2933

34+
@Test
35+
void deserializeWithMultipleHeaderValues() {
36+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
37+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
38+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
39+
40+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
41+
ConsumerRecord<Bytes, Bytes> record = record();
42+
record.headers().add("headerKey", "headerValue1".getBytes());
43+
record.headers().add("headerKey", "headerValue2".getBytes());
44+
TopicMessageDTO message = recordDeser.deserialize(record);
45+
46+
Map<String, List<String>> headers = message.getHeaders();
47+
assertEquals(1, headers.size());
48+
assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey"));
49+
}
50+
51+
@Test
52+
void deserializeWithMixedSingleAndMultipleHeaderValues() {
53+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
54+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
55+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
56+
57+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
58+
ConsumerRecord<Bytes, Bytes> record = record();
59+
record.headers().add("headerKey1", "singleValue".getBytes());
60+
record.headers().add("headerKey2", "multiValue1".getBytes());
61+
record.headers().add("headerKey2", "multiValue2".getBytes());
62+
TopicMessageDTO message = recordDeser.deserialize(record);
63+
64+
Map<String, List<String>> headers = message.getHeaders();
65+
assertEquals(1, headers.get("headerKey1").size());
66+
assertEquals(List.of("singleValue"), headers.get("headerKey1"));
67+
assertEquals(2, headers.get("headerKey2").size());
68+
assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2"));
69+
}
70+
71+
private ConsumerRecord<Bytes, Bytes> record() {
72+
return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()));
73+
}
74+
3075
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.kafbat.ui.serdes;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertThrows;
7+
import static org.mockito.Mockito.mock;
8+
9+
import io.kafbat.ui.exception.ValidationException;
10+
import io.kafbat.ui.serde.api.Serde;
11+
import java.util.List;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.header.internals.RecordHeader;
15+
import org.junit.jupiter.api.Test;
16+
17+
class ProducerRecordCreatorTest {
18+
19+
@Test
20+
void createWithHeaders() {
21+
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
22+
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);
23+
24+
ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
25+
Map<String, List<String>> headersMap = Map.of(
26+
"headerKey1", List.of("headerValue1"),
27+
"headerKey2", List.of("headerValue2", "headerValue3")
28+
);
29+
ProducerRecord<byte[], byte[]> record = recordCreator.create("topic", 1, "key", "value", headersMap);
30+
31+
assertNotNull(record.headers());
32+
assertEquals(3, record.headers().toArray().length);
33+
assertThat(record.headers()).containsExactlyInAnyOrder(
34+
new RecordHeader("headerKey1", "headerValue1".getBytes()),
35+
new RecordHeader("headerKey2", "headerValue2".getBytes()),
36+
new RecordHeader("headerKey2", "headerValue3".getBytes())
37+
);
38+
}
39+
}

api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void execSmartFilterTestReturnsExecutionResult() {
170170
+ "&& has(record.timestampMs) && has(record.offset)")
171171
.key("1234")
172172
.value("{ \"some\" : \"value\" } ")
173-
.headers(Map.of("h1", "hv1"))
173+
.headers(Map.of("h1", List.of("hv1")))
174174
.offset(12345L)
175175
.timestampMs(System.currentTimeMillis())
176176
.partition(1);

api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ void topicMessageMetadataJson() {
412412
.keySerde(SchemaRegistrySerde.name())
413413
.content(JSON_SCHEMA_RECORD)
414414
.valueSerde(SchemaRegistrySerde.name())
415-
.headers(Map.of("header1", "value1"))
415+
.headers(Map.of("header1", List.of("value1")))
416416
)
417417
.doAssert(polled -> {
418418
assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
@@ -438,9 +438,9 @@ void headerValueNullPresentTest() {
438438
.keySerde(SchemaRegistrySerde.name())
439439
.content(JSON_SCHEMA_RECORD)
440440
.valueSerde(SchemaRegistrySerde.name())
441-
.headers(Collections.singletonMap("header123", null))
441+
.headers(Collections.singletonMap("header123", Collections.singletonList(null)))
442442
)
443-
.doAssert(polled -> assertThat(polled.getHeaders().get("header123")).isNull());
443+
.doAssert(polled -> assertThat(polled.getHeaders().get("header123")).containsExactly((String) null));
444444
}
445445

446446

contract/src/main/resources/swagger/kafbat-ui-api.yaml

+12-3
Original file line numberDiff line numberDiff line change
@@ -3003,8 +3003,11 @@ components:
30033003
type: string
30043004
headers:
30053005
type: object
3006+
description: headers can contain multiple values
30063007
additionalProperties:
3007-
type: string
3008+
type: array
3009+
items:
3010+
type: string
30083011
partition:
30093012
type: integer
30103013
offset:
@@ -3032,8 +3035,11 @@ components:
30323035
nullable: true
30333036
headers:
30343037
type: object
3038+
description: headers can contain multiple values
30353039
additionalProperties:
3036-
type: string
3040+
type: array
3041+
items:
3042+
type: string
30373043
content:
30383044
type: string
30393045
nullable: true
@@ -3122,8 +3128,11 @@ components:
31223128
type: string
31233129
headers:
31243130
type: object
3131+
description: headers can contain multiple values
31253132
additionalProperties:
3126-
type: string
3133+
type: array
3134+
items:
3135+
type: string
31273136
content:
31283137
type: string
31293138
keyFormat:

frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
1919
<S.ListItem>value (json if possible)</S.ListItem>
2020
<S.ListItem>keyAsText</S.ListItem>
2121
<S.ListItem>valueAsText</S.ListItem>
22-
<S.ListItem>header</S.ListItem>
22+
<S.ListItem>headers</S.ListItem>
2323
<S.ListItem>partition</S.ListItem>
2424
<S.ListItem>timestampMs</S.ListItem>
2525
</ol>
@@ -51,7 +51,7 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
5151
<S.ListItem>
5252
<code>
5353
record.headers.size() == 1 && !has(record.headers.k1) &&
54-
record.headers[&apos;k2&apos;] == &apos;v2&apos;
54+
&apos;v2&apos; in record.headers[&apos;k2&apos;]
5555
</code>
5656
</S.ListItem>
5757
</ol>

frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Tab = 'key' | 'content' | 'headers';
1111
export interface MessageContentProps {
1212
messageKey?: string;
1313
messageContent?: string;
14-
headers?: { [key: string]: string | undefined };
14+
headers?: { [key: string]: string[] | undefined };
1515
timestamp?: Date;
1616
timestampType?: TopicMessageTimestampTypeEnum;
1717
keySize?: number;

frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const setupWrapper = (props?: Partial<MessageContentProps>) => {
1717
<MessageContent
1818
messageKey='"test-key"'
1919
messageContent='{"data": "test"}'
20-
headers={{ header: 'test' }}
20+
headers={{ header1: ['test'], header2: ['value1', 'value2'] }}
2121
timestamp={new Date(0)}
2222
timestampType={TopicMessageTimestampTypeEnum.CREATE_TIME}
2323
keySerde="SchemaRegistry"

frontend/src/components/Topics/Topic/Messages/__test__/Message.spec.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ describe('Message component', () => {
3131
key: 'test-key',
3232
partition: 6,
3333
content: '{"data": "test"}',
34-
headers: { header: 'test' },
34+
headers: { header: ['test'] },
3535
};
3636
const mockKeyFilters: PreviewFilter = {
3737
field: 'sub',

frontend/src/components/Topics/Topic/SendMessage/SendMessage.styled.tsx

+10
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ export const FlexItem = styled.div`
3434
width: 100%;
3535
}
3636
`;
37+
export const Headers = styled.div`
38+
display: flex;
39+
align-items: center;
40+
41+
svg {
42+
margin-left: 8px;
43+
vertical-align: middle;
44+
cursor: pointer;
45+
}
46+
`;

0 commit comments

Comments
 (0)