Skip to content

Commit 9293d92

Browse files
committed
Extra java 17 code clean up
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent ae9b591 commit 9293d92

File tree

42 files changed

+336
-397
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+336
-397
lines changed

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlBaseJdbcRowConverter.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public SqrlBaseJdbcRowConverter(RowType rowType) {
4848
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
4949
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
5050
if (type.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
51-
int timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
51+
var timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
5252
return (val, index, statement) -> {
5353
if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
5454
statement.setNull(index, timestampWithTimezone);
@@ -62,15 +62,15 @@ protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
6262

6363
@Override
6464
public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
65-
LogicalTypeRoot root = type.getTypeRoot();
65+
var root = type.getTypeRoot();
6666

6767
if (root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
6868
return val ->
6969
val instanceof LocalDateTime ldt
7070
? TimestampData.fromLocalDateTime(ldt)
7171
: TimestampData.fromTimestamp((Timestamp) val);
7272
} else if (root == LogicalTypeRoot.ARRAY) {
73-
ArrayType arrayType = (ArrayType) type;
73+
var arrayType = (ArrayType) type;
7474
return createArrayConverter(arrayType);
7575
} else if (root == LogicalTypeRoot.ROW) {
7676
return val -> val;
@@ -83,16 +83,15 @@ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
8383

8484
@Override
8585
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
86-
switch (type.getTypeRoot()) {
87-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
88-
final int tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
89-
return (val, index, statement) ->
86+
return switch (type.getTypeRoot()) {
87+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE -> {
88+
final var tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
89+
yield (val, index, statement) ->
9090
statement.setTimestamp(index, val.getTimestamp(index, tsPrecision).toTimestamp());
91-
case MULTISET:
92-
case RAW:
93-
default:
94-
return super.createExternalConverter(type);
95-
}
91+
}
92+
case MULTISET, RAW -> super.createExternalConverter(type);
93+
default -> super.createExternalConverter(type);
94+
};
9695
}
9796

9897
@SneakyThrows

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlJdbcDynamicTableFactory.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,14 @@ public class SqrlJdbcDynamicTableFactory implements DynamicTableSinkFactory {
7171

7272
@Override
7373
public DynamicTableSink createDynamicTableSink(Context context) {
74-
final FactoryUtil.TableFactoryHelper helper =
75-
FactoryUtil.createTableFactoryHelper(this, context);
76-
final ReadableConfig config = helper.getOptions();
74+
final var helper = FactoryUtil.createTableFactoryHelper(this, context);
75+
final var config = helper.getOptions();
7776

7877
helper.validate();
7978
validateConfigOptions(config, context.getClassLoader());
8079
validateDataTypeWithJdbcDialect(
8180
context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
82-
InternalJdbcConnectionOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader());
81+
var jdbcOptions = getJdbcOptions(config, context.getClassLoader());
8382

8483
return new JdbcDynamicTableSink(
8584
jdbcOptions,
@@ -91,15 +90,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
9190

9291
private static void validateDataTypeWithJdbcDialect(
9392
DataType dataType, String url, ClassLoader classLoader) {
94-
JdbcDialect dialect = loadDialect(url, classLoader);
93+
var dialect = loadDialect(url, classLoader);
9594

9695
dialect.validate((RowType) dataType.getLogicalType());
9796
}
9897

9998
private InternalJdbcConnectionOptions getJdbcOptions(
10099
ReadableConfig readableConfig, ClassLoader classLoader) {
101-
final String url = readableConfig.get(URL);
102-
final InternalJdbcConnectionOptions.Builder builder =
100+
final var url = readableConfig.get(URL);
101+
final var builder =
103102
InternalJdbcConnectionOptions.builder()
104103
.setClassLoader(classLoader)
105104
.setDBUrl(url)
@@ -116,7 +115,7 @@ private InternalJdbcConnectionOptions getJdbcOptions(
116115
}
117116

118117
private static JdbcDialect loadDialect(String url, ClassLoader classLoader) {
119-
JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
118+
var dialect = JdbcDialectLoader.load(url, classLoader);
120119
// sqrl: standard postgres dialect with extended dialect
121120
if (dialect.dialectName().equalsIgnoreCase("PostgreSQL")) {
122121
return new SqrlPostgresDialect();
@@ -125,7 +124,7 @@ private static JdbcDialect loadDialect(String url, ClassLoader classLoader) {
125124
}
126125

127126
private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
128-
final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
127+
final var builder = new JdbcExecutionOptions.Builder();
129128
builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
130129
builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
131130
builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
@@ -135,7 +134,7 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
135134
private JdbcDmlOptions getJdbcDmlOptions(
136135
InternalJdbcConnectionOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
137136

138-
String[] keyFields =
137+
var keyFields =
139138
Arrays.stream(primaryKeyIndexes)
140139
.mapToObj(i -> DataType.getFieldNames(dataType).get(i))
141140
.toArray(String[]::new);
@@ -209,7 +208,7 @@ public Set<ConfigOption<?>> forwardOptions() {
209208
}
210209

211210
private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
212-
String jdbcUrl = config.get(URL);
211+
var jdbcUrl = config.get(URL);
213212
// JdbcDialectLoader.load(jdbcUrl, classLoader);
214213

215214
checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
@@ -263,14 +262,13 @@ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoade
263262
}
264263

265264
private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
266-
int presentCount = 0;
265+
var presentCount = 0;
267266
for (ConfigOption configOption : configOptions) {
268267
if (config.getOptional(configOption).isPresent()) {
269268
presentCount++;
270269
}
271270
}
272-
String[] propertyNames =
273-
Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
271+
var propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
274272
Preconditions.checkArgument(
275273
configOptions.length == presentCount || presentCount == 0,
276274
"Either all or none of the following options should be provided:\n"

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public Optional<String> defaultDriverName() {
6868
@Override
6969
public Optional<String> getUpsertStatement(
7070
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
71-
String uniqueColumns =
71+
var uniqueColumns =
7272
Arrays.stream(uniqueKeyFields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
73-
String updateClause =
73+
var updateClause =
7474
Arrays.stream(fieldNames)
7575
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
7676
.collect(Collectors.joining(", "));

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ public JdbcDeserializationConverter createArrayConverter(ArrayType arrayType) {
9999
// primitive byte arrays
100100
final Class<?> elementClass =
101101
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
102-
final JdbcDeserializationConverter elementConverter =
103-
createNullableInternalConverter(arrayType.getElementType());
102+
final var elementConverter = createNullableInternalConverter(arrayType.getElementType());
104103
return val -> {
105104
// sqrl: check if scalar array
106105

@@ -110,9 +109,8 @@ public JdbcDeserializationConverter createArrayConverter(ArrayType arrayType) {
110109
} else {
111110
in = (Object[]) val;
112111
}
113-
final Object[] array =
114-
(Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
115-
for (int i = 0; i < in.length; i++) {
112+
final var array = (Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
113+
for (var i = 0; i < in.length; i++) {
116114
array[i] = elementConverter.deserialize(in[i]);
117115
}
118116
return new GenericArrayData(array);

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,33 @@ public static LogicalType getBaseFlinkArrayType(LogicalType type) {
2929

3030
public static boolean isScalarArray(LogicalType type) {
3131
if (type instanceof ArrayType arrayType) {
32-
LogicalType elementType = arrayType.getElementType();
32+
var elementType = arrayType.getElementType();
3333
return isScalar(elementType) || isScalarArray(elementType);
3434
}
3535
return false;
3636
}
3737

3838
public static boolean isScalar(LogicalType type) {
39-
switch (type.getTypeRoot()) {
40-
case BOOLEAN:
41-
case TINYINT:
42-
case SMALLINT:
43-
case INTEGER:
44-
case BIGINT:
45-
case FLOAT:
46-
case DOUBLE:
47-
case CHAR:
48-
case VARCHAR:
49-
case BINARY:
50-
case VARBINARY:
51-
case DATE:
52-
case TIME_WITHOUT_TIME_ZONE:
53-
case TIMESTAMP_WITH_TIME_ZONE:
54-
case TIMESTAMP_WITHOUT_TIME_ZONE:
55-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
56-
case DECIMAL:
57-
return true;
58-
default:
59-
return false;
60-
}
39+
return switch (type.getTypeRoot()) {
40+
case BOOLEAN,
41+
TINYINT,
42+
SMALLINT,
43+
INTEGER,
44+
BIGINT,
45+
FLOAT,
46+
DOUBLE,
47+
CHAR,
48+
VARCHAR,
49+
BINARY,
50+
VARBINARY,
51+
DATE,
52+
TIME_WITHOUT_TIME_ZONE,
53+
TIMESTAMP_WITH_TIME_ZONE,
54+
TIMESTAMP_WITHOUT_TIME_ZONE,
55+
TIMESTAMP_WITH_LOCAL_TIME_ZONE,
56+
DECIMAL ->
57+
true;
58+
default -> false;
59+
};
6160
}
6261
}

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,56 +22,28 @@ public class PostgresArrayTypeConverter {
2222

2323
/** Return the base array type for flink type */
2424
public static String getArrayScalarName(LogicalType type) {
25-
switch (type.getTypeRoot()) {
26-
case CHAR:
27-
case VARCHAR:
28-
return "text";
29-
case BOOLEAN:
30-
return "boolean";
31-
case BINARY:
32-
case VARBINARY:
33-
return "bytea";
34-
case DECIMAL:
35-
return "decimal";
36-
case TINYINT:
37-
return "smallint";
38-
case SMALLINT:
39-
return "smallint";
40-
case INTEGER:
41-
return "integer";
42-
case BIGINT:
43-
return "bigint";
44-
case FLOAT:
45-
return "real"; // PostgreSQL uses REAL for float
46-
case DOUBLE:
47-
return "double";
48-
case DATE:
49-
return "date";
50-
case TIME_WITHOUT_TIME_ZONE:
51-
return "time without time zone";
52-
case TIMESTAMP_WITHOUT_TIME_ZONE:
53-
return "timestamp without time zone";
54-
case TIMESTAMP_WITH_TIME_ZONE:
55-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
56-
return "timestamptz";
57-
case INTERVAL_YEAR_MONTH:
58-
return "interval year to month";
59-
case INTERVAL_DAY_TIME:
60-
return "interval day to second";
61-
case NULL:
62-
return "void";
63-
case ARRAY:
64-
return getArrayScalarName(((ArrayType) type).getElementType());
65-
case MULTISET:
66-
case MAP:
67-
case ROW:
68-
case DISTINCT_TYPE:
69-
case STRUCTURED_TYPE:
70-
case RAW:
71-
case SYMBOL:
72-
case UNRESOLVED:
73-
default:
74-
throw new RuntimeException("Cannot convert type to array type");
75-
}
25+
return switch (type.getTypeRoot()) {
26+
case CHAR, VARCHAR -> "text";
27+
case BOOLEAN -> "boolean";
28+
case BINARY, VARBINARY -> "bytea";
29+
case DECIMAL -> "decimal";
30+
case TINYINT -> "smallint";
31+
case SMALLINT -> "smallint";
32+
case INTEGER -> "integer";
33+
case BIGINT -> "bigint";
34+
case FLOAT -> "real"; // PostgreSQL uses REAL for float
35+
case DOUBLE -> "double";
36+
case DATE -> "date";
37+
case TIME_WITHOUT_TIME_ZONE -> "time without time zone";
38+
case TIMESTAMP_WITHOUT_TIME_ZONE -> "timestamp without time zone";
39+
case TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE -> "timestamptz";
40+
case INTERVAL_YEAR_MONTH -> "interval year to month";
41+
case INTERVAL_DAY_TIME -> "interval day to second";
42+
case NULL -> "void";
43+
case ARRAY -> getArrayScalarName(((ArrayType) type).getElementType());
44+
case MULTISET, MAP, ROW, DISTINCT_TYPE, STRUCTURED_TYPE, RAW, SYMBOL, UNRESOLVED ->
45+
throw new RuntimeException("Cannot convert type to array type");
46+
default -> throw new RuntimeException("Cannot convert type to array type");
47+
};
7648
}
7749
}

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,23 @@ public String dialectTypeName() {
4545
public GenericDeserializationConverter<JdbcDeserializationConverter> getDeserializerConverter() {
4646
return () ->
4747
(val) -> {
48-
FlinkJsonType t = (FlinkJsonType) val;
48+
var t = (FlinkJsonType) val;
4949
return t.getJson();
5050
};
5151
}
5252

5353
@Override
5454
public GenericSerializationConverter<JdbcSerializationConverter> getSerializerConverter(
5555
LogicalType type) {
56-
FlinkJsonTypeSerializer typeSerializer = new FlinkJsonTypeSerializer();
56+
var typeSerializer = new FlinkJsonTypeSerializer();
5757

5858
return () ->
5959
(val, index, statement) -> {
6060
if (val != null && !val.isNullAt(index)) {
61-
PGobject pgObject = new PGobject();
61+
var pgObject = new PGobject();
6262
pgObject.setType("json");
6363
RawValueData<FlinkJsonType> object = val.getRawValue(index);
64-
FlinkJsonType vec = object.toObject(typeSerializer);
64+
var vec = object.toObject(typeSerializer);
6565
if (vec == null) {
6666
statement.setObject(index, null);
6767
} else {

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter;
2121
import org.apache.flink.formats.common.TimestampFormat;
2222
import org.apache.flink.formats.json.JsonFormatOptions.MapNullKeyMode;
23-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
2423
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
25-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
2624
import org.apache.flink.table.types.logical.ArrayType;
2725
import org.apache.flink.table.types.logical.LogicalType;
2826
import org.apache.flink.types.Row;
@@ -56,21 +54,21 @@ public GenericDeserializationConverter<JdbcDeserializationConverter> getDeserial
5654
@Override
5755
public GenericSerializationConverter<JdbcSerializationConverter> getSerializerConverter(
5856
LogicalType type) {
59-
ObjectMapper mapper = new ObjectMapper();
57+
var mapper = new ObjectMapper();
6058
return () ->
6159
(val, index, statement) -> {
6260
if (val != null && !val.isNullAt(index)) {
63-
SqrlRowDataToJsonConverters rowDataToJsonConverter =
61+
var rowDataToJsonConverter =
6462
new SqrlRowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, "null");
6563

66-
ArrayType arrayType = (ArrayType) type;
67-
ObjectNode objectNode = mapper.createObjectNode();
68-
JsonNode convert =
64+
var arrayType = (ArrayType) type;
65+
var objectNode = mapper.createObjectNode();
66+
var convert =
6967
rowDataToJsonConverter
7068
.createConverter(arrayType.getElementType())
7169
.convert(mapper, objectNode, val);
7270

73-
PGobject pgObject = new PGobject();
71+
var pgObject = new PGobject();
7472
pgObject.setType("json");
7573
pgObject.setValue(convert.toString());
7674
statement.setObject(index, pgObject);

0 commit comments

Comments
 (0)