Skip to content

Commit d130258

Browse files
authored
Merge pull request #571 from climber73/add-simple-aggr-func-type-support
add SimpleAggregateFunction column type support
2 parents 93d4e64 + e09c3a3 commit d130258

File tree

4 files changed

+29
-5
lines changed

4 files changed

+29
-5
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# 1.3.3
2+
* Added support of SimpleAggregateFunction column type
3+
14
# 1.3.2
25
Added support of writing JSON as string to a JSON column.
36
Upgraded dependency `org.slf4j:slf4j-reload4j` from `2.0.13` to `2.0.17`

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.3.2
1+
v1.3.3

src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
@Builder
2626
@Getter
2727
public class Column {
28+
private static final Pattern PARAMETRIZED_TYPE_PATTERN = Pattern.compile(".+\\(.+\\)");
29+
private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<a1>\\d{1,}\\s*)?,*\\s*(?<a2>\\d{1,})?\\))?");
30+
private static final Pattern SIMPLE_AGGREGATE_FUNCTION_TYPE_PATTERN = Pattern.compile("^SimpleAggregateFunction\\s*\\([^,]+,\\s*(.+)\\)$");
31+
2832
private static final Logger LOGGER = LoggerFactory.getLogger(Column.class);
2933

3034
private String name;
@@ -229,7 +233,7 @@ public static Column extractColumn(String name, String valueType, boolean isNull
229233

230234
ColumnBuilder variantTypeBuilder = Column.builder().type(typePrecisionAndScale.getT1());
231235

232-
if (Pattern.compile(".+\\(.+\\)").asMatchPredicate().test(definition)) {
236+
if (PARAMETRIZED_TYPE_PATTERN.asMatchPredicate().test(definition)) {
233237
variantTypeBuilder = variantTypeBuilder
234238
.precision(typePrecisionAndScale.getT2())
235239
.scale(typePrecisionAndScale.getT3());
@@ -251,6 +255,13 @@ public static Column extractColumn(String name, String valueType, boolean isNull
251255
return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue, isSubColumn);
252256
} else if (valueType.startsWith("Nullable")) {
253257
return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue, isSubColumn);
258+
} else if (valueType.startsWith("SimpleAggregateFunction")) {
259+
Matcher m = SIMPLE_AGGREGATE_FUNCTION_TYPE_PATTERN.matcher(valueType);
260+
if (!m.matches()) {
261+
throw new RuntimeException("can't parse SimpleAggregateFunction type");
262+
}
263+
String argType = m.group(1).trim();
264+
return extractColumn(name, argType, isNull, hasDefaultValue, isSubColumn);
254265
}
255266

256267
// We're dealing with a primitive type here
@@ -277,9 +288,7 @@ private static Tuple3<Type, Integer, Integer> dispatchPrimitiveWithPrecisionAndS
277288
precision = Integer.parseInt(scaleAndTimezone[0].trim());
278289
LOGGER.trace("Parsed precision of DateTime64 is {}", precision);
279290
} else if (type == Type.Decimal) {
280-
final Pattern patter = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<a1>\\d{1,}\\s*)?,*\\s*(?<a2>\\d{1,})?\\))?");
281-
Matcher match = patter.matcher(valueType);
282-
291+
Matcher match = DECIMAL_TYPE_PATTERN.matcher(valueType);
283292
if (!match.matches()) {
284293
throw new RuntimeException("type doesn't match");
285294
}

src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ public void extractLowCardinalityNullableColumn() {
2828
assertEquals(Type.STRING, col.getType());
2929
}
3030

31+
@Test
32+
public void extractSimpleAggregateFunctionColumn() {
33+
Column col = Column.extractColumn(newDescriptor("SimpleAggregateFunction(sum, Int64)"));
34+
assertEquals(Type.INT64, col.getType());
35+
}
36+
37+
@Test
38+
public void extractSimpleAggregateFunctionNullableColumn() {
39+
Column col = Column.extractColumn(newDescriptor("SimpleAggregateFunction(sum, Nullable(Int64))"));
40+
assertEquals(Type.INT64, col.getType());
41+
}
42+
3143
@Test
3244
public void extractArrayOfLowCardinalityNullableColumn() {
3345
Column col = Column.extractColumn(newDescriptor("Array(LowCardinality(Nullable(String)))"));

0 commit comments

Comments
 (0)