diff --git a/docs/content.zh/docs/connectors/datastream/jdbc.md b/docs/content.zh/docs/connectors/datastream/jdbc.md
index a57cf0bbe..6447cd0ac 100644
--- a/docs/content.zh/docs/connectors/datastream/jdbc.md
+++ b/docs/content.zh/docs/connectors/datastream/jdbc.md
@@ -84,6 +84,60 @@ env.from_collection(
env.execute()
```
+### Partitioned scan on string columns
+
+| Supported database | String partition expression |
+| --- | --- |
+| MySQL | ABS(MD5(`column`) % `number_of_partitions`) |
+| Oracle | MOD(ORA_HASH(`column`), `number_of_partitions`) |
+| PostgreSQL | (ABS(HASHTEXT(`column`)) % `number_of_partitions`) |
+| MS SQL Server | ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`) |
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Serializable[][] queryParameters = new Long[3][1];
+queryParameters[0] = new Long[]{0L};
+queryParameters[1] = new Long[]{1L};
+queryParameters[2] = new Long[]{2L};
+
+TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
+        .setDrivername("com.mysql.cj.jdbc.Driver")
+        .setDBUrl("jdbc:mysql://localhost:3306/test")
+        .setUsername("root")
+        .setPassword("12345678")
+        // must be set to true when the partition key is a string column
+        .setPartitionColumnTypeString(true)
+        // adapt the hash expression to the target database (see the table above)
+        .setQuery("select * from fake_source_sink where ABS(MD5(`name`) % 3) = ?")
+        .setRowTypeInfo(rowTypeInfo)
+        .setParametersProvider(new JdbcGenericParameterValuesProvider(queryParameters))
+        .finish();
+
+DataStreamSource<Row> input = env.createInput(jdbcInputFormat);
+env.execute();
+```
+
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index cf4e63842..eebbc3430 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -353,12 +353,57 @@ ON myTopic.key = MyUserTable.id;
To accelerate reading data in parallel `Source` task instances, Flink provides the partitioned scan feature for JDBC tables.
If any of the following partition scan options is specified, all of them must be specified. They describe how to partition the table when it is read in parallel from multiple tasks.
-`scan.partition.column` must be a numeric, date, or timestamp column from the table in question. Note that `scan.partition.lower-bound` and `scan.partition.upper-bound` are used to decide the partition stride and to filter the rows in the table. For batch jobs, the maximum and minimum values can also be obtained before submitting the Flink job.
+`scan.partition.column` must be a numeric, date, timestamp, or string column from the table in question. Note that `scan.partition.lower-bound` and `scan.partition.upper-bound` are used to decide the partition stride and to filter the rows in the table. For batch jobs, the maximum and minimum values can also be obtained before submitting the Flink job.
- `scan.partition.column`: The column name used for partitioning the input.
- `scan.partition.num`: The number of partitions.
- `scan.partition.lower-bound`: The smallest value of the first partition.
- `scan.partition.upper-bound`: The largest value of the last partition.
+#### Partitioned scan on string columns
+
+| Supported database | String partition expression |
+| --- | --- |
+| MySQL | ABS(MD5(`column`) % `number_of_partitions`) |
+| Oracle | MOD(ORA_HASH(`column`), `number_of_partitions`) |
+| PostgreSQL | (ABS(HASHTEXT(`column`)) % `number_of_partitions`) |
+| MS SQL Server | ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`) |
+
+```SQL
+CREATE TABLE my_split_string_read_table (
+    id INT,
+    name STRING,
+    age INT,
+    email STRING,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'jdbc',
+    'url' = 'jdbc:mysql://localhost:3306/test',
+    'table-name' = 'mysql_table_name',
+    'username' = 'root',
+    'password' = '12345678',
+    'scan.partition.column' = 'name',
+    'scan.partition.num' = '3',
+    -- the three splits read the rows where ABS(MD5(`name`) % 3) evaluates to 0, 1 and 2 respectively
+    'scan.partition.lower-bound' = '0',
+    'scan.partition.upper-bound' = '2'
+)
+```
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index 41e46fb86..df3ae0b0b 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -64,6 +64,62 @@ JdbcSink.sink(
{{< /tab >}}
{{< /tabs >}}
+
+### Partitioned scan on string columns
+
+Reading a string partition column in parallel splits is currently supported only for the following databases:
+
+| Supported database | String partition expression |
+| --- | --- |
+| MySQL | ABS(MD5(`column`) % `number_of_partitions`) |
+| Oracle | MOD(ORA_HASH(`column`), `number_of_partitions`) |
+| PostgreSQL | (ABS(HASHTEXT(`column`)) % `number_of_partitions`) |
+| MS SQL Server | ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`) |
+
+The following example reads a table in parallel splits over a string column with `JdbcInputFormat`:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Serializable[][] queryParameters = new Long[3][1];
+queryParameters[0] = new Long[]{0L};
+queryParameters[1] = new Long[]{1L};
+queryParameters[2] = new Long[]{2L};
+
+TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
+        .setDrivername("com.mysql.cj.jdbc.Driver")
+        .setDBUrl("jdbc:mysql://localhost:3306/test")
+        .setUsername("root")
+        .setPassword("12345678")
+        // must be set to true when the partition key is a string column
+        .setPartitionColumnTypeString(true)
+        // adapt the hash expression to the target database (see the table above)
+        .setQuery("select * from fake_source_sink where ABS(MD5(`name`) % 3) = ?")
+        .setRowTypeInfo(rowTypeInfo)
+        .setParametersProvider(new JdbcGenericParameterValuesProvider(queryParameters))
+        .finish();
+
+DataStreamSource<Row> input = env.createInput(jdbcInputFormat);
+env.execute();
+```
+
### SQL DML statement and JDBC statement builder
The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provider SQL string, e.g.:
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index 056f2ab59..13b88a4d7 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -366,13 +366,59 @@ See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for
To accelerate reading data in parallel `Source` task instances, Flink provides partitioned scan feature for JDBC table.
All the following scan partition options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple tasks.
-The `scan.partition.column` must be a numeric, date, or timestamp column from the table in question. Notice that `scan.partition.lower-bound` and `scan.partition.upper-bound` are used to decide the partition stride and filter the rows in table. If it is a batch job, it also doable to get the max and min value first before submitting the flink job.
+The `scan.partition.column` must be a numeric, date, timestamp, or string column from the table in question. Notice that `scan.partition.lower-bound` and `scan.partition.upper-bound` are used to decide the partition stride and filter the rows in table. If it is a batch job, it is also doable to get the max and min value first before submitting the Flink job. A short Table API sketch of a numeric partitioned scan follows the option list below.
- `scan.partition.column`: The column name used for partitioning the input.
- `scan.partition.num`: The number of partitions.
- `scan.partition.lower-bound`: The smallest value of the first partition.
- `scan.partition.upper-bound`: The largest value of the last partition.
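For comparison with the string-specific feature below, here is a minimal sketch of a plain numeric partitioned scan using the Java Table API; the connection URL, table name, and bounds are illustrative placeholders rather than values taken from this patch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: a numeric partitioned scan; all connection values are hypothetical.
public class NumericPartitionedScanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE TABLE numeric_partitioned_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/test',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'scan.partition.column' = 'id',\n"
                        + "  'scan.partition.num' = '4',\n"
                        + "  'scan.partition.lower-bound' = '1',\n"
                        + "  'scan.partition.upper-bound' = '1000'\n"
                        + ")");
        // Each of the 4 source splits reads one sub-range of id BETWEEN 1 AND 1000.
        tEnv.executeSql("SELECT * FROM numeric_partitioned_source").print();
    }
}
```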
+#### Partitioned scan on string columns
+
+| Supported database | String partition expression |
+| --- | --- |
+| MySQL | ABS(MD5(`column`) % `number_of_partitions`) |
+| Oracle | MOD(ORA_HASH(`column`), `number_of_partitions`) |
+| PostgreSQL | (ABS(HASHTEXT(`column`)) % `number_of_partitions`) |
+| MS SQL Server | ABS(HASHBYTES('MD5', `column`) % `number_of_partitions`) |
+
+```SQL
+CREATE TABLE my_split_string_read_table (
+    id INT,
+    name STRING,
+    age INT,
+    email STRING,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'jdbc',
+    'url' = 'jdbc:mysql://localhost:3306/test',
+    'table-name' = 'mysql_table_name',
+    'username' = 'root',
+    'password' = '12345678',
+    'scan.partition.column' = 'name',
+    'scan.partition.num' = '3',
+    -- the three splits read the rows where ABS(MD5(`name`) % 3) evaluates to 0, 1 and 2 respectively
+    'scan.partition.lower-bound' = '0',
+    'scan.partition.upper-bound' = '2'
+)
+```
+
### Lookup Cache
JDBC connector can be used in temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
index e99c464d6..e0d9977c7 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
@@ -121,6 +121,7 @@ public class JdbcInputFormat extends RichInputFormat
protected boolean hasNext;
protected Object[][] parameterValues;
+ protected boolean isPartitionColumnTypeString;
public JdbcInputFormat() {}
@@ -190,7 +191,11 @@ public void closeInputFormat() {
public void open(InputSplit inputSplit) throws IOException {
try {
if (inputSplit != null && parameterValues != null) {
- for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+ int parameterLength =
+ isPartitionColumnTypeString
+ ? 1
+ : parameterValues[inputSplit.getSplitNumber()].length;
+ for (int i = 0; i < parameterLength; i++) {
Object param = parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
statement.setString(i + 1, (String) param);
@@ -265,7 +270,7 @@ public void close() throws IOException {
* Checks whether all data has been read.
*
* @return boolean value indication whether all data has been read.
- * @throws IOException
+ * @throws IOException if an I/O error occurs
*/
@Override
public boolean reachedEnd() throws IOException {
@@ -393,6 +398,12 @@ public JdbcInputFormatBuilder setParametersProvider(
return this;
}
+ public JdbcInputFormatBuilder setPartitionColumnTypeString(
+ boolean partitionColumnTypeString) {
+ format.isPartitionColumnTypeString = partitionColumnTypeString;
+ return this;
+ }
+
public JdbcInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
format.rowTypeInfo = rowTypeInfo;
return this;
@@ -425,6 +436,7 @@ public JdbcInputFormat finish() {
if (format.parameterValues == null) {
LOG.debug("No input splitting configured (data will be read with parallelism 1).");
}
+
return format;
}
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
index e617a9fcc..6fbd58949 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
@@ -56,6 +56,14 @@ public String quoteIdentifier(String identifier) {
return identifier;
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ throw new IllegalArgumentException(
+                "The Db2 dialect does not support hash-based string partitioning; column "
+                        + fieldName
+                        + " cannot be read in partitioned splits.");
+ }
+
@Override
public String dialectName() {
return "Db2";
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
index dad028f07..1b33e40fe 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
@@ -59,6 +59,14 @@ public String quoteIdentifier(String identifier) {
return identifier;
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ throw new IllegalArgumentException(
+                "The Derby dialect does not support hash-based string partitioning; column "
+                        + fieldName
+                        + " cannot be read in partitioned splits.");
+ }
+
@Override
public String dialectName() {
return "Derby";
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
index 1ed061fee..9762dfc0e 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
@@ -71,6 +71,11 @@ public String quoteIdentifier(String identifier) {
return identifier;
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + numPartitions + ")";
+ }
+
@Override
public Optional getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
index d0924aee8..af733d210 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
@@ -40,6 +40,11 @@ public Optional defaultDriverName() {
return Optional.of("org.postgresql.Driver");
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ return "(ABS(HASHTEXT(" + quoteIdentifier(fieldName) + ")) % " + numPartitions + ")";
+ }
+
@Override
public String dialectName() {
return "PostgreSQL";
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
index 91469a6b3..1a9fa75b3 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
@@ -67,6 +67,11 @@ public String quoteIdentifier(String identifier) {
return identifier;
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ return "ABS(HASHBYTES('MD5', " + quoteIdentifier(fieldName) + ") % " + numPartitions + ")";
+ }
+
@Override
public Optional getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
index 041d24c33..dd36cdd5d 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
@@ -64,6 +64,14 @@ public String dialectName() {
return "Trino";
}
+ @Override
+ public String hashModForField(String fieldName, int numPartitions) {
+ throw new IllegalArgumentException(
+                "The Trino dialect does not support hash-based string partitioning; column "
+                        + fieldName
+                        + " cannot be read in partitioned splits.");
+ }
+
@Override
public String quoteIdentifier(String identifier) {
return identifier;
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
index 6cc6bbd57..88be55aed 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
@@ -84,6 +84,17 @@ default Optional defaultDriverName() {
*/
String quoteIdentifier(String identifier);
+ /**
+     * Returns a SQL expression that hashes the given column and takes the result modulo the
+     * number of partitions, used to split a string partition column into read partitions.
+     *
+     * @param fieldName name of the partition column.
+     * @param numPartitions number of partitions to read.
+     * @return the hash-modulo expression for the quoted column.
+ */
+ default String hashModForField(String fieldName, int numPartitions) {
+ return " ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + numPartitions + ")";
+ }
+
/**
* Constructs the dialects upsert statement if supported; such as MySQL's {@code DUPLICATE KEY
* UPDATE}, or PostgreSQL's {@code ON CONFLICT... DO UPDATE SET..}. If supported, the returned
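As a rough illustration of the new `hashModForField` hook (not part of the patch), the sketch below prints the expressions generated for a hypothetical column `name` split into 3 partitions; MySQL appears to rely on the default shown above, while Oracle overrides it with `ORA_HASH`:

```java
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;

// Illustrative sketch: hash-modulo expressions for column "name" and 3 partitions.
public class HashModExpressionSketch {
    public static void main(String[] args) {
        // MySqlDialect does not override hashModForField in this patch, so the default applies
        // (identifiers are quoted with backticks).
        System.out.println(new MySqlDialect().hashModForField("name", 3));
        // -> " ABS(MD5(`name`) % 3)"

        // OracleDialect overrides the default elsewhere in this patch.
        System.out.println(new OracleDialect().hashModForField("name", 3));
        // -> "MOD(ORA_HASH(name),3)"
    }
}
```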
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index 064eb2a48..50e6d2a84 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -27,6 +27,7 @@ public class JdbcReadOptions implements Serializable {
private final String query;
private final String partitionColumnName;
+ private final Boolean isPartitionColumnTypeString;
private final Long partitionLowerBound;
private final Long partitionUpperBound;
private final Integer numPartitions;
@@ -37,6 +38,7 @@ public class JdbcReadOptions implements Serializable {
private JdbcReadOptions(
String query,
String partitionColumnName,
+ Boolean isPartitionColumnTypeString,
Long partitionLowerBound,
Long partitionUpperBound,
Integer numPartitions,
@@ -44,6 +46,7 @@ private JdbcReadOptions(
boolean autoCommit) {
this.query = query;
this.partitionColumnName = partitionColumnName;
+ this.isPartitionColumnTypeString = isPartitionColumnTypeString;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
this.numPartitions = numPartitions;
@@ -60,6 +63,10 @@ public Optional getPartitionColumnName() {
return Optional.ofNullable(partitionColumnName);
}
+    public Optional<Boolean> getPartitionColumnTypeString() {
+ return Optional.ofNullable(isPartitionColumnTypeString);
+ }
+
public Optional getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}
@@ -104,6 +111,7 @@ public boolean equals(Object o) {
public static class Builder {
protected String query;
protected String partitionColumnName;
+ protected Boolean isPartitionColumnTypeString;
protected Long partitionLowerBound;
protected Long partitionUpperBound;
protected Integer numPartitions;
@@ -123,6 +131,10 @@ public Builder setPartitionColumnName(String partitionColumnName) {
return this;
}
+        /** optional, whether the partition column is a string type. */
+        public Builder setPartitionColumnTypeString(Boolean partitionColumnTypeString) {
+            this.isPartitionColumnTypeString = partitionColumnTypeString;
+            return this;
+        }
+
/** optional, the smallest value of the first partition. */
public Builder setPartitionLowerBound(long partitionLowerBound) {
this.partitionLowerBound = partitionLowerBound;
@@ -163,6 +175,7 @@ public JdbcReadOptions build() {
return new JdbcReadOptions(
query,
partitionColumnName,
+ isPartitionColumnTypeString,
partitionLowerBound,
partitionUpperBound,
numPartitions,
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
index eebeac5ab..8f826270d 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
@@ -30,9 +30,12 @@ public class CompositeJdbcParameterValuesProvider implements JdbcParameterValues
JdbcParameterValuesProvider b;
public CompositeJdbcParameterValuesProvider(
- JdbcParameterValuesProvider a, JdbcParameterValuesProvider b) {
+ JdbcParameterValuesProvider a,
+ JdbcParameterValuesProvider b,
+ boolean isPartitionColumnTypeString) {
Preconditions.checkArgument(
- a.getParameterValues().length == b.getParameterValues().length,
+ isPartitionColumnTypeString
+ || a.getParameterValues().length == b.getParameterValues().length,
"Both JdbcParameterValuesProvider should have the same length.");
this.a = a;
this.b = b;
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java
index 75356e501..cc8b14618 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java
@@ -44,6 +44,8 @@ public class JdbcNumericBetweenParametersProvider implements JdbcParameterValues
private final long minVal;
private final long maxVal;
+ private boolean isPartitionColumnTypeString;
+
private long batchSize;
private int batchNum;
@@ -52,11 +54,18 @@ public class JdbcNumericBetweenParametersProvider implements JdbcParameterValues
*
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
+     * @param isPartitionColumnTypeString whether the partition column is a string column; if
+     *     true, every value between minVal and maxVal becomes its own split
*/
- public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
+ public JdbcNumericBetweenParametersProvider(
+ long minVal, long maxVal, boolean isPartitionColumnTypeString) {
Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
+ Preconditions.checkArgument(
+ !isPartitionColumnTypeString
+ || (minVal < Integer.MAX_VALUE && maxVal < Integer.MAX_VALUE),
+                "When the partition column is a string, lowerBound and upperBound must fit into the int range, because every value between the bounds becomes its own hash-bucket split.");
this.minVal = minVal;
this.maxVal = maxVal;
+ this.isPartitionColumnTypeString = isPartitionColumnTypeString;
}
/**
@@ -87,6 +96,9 @@ public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
Preconditions.checkArgument(batchNum > 0, "Batch number must be positive");
+ Preconditions.checkArgument(
+ !isPartitionColumnTypeString || (minVal >= 0),
+                "When the partition column is a string, lowerBound must be greater than or equal to 0 so that the bounds enumerate valid hash buckets.");
long maxElemCount = (maxVal - minVal) + 1;
if (batchNum > maxElemCount) {
@@ -102,16 +114,21 @@ public Serializable[][] getParameterValues() {
Preconditions.checkState(
batchSize > 0,
"Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?");
-
+ Serializable[][] parameters = new Serializable[batchNum][2];
long maxElemCount = (maxVal - minVal) + 1;
long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
-
- Serializable[][] parameters = new Serializable[batchNum][2];
- long start = minVal;
- for (int i = 0; i < batchNum; i++) {
- long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
- parameters[i] = new Long[] {start, end};
- start = end + 1;
+ if (!isPartitionColumnTypeString) {
+ long start = minVal;
+ for (int i = 0; i < batchNum; i++) {
+ long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+ parameters[i] = new Long[] {start, end};
+ start = end + 1;
+ }
+ } else {
+ for (int i = 0; i <= maxVal - minVal; i++) {
+ Long hashValue = minVal + i;
+ parameters[i] = new Long[] {hashValue, hashValue};
+ }
}
return parameters;
}
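A small sketch (not part of the patch) of what the two branches above produce for concrete bounds; the numbers are chosen only for illustration:

```java
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

import java.io.Serializable;
import java.util.Arrays;

// Sketch: splits generated in numeric mode vs. string-partition mode.
public class BetweenProviderSketch {
    public static void main(String[] args) {
        // Numeric mode: the bounds 0..9 split into 3 batches become range parameters.
        Serializable[][] ranges =
                new JdbcNumericBetweenParametersProvider(0, 9, false)
                        .ofBatchNum(3)
                        .getParameterValues();
        System.out.println(Arrays.deepToString(ranges)); // [[0, 3], [4, 6], [7, 9]]

        // String mode: every hash-bucket value between the bounds becomes its own split.
        Serializable[][] buckets =
                new JdbcNumericBetweenParametersProvider(0, 2, true)
                        .ofBatchNum(3)
                        .getParameterValues();
        System.out.println(Arrays.deepToString(buckets)); // [[0, 0], [1, 1], [2, 2]]
    }
}
```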
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 3eecfa154..b76c07684 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -37,7 +37,9 @@
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -45,6 +47,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -115,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
helper.validate();
validateConfigOptions(config, context.getClassLoader());
+ DataType physicalRowDataType = context.getPhysicalRowDataType();
validateDataTypeWithJdbcDialect(
context.getPhysicalRowDataType(),
config.get(URL),
@@ -122,7 +126,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
context.getClassLoader());
return new JdbcDynamicTableSource(
getJdbcOptions(helper.getOptions(), context.getClassLoader()),
- getJdbcReadOptions(helper.getOptions()),
+ getJdbcReadOptions(helper.getOptions(), physicalRowDataType),
helper.getOptions().get(LookupOptions.MAX_RETRIES),
getLookupCache(config),
context.getPhysicalRowDataType());
@@ -156,12 +160,24 @@ private InternalJdbcConnectionOptions getJdbcOptions(
return builder.build();
}
- private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+ private JdbcReadOptions getJdbcReadOptions(
+ ReadableConfig readableConfig, DataType physicalRowDataType) {
final Optional partitionColumnName =
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+ RowType logicalType = (RowType) physicalRowDataType.getLogicalType();
+        Map<String, LogicalType> fieldTypeMap =
+ logicalType.getFields().stream()
+ .collect(
+ Collectors.toMap(
+ rowField -> rowField.getName(),
+ rowField -> rowField.getType()));
final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
if (partitionColumnName.isPresent()) {
- builder.setPartitionColumnName(partitionColumnName.get());
+ String partitionName = partitionColumnName.get();
+            Boolean isPartitionColumnTypeString =
+                    fieldTypeMap.get(partitionName) instanceof VarCharType;
+ builder.setPartitionColumnName(partitionName);
+ builder.setPartitionColumnTypeString(isPartitionColumnTypeString);
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index c8ef2e334..c182e38b6 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -139,25 +139,42 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
options.getTableName(),
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
new String[0]);
- final List predicates = new ArrayList();
+        final List<String> predicates = new ArrayList<>();
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
int numPartitions = readOptions.getNumPartitions().get();
- Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
+ Serializable[][] allPushDownParams = replicatePushdownParamsForN(numPartitions);
+            // whether the configured scan partition column is a string type
+            boolean isPartitionColumnTypeString =
+                    readOptions.getPartitionColumnTypeString().orElse(false);
JdbcParameterValuesProvider allParams =
new CompositeJdbcParameterValuesProvider(
- new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
+ new JdbcNumericBetweenParametersProvider(
+ lowerBound, upperBound, isPartitionColumnTypeString)
.ofBatchNum(numPartitions),
- new JdbcGenericParameterValuesProvider(allPushdownParams));
+ new JdbcGenericParameterValuesProvider(allPushDownParams),
+ isPartitionColumnTypeString);
builder.setParametersProvider(allParams);
-
- predicates.add(
- dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
- + " BETWEEN ? AND ?");
+ // Set partition type
+ builder.setPartitionColumnTypeString(isPartitionColumnTypeString);
+
+            String partitionPredicate;
+            if (isPartitionColumnTypeString) {
+                partitionPredicate =
+                        dialect.hashModForField(
+                                        readOptions.getPartitionColumnName().get(), numPartitions)
+                                + " = ?";
+            } else {
+                partitionPredicate =
+                        dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+                                + " BETWEEN ? AND ?";
+            }
+            predicates.add(partitionPredicate);
} else {
builder.setParametersProvider(
new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
@@ -184,7 +201,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalRowDataType));
-
return InputFormatProvider.of(builder.build());
}
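To make the branch above concrete, a short sketch (assumptions: the MySQL dialect, a string partition column `name`, a numeric partition column `id`, and 3 partitions; none of these names come from the patch) of the two predicate shapes appended to the scan query:

```java
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;

// Sketch of the per-split predicates built in getScanRuntimeProvider().
public class PartitionPredicateSketch {
    public static void main(String[] args) {
        JdbcDialect dialect = new MySqlDialect();

        // String partition column: one hash bucket per split, bound to a single '?'.
        String stringPredicate = dialect.hashModForField("name", 3) + " = ?";
        System.out.println(stringPredicate); // " ABS(MD5(`name`) % 3) = ?"

        // Numeric/date/timestamp partition column: a value range per split, bound to two '?'.
        String numericPredicate = dialect.quoteIdentifier("id") + " BETWEEN ? AND ?";
        System.out.println(numericPredicate); // "`id` BETWEEN ? AND ?"
    }
}
```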
@@ -289,10 +305,10 @@ private Optional parseFilterToPredicate(ResolvedExpressi
}
private Serializable[][] replicatePushdownParamsForN(int n) {
- Serializable[][] allPushdownParams = new Serializable[n][pushdownParams.length];
+ Serializable[][] allPushDownParams = new Serializable[n][pushdownParams.length];
for (int i = 0; i < n; i++) {
- allPushdownParams[i] = this.pushdownParams;
+ allPushDownParams[i] = this.pushdownParams;
}
- return allPushdownParams;
+ return allPushDownParams;
}
}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
index 76452684a..5ecb7793c 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
@@ -60,15 +60,16 @@ public class JdbcRowDataInputFormat extends RichInputFormat
private static final long serialVersionUID = 2L;
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
- private JdbcConnectionProvider connectionProvider;
- private int fetchSize;
- private Boolean autoCommit;
+ private final JdbcConnectionProvider connectionProvider;
+ private final int fetchSize;
+ private final Boolean autoCommit;
private Object[][] parameterValues;
- private String queryTemplate;
- private int resultSetType;
- private int resultSetConcurrency;
- private JdbcRowConverter rowConverter;
- private TypeInformation rowDataTypeInfo;
+ private final boolean isPartitionColumnTypeString;
+ private final String queryTemplate;
+ private final int resultSetType;
+ private final int resultSetConcurrency;
+ private final JdbcRowConverter rowConverter;
+    private final TypeInformation<RowData> rowDataTypeInfo;
private transient PreparedStatement statement;
private transient ResultSet resultSet;
@@ -79,6 +80,7 @@ private JdbcRowDataInputFormat(
int fetchSize,
Boolean autoCommit,
Object[][] parameterValues,
+ boolean isPartitionColumnTypeString,
String queryTemplate,
int resultSetType,
int resultSetConcurrency,
@@ -88,6 +90,7 @@ private JdbcRowDataInputFormat(
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
this.parameterValues = parameterValues;
+ this.isPartitionColumnTypeString = isPartitionColumnTypeString;
this.queryTemplate = queryTemplate;
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
@@ -154,7 +157,11 @@ public void closeInputFormat() {
public void open(InputSplit inputSplit) throws IOException {
try {
if (inputSplit != null && parameterValues != null) {
- for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+ int parameterLength =
+ isPartitionColumnTypeString
+ ? 1
+ : parameterValues[inputSplit.getSplitNumber()].length;
+ for (int i = 0; i < parameterLength; i++) {
Object param = parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
statement.setString(i + 1, (String) param);
@@ -298,7 +305,7 @@ public static Builder builder() {
/** Builder for {@link JdbcRowDataInputFormat}. */
public static class Builder {
- private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
+ private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
private int fetchSize;
private Boolean autoCommit;
private Object[][] parameterValues;
@@ -307,6 +314,7 @@ public static class Builder {
private TypeInformation rowDataTypeInfo;
private int resultSetType = ResultSet.TYPE_FORWARD_ONLY;
private int resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+ private boolean isPartitionColumnTypeString = false;
public Builder() {
this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
@@ -376,6 +384,10 @@ public Builder setResultSetConcurrency(int resultSetConcurrency) {
return this;
}
+        public Builder setPartitionColumnTypeString(boolean partitionColumnTypeString) {
+            this.isPartitionColumnTypeString = partitionColumnTypeString;
+            return this;
+        }
+
public JdbcRowDataInputFormat build() {
if (this.queryTemplate == null) {
throw new NullPointerException("No query supplied");
@@ -391,6 +403,7 @@ public JdbcRowDataInputFormat build() {
this.fetchSize,
this.autoCommit,
this.parameterValues,
+ this.isPartitionColumnTypeString,
this.queryTemplate,
this.resultSetType,
this.resultSetConcurrency,
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index f72434db7..8bf9f56de 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -58,6 +58,41 @@ void tearDown() throws IOException {
jdbcInputFormat = null;
}
+ @Test
+    void testJdbcInputFormatSplitKeyString() throws IOException {
+ final int fetchSize = 1;
+ final long min = TEST_DATA[0].id;
+ final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
+        JdbcParameterValuesProvider paramProvider =
+                new JdbcNumericBetweenParametersProvider(min, max, true).ofBatchSize(fetchSize);
+        // with a string partition column, every value between min and max becomes its own split
+
+ Serializable[][] correctParameters = new Serializable[10][2];
+ correctParameters[0] = new Long[] {min, min};
+ correctParameters[1] = new Long[] {min + 1, min + 1};
+ correctParameters[2] = new Long[] {min + 2, min + 2};
+ correctParameters[3] = new Long[] {min + 3, min + 3};
+ correctParameters[4] = new Long[] {min + 4, min + 4};
+ correctParameters[5] = new Long[] {min + 5, min + 5};
+ correctParameters[6] = new Long[] {min + 6, min + 6};
+ correctParameters[7] = new Long[] {min + 7, min + 7};
+ correctParameters[8] = new Long[] {min + 8, min + 8};
+ correctParameters[9] = new Long[] {min + 9, min + 9};
+
+        assertThat(paramProvider.getParameterValues().length).isEqualTo((int) (max - min + 1));
+        assertThat(paramProvider.getParameterValues()).isEqualTo(correctParameters);
+
+        // a single-value range (min == max) must yield exactly one split
+        JdbcParameterValuesProvider singleProvider =
+                new JdbcNumericBetweenParametersProvider(1, 1, true).ofBatchSize(fetchSize);
+
+ Serializable[][] singleCorrectParameters = new Serializable[1][2];
+ singleCorrectParameters[0] = new Long[] {1L, 1L};
+
+ assertThat(singleProvider.getParameterValues()).isEqualTo(singleCorrectParameters);
+ }
+
@Test
void testUntypedRowInfo() {
assertThatThrownBy(
@@ -282,7 +317,7 @@ void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOExce
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
JdbcParameterValuesProvider pramProvider =
- new JdbcNumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
+ new JdbcNumericBetweenParametersProvider(min, max, false).ofBatchSize(fetchSize);
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(getMetadata().getDriverClass())
@@ -320,7 +355,7 @@ void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOE
final long max = TEST_DATA[TEST_DATA.length - 1].id;
final long fetchSize = max + 1; // generate a single split
JdbcParameterValuesProvider pramProvider =
- new JdbcNumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
+ new JdbcNumericBetweenParametersProvider(min, max, false).ofBatchSize(fetchSize);
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(getMetadata().getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java
index d92217a2f..0eafb4693 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2DynamicTableSourceITCase.java
@@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.connector.jdbc.testutils.databases.DbName.DB2_DB;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
@@ -44,6 +45,7 @@ public class Db2DynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DB2_DB,
field("id", dbType("BIGINT"), DataTypes.BIGINT().notNull()),
field("decimal_col", dbType("NUMERIC(10, 4)"), DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", dbType("TIMESTAMP(6)"), DataTypes.TIMESTAMP(6)),
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/table/DerbyDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/table/DerbyDynamicTableSourceITCase.java
index fa1d67350..215932159 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/table/DerbyDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/table/DerbyDynamicTableSourceITCase.java
@@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.connector.jdbc.testutils.databases.DbName.DERBY_DB;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
@@ -42,6 +43,7 @@ public class DerbyDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DERBY_DB,
field("id", DataTypes.BIGINT().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(6)),
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/table/MySqlDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/table/MySqlDynamicTableSourceITCase.java
index 967c7e841..95a245f8c 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/table/MySqlDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/table/MySqlDynamicTableSourceITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.databases.mysql.MySqlTestBase;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -43,6 +44,7 @@ public class MySqlDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DbName.MYSQL_DB,
field("id", DataTypes.BIGINT().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
@@ -50,7 +52,8 @@ protected TableRow createInputTable() {
field("real_col", dbType("REAL"), DataTypes.DOUBLE()),
field("double_col", DataTypes.DOUBLE()),
field("time_col", dbType("TIME"), DataTypes.TIME()),
- field("timestamp9_col", DataTypes.TIMESTAMP(6)));
+ field("timestamp9_col", DataTypes.TIMESTAMP(6)),
+ field("string_col", DataTypes.VARCHAR(30)));
}
protected List getTestData() {
@@ -62,7 +65,8 @@ protected List getTestData() {
1.175E-37D,
1.79769E308D,
LocalTime.parse("15:35"),
- LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ "Leblanc_1"),
Row.of(
2L,
BigDecimal.valueOf(101.1234),
@@ -70,6 +74,7 @@ protected List getTestData() {
-1.175E-37D,
-1.79769E308,
LocalTime.parse("15:36:01"),
- LocalDateTime.parse("2020-01-01T15:36:01.123456")));
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ "Wade"));
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/table/OracleDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/table/OracleDynamicTableSourceITCase.java
index 9e758eeeb..b79c11a78 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/table/OracleDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/table/OracleDynamicTableSourceITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.databases.oracle.OracleTestBase;
import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -43,6 +44,7 @@ public class OracleDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DbName.ORACLE_DB,
field("id", dbType("INTEGER"), DataTypes.BIGINT().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(6)),
@@ -53,7 +55,7 @@ protected TableRow createInputTable() {
field("binary_double_col", dbType("BINARY_DOUBLE"), DataTypes.DOUBLE()),
field("char_col", dbType("CHAR"), DataTypes.CHAR(1)),
field("nchar_col", dbType("NCHAR(3)"), DataTypes.VARCHAR(3)),
- field("varchar2_col", dbType("VARCHAR2(30)"), DataTypes.VARCHAR(30)),
+ field("string_col", dbType("VARCHAR2(30)"), DataTypes.VARCHAR(30)),
field("date_col", dbType("DATE"), DataTypes.DATE()),
field("timestamp9_col", dbType("TIMESTAMP(9)"), DataTypes.TIMESTAMP(9)),
field("clob_col", dbType("CLOB"), DataTypes.STRING()));
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/table/PostgresDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/table/PostgresDynamicTableSourceITCase.java
index 174be59d8..66d6d5bec 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/table/PostgresDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/table/PostgresDynamicTableSourceITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -43,13 +44,15 @@ public class PostgresDynamicTableSourceITCase extends JdbcDynamicTableSourceITCa
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DbName.POSTGRES_DB,
field("id", DataTypes.BIGINT().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
// other fields
field("real_col", dbType("REAL"), DataTypes.FLOAT()),
field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()),
- field("time_col", dbType("TIME"), DataTypes.TIME()));
+ field("time_col", dbType("TIME"), DataTypes.TIME()),
+ field("string_col", DataTypes.VARCHAR(30)));
}
protected List getTestData() {
@@ -60,13 +63,15 @@ protected List getTestData() {
LocalDateTime.parse("2020-01-01T15:35:00.123456"),
1.175E-37F,
1.79769E308D,
- LocalTime.parse("15:35")),
+ LocalTime.parse("15:35"),
+ "Leblanc_1"),
Row.of(
2L,
BigDecimal.valueOf(101.1234),
LocalDateTime.parse("2020-01-01T15:36:01.123456"),
-1.175E-37F,
-1.79769E308,
- LocalTime.parse("15:36:01")));
+ LocalTime.parse("15:36:01"),
+ "Wade"));
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/table/SqlServerDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/table/SqlServerDynamicTableSourceITCase.java
index f9444f45c..0a072e022 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/table/SqlServerDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/table/SqlServerDynamicTableSourceITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.databases.sqlserver.SqlServerTestBase;
import org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -47,6 +48,7 @@ public class SqlServerDynamicTableSourceITCase extends JdbcDynamicTableSourceITC
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DbName.SQL_SERVER_DB,
field("id", DataTypes.BIGINT().notNull()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", dbType("DATETIME2"), DataTypes.TIMESTAMP(6)),
@@ -63,7 +65,7 @@ protected TableRow createInputTable() {
field("datetime2_col", dbType("DATETIME2"), DataTypes.TIMESTAMP()),
field("char_col", dbType("CHAR"), DataTypes.STRING()),
field("nchar_col", dbType("NCHAR(3)"), DataTypes.STRING()),
- field("varchar2_col", dbType("VARCHAR(30)"), DataTypes.STRING()),
+ field("string_col", dbType("VARCHAR(30)"), DataTypes.STRING()),
field("nvarchar2_col", dbType("NVARCHAR(30)"), DataTypes.STRING()),
field("text_col", dbType("TEXT"), DataTypes.STRING()),
field("ntext_col", dbType("NTEXT"), DataTypes.STRING()));
@@ -87,7 +89,7 @@ protected List getTestData() {
LocalDateTime.parse("2020-01-01T15:35:00.1234567"),
"a",
"abc",
- "abcdef",
+ "Leblanc_1",
"xyz",
"Hello World",
"World Hello"),
@@ -110,7 +112,7 @@ protected List getTestData() {
"abcdef",
"xyz",
"Hey Leonard",
- "World Hello"));
+ "Wade"));
}
@Test
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/trino/table/TrinoDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/trino/table/TrinoDynamicTableSourceITCase.java
index 39b6c3c5c..40b164e04 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/trino/table/TrinoDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/trino/table/TrinoDynamicTableSourceITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.connector.jdbc.databases.trino.TrinoTestBase;
import org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -47,6 +48,7 @@ class TrinoDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase impleme
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
+ DbName.TRINO_DB,
field("id", dbType("INTEGER"), DataTypes.BIGINT()),
field("decimal_col", DataTypes.DECIMAL(10, 4)),
field("timestamp6_col", DataTypes.TIMESTAMP(6)),
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index f79e7f7eb..5c34f282e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -124,7 +124,7 @@ private void runTest(boolean exploitParallelism) throws Exception {
inputBuilder
.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
.setParametersProvider(
- new JdbcNumericBetweenParametersProvider(min, max)
+ new JdbcNumericBetweenParametersProvider(min, max, false)
.ofBatchSize(fetchSize));
}
DataSet source = environment.createInput(inputBuilder.finish());
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
index 25e350e1a..72f20f362 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
@@ -30,7 +30,7 @@ class NumericBetweenParametersProviderTest {
@Test
void testBatchSizeDivisible() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchSize(3);
+ new JdbcNumericBetweenParametersProvider(-5, 9, false).ofBatchSize(3);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {
@@ -46,7 +46,7 @@ void testBatchSizeDivisible() {
@Test
void testBatchSizeNotDivisible() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchSize(4);
+ new JdbcNumericBetweenParametersProvider(-5, 11, false).ofBatchSize(4);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {
@@ -62,7 +62,7 @@ void testBatchSizeNotDivisible() {
@Test
void testBatchSizeTooLarge() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(0, 2).ofBatchSize(5);
+ new JdbcNumericBetweenParametersProvider(0, 2, false).ofBatchSize(5);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {new long[] {0, 2}};
@@ -72,7 +72,7 @@ void testBatchSizeTooLarge() {
@Test
void testBatchNumDivisible() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchNum(5);
+ new JdbcNumericBetweenParametersProvider(-5, 9, false).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {
@@ -88,7 +88,7 @@ void testBatchNumDivisible() {
@Test
void testBatchNumNotDivisible() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchNum(5);
+ new JdbcNumericBetweenParametersProvider(-5, 11, false).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {
@@ -104,7 +104,7 @@ void testBatchNumNotDivisible() {
@Test
void testBatchNumTooLarge() {
JdbcNumericBetweenParametersProvider provider =
- new JdbcNumericBetweenParametersProvider(0, 2).ofBatchNum(5);
+ new JdbcNumericBetweenParametersProvider(0, 2, false).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
long[][] expected = {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index bab5a408d..c27f3ac2a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -18,9 +18,18 @@
package org.apache.flink.connector.jdbc.table;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcInputFormat;
+import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
+import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
+import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
+import org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialect;
+import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -47,12 +56,14 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -101,6 +112,15 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
public static StreamExecutionEnvironment env;
public static TableEnvironment tEnv;
+    private static final List<DbName> dbNameList = new ArrayList<>();
+
+ static {
+ dbNameList.add(DbName.POSTGRES_DB);
+ dbNameList.add(DbName.MYSQL_DB);
+ dbNameList.add(DbName.ORACLE_DB);
+ dbNameList.add(DbName.SQL_SERVER_DB);
+ }
+
protected TableRow createInputTable() {
return tableRow(
"jdbDynamicTableSource",
@@ -142,12 +162,106 @@ void afterEach() {
@Test
void testJdbcSource() {
+ String fieldStr = "string_col";
+ String fieldId = "id";
String testTable = "testTable";
- tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
+        boolean containsIdColumn = Arrays.asList(inputTable.getTableFields()).contains(fieldId);
+        if (!containsIdColumn) {
+            throw new IllegalArgumentException(
+                    "The JDBC test table must contain an `id` column.");
+        }
+        boolean isPartitionColumnTypeString =
+                dbNameList.contains(createInputTable().getDbName()) && containsIdColumn;
+ DatabaseMetadata metadata = getMetadata();
+ // Non-sliced read (full table scan)
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(metadata, testTable));
List<Row> collected = executeQuery("SELECT * FROM " + testTable);
-
assertThat(collected).containsExactlyInAnyOrderElementsOf(getTestData());
+
+ String testReadPartitionString = "testReadPartitionString";
+
+ // String-sliced partitioned read
+ List<String> withParams = new ArrayList<>();
+
+ String partitionColumn =
+ Arrays.asList(inputTable.getTableFields()).contains(fieldStr) ? fieldStr : fieldId;
+ int partitionNum = 10;
+ int lowerBound = 0;
+ int upperBound = 9;
+ withParams.add(String.format("'scan.partition.column'='%s'", partitionColumn));
+ withParams.add(String.format("'scan.partition.num'='%s'", partitionNum));
+ withParams.add(String.format("'scan.partition.lower-bound'='%s'", lowerBound));
+ withParams.add(String.format("'scan.partition.upper-bound'='%s'", upperBound));
+
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(metadata, testReadPartitionString, withParams));
+ List<Row> collectedPartitionString =
+ executeQuery("SELECT * FROM " + testReadPartitionString);
+ assertThat(collectedPartitionString).containsExactlyInAnyOrderElementsOf(getTestData());
+
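+ // Three splits: each parameter row binds one remainder value (0, 1, 2) into the split filter.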
+ Serializable[][] queryParameters = new Long[3][1];
+ queryParameters[0] = new Long[] {0L};
+ queryParameters[1] = new Long[] {1L};
+ queryParameters[2] = new Long[] {2L};
+
+ String partitionKeyName = isPartitionColumnTypeString ? fieldStr : fieldId;
+ String sqlFilterField;
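+ // Build the dialect-specific hash/modulo expression that maps the partition key to a split.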
+ switch (createInputTable().getDbName()) {
+ case POSTGRES_DB:
+ sqlFilterField =
+ new PostgresDialect()
+ .hashModForField(partitionKeyName, queryParameters.length);
+ break;
+ case MYSQL_DB:
+ sqlFilterField =
+ new MySqlDialect()
+ .hashModForField(partitionKeyName, queryParameters.length);
+ break;
+ case ORACLE_DB:
+ sqlFilterField =
+ new OracleDialect()
+ .hashModForField(partitionKeyName, queryParameters.length);
+ break;
+ case SQL_SERVER_DB:
+ sqlFilterField =
+ new SqlServerDialect()
+ .hashModForField(partitionKeyName, queryParameters.length);
+ break;
+ default:
+ sqlFilterField = partitionKeyName;
+ break;
+ }
+
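+ // Read the same table through JdbcInputFormat (DataSet API), filtering each split with the expression above.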
+ ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
+ JdbcInputFormat jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(metadata.getDriverClass())
+ .setDBUrl(metadata.getJdbcUrl())
+ .setUsername(metadata.getUsername())
+ .setPassword(metadata.getPassword())
+ .setPartitionColumnTypeString(isPartitionColumnTypeString)
+ .setQuery(
+ "select * from "
+ + inputTable.getTableName()
+ + " where ( "
+ + sqlFilterField
+ + " ) = ?")
+ .setRowTypeInfo(inputTable.getTableRowTypeInfo())
+ .setParametersProvider(
+ new JdbcGenericParameterValuesProvider(queryParameters))
+ .finish();
+ try {
+ int jdbcInputRowSize =
+ executionEnvironment
+ .createInput(jdbcInputFormat)
+ .map(row -> row.getField(fieldId))
+ .collect()
+ .size();
+ assertThat(jdbcInputRowSize).isEqualTo(getTestData().size());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Test
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
index b2e4deea8..4c93f8c0e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
@@ -55,8 +55,9 @@
class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
private JdbcRowDataInputFormat inputFormat;
- private static String[] fieldNames = new String[] {"id", "title", "author", "price", "qty"};
- private static DataType[] fieldDataTypes =
+ private static final String[] fieldNames =
+ new String[] {"id", "title", "author", "price", "qty"};
+ private static final DataType[] fieldDataTypes =
new DataType[] {
DataTypes.INT(),
DataTypes.STRING(),
@@ -240,7 +241,7 @@ void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOExce
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
JdbcParameterValuesProvider pramProvider =
- new JdbcNumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
+ new JdbcNumericBetweenParametersProvider(min, max, false).ofBatchSize(fetchSize);
inputFormat =
JdbcRowDataInputFormat.builder()
.setDrivername(getMetadata().getDriverClass())
@@ -278,7 +279,7 @@ void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOE
final long max = TEST_DATA[TEST_DATA.length - 1].id;
final long fetchSize = max + 1; // generate a single split
JdbcParameterValuesProvider pramProvider =
- new JdbcNumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
+ new JdbcNumericBetweenParametersProvider(min, max, false).ofBatchSize(fetchSize);
inputFormat =
JdbcRowDataInputFormat.builder()
.setDrivername(getMetadata().getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/DbName.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/DbName.java
new file mode 100644
index 000000000..0953a3886
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/DbName.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases;
+
+/** Database names used by the JDBC connector tests. * */
+public enum DbName {
+ POSTGRES_DB,
+ MYSQL_DB,
+ ORACLE_DB,
+ SQL_SERVER_DB,
+ TRINO_DB,
+ DB2_DB,
+ DERBY_DB
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
index d8cbd7937..fabe09639 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
@@ -23,6 +23,7 @@
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
import org.apache.flink.table.api.DataTypes;
@@ -53,13 +54,16 @@
public abstract class TableBase implements TableManaged {
private final String name;
+
+ private final DbName dbName;
private final TableField[] fields;
- protected TableBase(String name, TableField[] fields) {
+ protected TableBase(String name, DbName dbName, TableField[] fields) {
Preconditions.checkArgument(name != null && !name.isEmpty(), "Table name must be defined");
Preconditions.checkArgument(
fields != null && fields.length != 0, "Table fields must be defined");
this.name = name;
+ this.dbName = dbName;
this.fields = fields;
}
@@ -104,6 +108,10 @@ public RowTypeInfo getTableRowTypeInfo() {
return new RowTypeInfo(typesArray, fieldsArray);
}
+ public DbName getDbName() {
+ return dbName;
+ }
+
public RowType getTableRowType() {
LogicalType[] typesArray =
getStreamDataTypes().map(DataType::getLogicalType).toArray(LogicalType[]::new);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
index 6a8f80d3b..4c95a03dd 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
@@ -18,13 +18,18 @@
package org.apache.flink.connector.jdbc.testutils.tables;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.table.types.DataType;
/** Table builder. * */
public final class TableBuilder {
public static TableRow tableRow(String name, TableField... fields) {
- return new TableRow(name, fields);
+ return new TableRow(name, null, fields);
+ }
+
+ public static TableRow tableRow(String name, DbName dbName, TableField... fields) {
+ return new TableRow(name, dbName, fields);
}
public static TableField field(String name, DataType dataType) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java
index 87308fb70..2812ab66b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java
@@ -20,6 +20,7 @@
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.databases.DbName;
import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
import org.apache.flink.table.api.DataTypes;
@@ -43,8 +44,8 @@
/** Row table. * */
public class TableRow extends TableBase {
- protected TableRow(String name, TableField[] fields) {
- super(name, fields);
+ protected TableRow(String name, DbName dbName, TableField[] fields) {
+ super(name, dbName, fields);
}
protected JdbcResultSetBuilder<Row> getResultSetBuilder() {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java
index c1bb4d8f9..9207d851b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java
@@ -77,6 +77,7 @@ public class BooksTable extends TableBase implements Table
public BooksTable(String name) {
super(
name,
+ null,
Arrays.asList(
pkField("id", INT().notNull()),
field("title", VARCHAR(50)),