Skip to content

Commit 4de0d53

Browse files
authored
BE: Messages: Add CEL extensions (kafbat#465)
1 parent 28677a9 commit 4de0d53

File tree

2 files changed

+22
-9
lines changed

2 files changed

+22
-9
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import dev.cel.common.types.StructType;
1919
import dev.cel.compiler.CelCompiler;
2020
import dev.cel.compiler.CelCompilerFactory;
21+
import dev.cel.extensions.CelExtensions;
2122
import dev.cel.parser.CelStandardMacro;
2223
import dev.cel.runtime.CelEvaluationException;
2324
import dev.cel.runtime.CelRuntime;
2425
import dev.cel.runtime.CelRuntimeFactory;
2526
import io.kafbat.ui.exception.CelException;
26-
import io.kafbat.ui.model.MessageFilterTypeDTO;
2727
import io.kafbat.ui.model.TopicMessageDTO;
2828
import java.util.HashMap;
2929
import java.util.Map;
@@ -42,8 +42,7 @@ public class MessageFilters {
4242
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName();
4343

4444
private static final CelCompiler CEL_COMPILER = createCompiler();
45-
private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory.standardCelRuntimeBuilder()
46-
.build();
45+
private static final CelRuntime CEL_RUNTIME = createRuntime();
4746

4847
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4948

@@ -143,6 +142,7 @@ private static CelCompiler createCompiler() {
143142
return CelCompilerFactory.standardCelCompilerBuilder()
144143
.setOptions(CelOptions.DEFAULT)
145144
.setStandardMacros(CelStandardMacro.STANDARD_MACROS)
145+
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
146146
.addVar(CEL_RECORD_VAR_NAME, recordType)
147147
.setResultType(SimpleType.BOOL)
148148
.setTypeProvider(new CelTypeProvider() {
@@ -159,6 +159,12 @@ public Optional<CelType> findType(String typeName) {
159159
.build();
160160
}
161161

162+
private static CelRuntime createRuntime() {
163+
return CelRuntimeFactory.standardCelRuntimeBuilder()
164+
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
165+
.build();
166+
}
167+
162168
@Nullable
163169
private static Object parseToJsonOrReturnAsIs(@Nullable String str) {
164170
if (str == null) {

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
import io.kafbat.ui.exception.CelException;
1111
import io.kafbat.ui.model.TopicMessageDTO;
1212
import java.time.OffsetDateTime;
13-
import java.time.temporal.ChronoUnit;
1413
import java.util.ArrayList;
14+
import java.util.Base64;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.UUID;
1718
import java.util.function.Predicate;
1819
import org.apache.commons.lang3.RandomStringUtils;
1920
import org.junit.jupiter.api.Nested;
@@ -100,7 +101,7 @@ void canCheckTimestampMs() {
100101
var ts = OffsetDateTime.now();
101102
var f = celScriptFilter("record.timestampMs == " + ts.toInstant().toEpochMilli());
102103
assertTrue(f.test(msg().timestamp(ts)));
103-
assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS))));
104+
assertFalse(f.test(msg().timestamp(ts.plusSeconds(1L))));
104105
}
105106

106107
@Test
@@ -177,6 +178,7 @@ void filterSpeedIsAtLeast5kPerSec() {
177178
toFilter.add(msg().content(jsonContent).key(randString));
178179
}
179180
// first iteration for warmup
181+
// noinspection ResultOfMethodCallIgnored
180182
toFilter.stream().filter(f).count();
181183

182184
long before = System.currentTimeMillis();
@@ -188,10 +190,15 @@ void filterSpeedIsAtLeast5kPerSec() {
188190
}
189191
}
190192

193+
@Test
194+
void testBase64DecodingWorks() {
195+
var uuid = UUID.randomUUID().toString();
196+
var msg = "test." + Base64.getEncoder().encodeToString(uuid.getBytes());
197+
var f = celScriptFilter("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')");
198+
assertTrue(f.test(msg().content(msg)));
199+
}
200+
191201
private TopicMessageDTO msg() {
192-
return new TopicMessageDTO()
193-
.timestamp(OffsetDateTime.now())
194-
.offset(-1L)
195-
.partition(1);
202+
return new TopicMessageDTO(1, -1L, OffsetDateTime.now());
196203
}
197204
}

0 commit comments

Comments
 (0)