Skip to content

Commit 19f459e

Browse files
committed
Merge branch 'main' of github.com:DataSQRL/flink-jar-runner into safe-kafka-connectors
2 parents 63aec7a + 26ee92c commit 19f459e

File tree

46 files changed

+395
-471
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+395
-471
lines changed

.github/workflows/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
uses: actions/setup-java@v3
3333
with:
3434
distribution: 'temurin'
35-
java-version: '11'
35+
java-version: '17'
3636

3737
- name: Import GPG Key
3838
if: github.event_name != 'pull_request'

.github/workflows/uber-jar.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
uses: actions/setup-java@v3
3737
with:
3838
distribution: 'temurin'
39-
java-version: '11'
39+
java-version: '17'
4040

4141
- name: Generate settings.xml
4242
run: |

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlBaseJdbcRowConverter.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public SqrlBaseJdbcRowConverter(RowType rowType) {
4848
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
4949
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
5050
if (type.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
51-
int timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
51+
var timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
5252
return (val, index, statement) -> {
5353
if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
5454
statement.setNull(index, timestampWithTimezone);
@@ -62,15 +62,15 @@ protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
6262

6363
@Override
6464
public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
65-
LogicalTypeRoot root = type.getTypeRoot();
65+
var root = type.getTypeRoot();
6666

6767
if (root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
6868
return val ->
69-
val instanceof LocalDateTime
70-
? TimestampData.fromLocalDateTime((LocalDateTime) val)
69+
val instanceof LocalDateTime ldt
70+
? TimestampData.fromLocalDateTime(ldt)
7171
: TimestampData.fromTimestamp((Timestamp) val);
7272
} else if (root == LogicalTypeRoot.ARRAY) {
73-
ArrayType arrayType = (ArrayType) type;
73+
var arrayType = (ArrayType) type;
7474
return createArrayConverter(arrayType);
7575
} else if (root == LogicalTypeRoot.ROW) {
7676
return val -> val;
@@ -83,16 +83,15 @@ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
8383

8484
@Override
8585
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
86-
switch (type.getTypeRoot()) {
87-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
88-
final int tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
89-
return (val, index, statement) ->
86+
return switch (type.getTypeRoot()) {
87+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE -> {
88+
final var tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
89+
yield (val, index, statement) ->
9090
statement.setTimestamp(index, val.getTimestamp(index, tsPrecision).toTimestamp());
91-
case MULTISET:
92-
case RAW:
93-
default:
94-
return super.createExternalConverter(type);
95-
}
91+
}
92+
case MULTISET, RAW -> super.createExternalConverter(type);
93+
default -> super.createExternalConverter(type);
94+
};
9695
}
9796

9897
@SneakyThrows
@@ -101,10 +100,10 @@ private void createSqlArrayObject(
101100
// Scalar arrays of any dimension are one array call
102101
if (isScalarArray(type)) {
103102
Object[] boxed;
104-
if (data instanceof GenericArrayData) {
105-
boxed = ((GenericArrayData) data).toObjectArray();
106-
} else if (data instanceof BinaryArrayData) {
107-
boxed = ((BinaryArrayData) data).toObjectArray(getBaseFlinkArrayType(type));
103+
if (data instanceof GenericArrayData arrayData) {
104+
boxed = arrayData.toObjectArray();
105+
} else if (data instanceof BinaryArrayData arrayData) {
106+
boxed = arrayData.toObjectArray(getBaseFlinkArrayType(type));
108107
} else {
109108
throw new RuntimeException("Unsupported ArrayData type: " + data.getClass());
110109
}

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlJdbcDynamicTableFactory.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,14 @@ public class SqrlJdbcDynamicTableFactory implements DynamicTableSinkFactory {
7171

7272
@Override
7373
public DynamicTableSink createDynamicTableSink(Context context) {
74-
final FactoryUtil.TableFactoryHelper helper =
75-
FactoryUtil.createTableFactoryHelper(this, context);
76-
final ReadableConfig config = helper.getOptions();
74+
final var helper = FactoryUtil.createTableFactoryHelper(this, context);
75+
final var config = helper.getOptions();
7776

7877
helper.validate();
7978
validateConfigOptions(config, context.getClassLoader());
8079
validateDataTypeWithJdbcDialect(
8180
context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
82-
InternalJdbcConnectionOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader());
81+
var jdbcOptions = getJdbcOptions(config, context.getClassLoader());
8382

8483
return new JdbcDynamicTableSink(
8584
jdbcOptions,
@@ -91,15 +90,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
9190

9291
private static void validateDataTypeWithJdbcDialect(
9392
DataType dataType, String url, ClassLoader classLoader) {
94-
JdbcDialect dialect = loadDialect(url, classLoader);
93+
var dialect = loadDialect(url, classLoader);
9594

9695
dialect.validate((RowType) dataType.getLogicalType());
9796
}
9897

9998
private InternalJdbcConnectionOptions getJdbcOptions(
10099
ReadableConfig readableConfig, ClassLoader classLoader) {
101-
final String url = readableConfig.get(URL);
102-
final InternalJdbcConnectionOptions.Builder builder =
100+
final var url = readableConfig.get(URL);
101+
final var builder =
103102
InternalJdbcConnectionOptions.builder()
104103
.setClassLoader(classLoader)
105104
.setDBUrl(url)
@@ -116,7 +115,7 @@ private InternalJdbcConnectionOptions getJdbcOptions(
116115
}
117116

118117
private static JdbcDialect loadDialect(String url, ClassLoader classLoader) {
119-
JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
118+
var dialect = JdbcDialectLoader.load(url, classLoader);
120119
// sqrl: standard postgres dialect with extended dialect
121120
if (dialect.dialectName().equalsIgnoreCase("PostgreSQL")) {
122121
return new SqrlPostgresDialect();
@@ -125,7 +124,7 @@ private static JdbcDialect loadDialect(String url, ClassLoader classLoader) {
125124
}
126125

127126
private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
128-
final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
127+
final var builder = new JdbcExecutionOptions.Builder();
129128
builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
130129
builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
131130
builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
@@ -135,7 +134,7 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
135134
private JdbcDmlOptions getJdbcDmlOptions(
136135
InternalJdbcConnectionOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
137136

138-
String[] keyFields =
137+
var keyFields =
139138
Arrays.stream(primaryKeyIndexes)
140139
.mapToObj(i -> DataType.getFieldNames(dataType).get(i))
141140
.toArray(String[]::new);
@@ -209,7 +208,7 @@ public Set<ConfigOption<?>> forwardOptions() {
209208
}
210209

211210
private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
212-
String jdbcUrl = config.get(URL);
211+
var jdbcUrl = config.get(URL);
213212
// JdbcDialectLoader.load(jdbcUrl, classLoader);
214213

215214
checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
@@ -229,50 +228,47 @@ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoade
229228
long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
230229
if (lowerBound > upperBound) {
231230
throw new IllegalArgumentException(
232-
String.format(
233-
"'%s'='%s' must not be larger than '%s'='%s'.",
234-
SCAN_PARTITION_LOWER_BOUND.key(),
235-
lowerBound,
236-
SCAN_PARTITION_UPPER_BOUND.key(),
237-
upperBound));
231+
"'%s'='%s' must not be larger than '%s'='%s'."
232+
.formatted(
233+
SCAN_PARTITION_LOWER_BOUND.key(),
234+
lowerBound,
235+
SCAN_PARTITION_UPPER_BOUND.key(),
236+
upperBound));
238237
}
239238
}
240239

241240
checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
242241

243242
if (config.get(LOOKUP_MAX_RETRIES) < 0) {
244243
throw new IllegalArgumentException(
245-
String.format(
246-
"The value of '%s' option shouldn't be negative, but is %s.",
247-
LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
244+
"The value of '%s' option shouldn't be negative, but is %s."
245+
.formatted(LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
248246
}
249247

250248
if (config.get(SINK_MAX_RETRIES) < 0) {
251249
throw new IllegalArgumentException(
252-
String.format(
253-
"The value of '%s' option shouldn't be negative, but is %s.",
254-
SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
250+
"The value of '%s' option shouldn't be negative, but is %s."
251+
.formatted(SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
255252
}
256253

257254
if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
258255
throw new IllegalArgumentException(
259-
String.format(
260-
"The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
261-
MAX_RETRY_TIMEOUT.key(),
262-
config.get(
263-
ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue())));
256+
"The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s."
257+
.formatted(
258+
MAX_RETRY_TIMEOUT.key(),
259+
config.get(
260+
ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue())));
264261
}
265262
}
266263

267264
private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
268-
int presentCount = 0;
265+
var presentCount = 0;
269266
for (ConfigOption configOption : configOptions) {
270267
if (config.getOptional(configOption).isPresent()) {
271268
presentCount++;
272269
}
273270
}
274-
String[] propertyNames =
275-
Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
271+
var propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
276272
Preconditions.checkArgument(
277273
configOptions.length == presentCount || presentCount == 0,
278274
"Either all or none of the following options should be provided:\n"

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datasqrl.connector.postgresql.jdbc;
1717

18+
import java.io.Serial;
1819
import java.util.Arrays;
1920
import java.util.EnumSet;
2021
import java.util.List;
@@ -36,7 +37,7 @@
3637
*/
3738
public class SqrlPostgresDialect extends AbstractDialect {
3839

39-
private static final long serialVersionUID = 1L;
40+
@Serial private static final long serialVersionUID = 1L;
4041

4142
// Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
4243
// https://www.postgresql.org/docs/12/datatype-datetime.html
@@ -67,9 +68,9 @@ public Optional<String> defaultDriverName() {
6768
@Override
6869
public Optional<String> getUpsertStatement(
6970
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
70-
String uniqueColumns =
71+
var uniqueColumns =
7172
Arrays.stream(uniqueKeyFields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
72-
String updateClause =
73+
var updateClause =
7374
Arrays.stream(fieldNames)
7475
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
7576
.collect(Collectors.joining(", "));
@@ -93,8 +94,8 @@ public void validate(RowType rowType) throws ValidationException {
9394

9495
if (!unsupportedTypes.isEmpty()) {
9596
throw new ValidationException(
96-
String.format(
97-
"The %s dialect doesn't support type: %s.", this.dialectName(), unsupportedTypes));
97+
"The %s dialect doesn't support type: %s."
98+
.formatted(this.dialectName(), unsupportedTypes));
9899
}
99100

100101
super.validate(rowType);

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datasqrl.connector.postgresql.jdbc;
1717

1818
import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer;
19+
import java.io.Serial;
1920
import java.lang.reflect.Type;
2021
import java.util.Map;
2122
import java.util.ServiceLoader;
@@ -35,7 +36,7 @@
3536
*/
3637
public class SqrlPostgresRowConverter extends SqrlBaseJdbcRowConverter {
3738

38-
private static final long serialVersionUID = 1L;
39+
@Serial private static final long serialVersionUID = 1L;
3940

4041
public static final Map<
4142
Type, JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter>>
@@ -98,21 +99,18 @@ public JdbcDeserializationConverter createArrayConverter(ArrayType arrayType) {
9899
// primitive byte arrays
99100
final Class<?> elementClass =
100101
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
101-
final JdbcDeserializationConverter elementConverter =
102-
createNullableInternalConverter(arrayType.getElementType());
102+
final var elementConverter = createNullableInternalConverter(arrayType.getElementType());
103103
return val -> {
104104
// sqrl: check if scalar array
105105

106106
Object[] in;
107-
if (val instanceof PgArray) {
108-
PgArray pgArray = (PgArray) val;
107+
if (val instanceof PgArray pgArray) {
109108
in = (Object[]) pgArray.getArray();
110109
} else {
111110
in = (Object[]) val;
112111
}
113-
final Object[] array =
114-
(Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
115-
for (int i = 0; i < in.length; i++) {
112+
final var array = (Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
113+
for (var i = 0; i < in.length; i++) {
116114
array[i] = elementConverter.deserialize(in[i]);
117115
}
118116
return new GenericArrayData(array);

connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,42 +21,41 @@
2121
public class FlinkArrayTypeUtil {
2222

2323
public static LogicalType getBaseFlinkArrayType(LogicalType type) {
24-
if (type instanceof ArrayType) {
25-
return getBaseFlinkArrayType(((ArrayType) type).getElementType());
24+
if (type instanceof ArrayType arrayType) {
25+
return getBaseFlinkArrayType(arrayType.getElementType());
2626
}
2727
return type;
2828
}
2929

3030
public static boolean isScalarArray(LogicalType type) {
31-
if (type instanceof ArrayType) {
32-
LogicalType elementType = ((ArrayType) type).getElementType();
31+
if (type instanceof ArrayType arrayType) {
32+
var elementType = arrayType.getElementType();
3333
return isScalar(elementType) || isScalarArray(elementType);
3434
}
3535
return false;
3636
}
3737

3838
public static boolean isScalar(LogicalType type) {
39-
switch (type.getTypeRoot()) {
40-
case BOOLEAN:
41-
case TINYINT:
42-
case SMALLINT:
43-
case INTEGER:
44-
case BIGINT:
45-
case FLOAT:
46-
case DOUBLE:
47-
case CHAR:
48-
case VARCHAR:
49-
case BINARY:
50-
case VARBINARY:
51-
case DATE:
52-
case TIME_WITHOUT_TIME_ZONE:
53-
case TIMESTAMP_WITH_TIME_ZONE:
54-
case TIMESTAMP_WITHOUT_TIME_ZONE:
55-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
56-
case DECIMAL:
57-
return true;
58-
default:
59-
return false;
60-
}
39+
return switch (type.getTypeRoot()) {
40+
case BOOLEAN,
41+
TINYINT,
42+
SMALLINT,
43+
INTEGER,
44+
BIGINT,
45+
FLOAT,
46+
DOUBLE,
47+
CHAR,
48+
VARCHAR,
49+
BINARY,
50+
VARBINARY,
51+
DATE,
52+
TIME_WITHOUT_TIME_ZONE,
53+
TIMESTAMP_WITH_TIME_ZONE,
54+
TIMESTAMP_WITHOUT_TIME_ZONE,
55+
TIMESTAMP_WITH_LOCAL_TIME_ZONE,
56+
DECIMAL ->
57+
true;
58+
default -> false;
59+
};
6160
}
6261
}

0 commit comments

Comments
 (0)