Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
<shade.base>org.apache.flink.shaded.databend</shade.base>

<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven-shade-plugin.version>3.5.1</maven-shade-plugin.version>
<maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
<spotless-maven-plugin.version>2.36.0</spotless-maven-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
Expand Down Expand Up @@ -146,7 +146,7 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.1.2</version>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ public DynamicTableSink createDynamicTableSink(Context context) {
validateConfigOptions(config);

ResolvedCatalogTable catalogTable = context.getCatalogTable();
// String[] primaryKeys = catalogTable
// .getResolvedSchema()
// .getPrimaryKey()
// .map(UniqueConstraint::getColumns)
// .map(keys -> keys.toArray(new String[0]))
// .orElse(new String[0]);
Properties databendProperties =
getDatabendProperties(context.getCatalogTable().getOptions());
return new DatabendDynamicTableSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,30 @@
import org.apache.flink.connector.databend.DatabendDynamicTableFactory;
import org.apache.flink.connector.databend.util.DataTypeUtil;
import org.apache.flink.connector.databend.util.DatabendUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
Expand All @@ -17,14 +38,27 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.*;

import static org.apache.flink.connector.databend.config.DatabendConfig.*;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.connector.databend.config.DatabendConfig.DATABASE_NAME;
import static org.apache.flink.connector.databend.config.DatabendConfig.PASSWORD;
import static org.apache.flink.connector.databend.config.DatabendConfig.TABLE_NAME;
import static org.apache.flink.connector.databend.config.DatabendConfig.URL;
import static org.apache.flink.connector.databend.config.DatabendConfig.USERNAME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

Expand Down Expand Up @@ -211,34 +245,186 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
}

private synchronized TableSchema createTableSchema(String databaseName, String tableName) {
try (PreparedStatement stmt = connection.prepareStatement(
String.format("SELECT * from `%s`.`%s` limit 1", databaseName, tableName))) {
DatabendResultSetMetaData metaData = stmt.getMetaData().unwrap(DatabendResultSetMetaData.class);
Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", int.class);
getColMethod.setAccessible(true);
try {
// check connection
if (connection == null) {
throw new CatalogException("Connection is null");
}
if (connection.isClosed()) {
throw new CatalogException("Connection is closed");
}

String sql = String.format("SELECT * from `%s`.`%s` limit 1", databaseName, tableName);
System.out.println("Executing SQL: " + sql);

PreparedStatement stmt = connection.prepareStatement(sql);
if (stmt == null) {
throw new CatalogException("PreparedStatement is null");
}

ResultSet rs = stmt.executeQuery();
if (rs == null) {
throw new CatalogException("ResultSet is null");
}

// from ResultSet get metadata
ResultSetMetaData standardMetaData = rs.getMetaData();
if (standardMetaData == null) {
throw new CatalogException("ResultSetMetaData is null");
}

System.out.println("Got standard metadata, column count: " + standardMetaData.getColumnCount());

// ResultSetMetaData
try {
// unwrap to DatabendResultSetMetaData
DatabendResultSetMetaData metaData = standardMetaData.unwrap(DatabendResultSetMetaData.class);
if (metaData == null) {
throw new CatalogException("DatabendResultSetMetaData is null after unwrap");
}

Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", int.class);
getColMethod.setAccessible(true);

List<String> primaryKeys = getPrimaryKeys(databaseName, tableName);
TableSchema.Builder builder = TableSchema.builder();

for (int idx = 1; idx <= metaData.getColumnCount(); idx++) {
DatabendColumnInfo columnInfo = (DatabendColumnInfo) getColMethod.invoke(metaData, idx);
String columnName = columnInfo.getColumnName();
DataType columnType = DataTypeUtil.toFlinkType(columnInfo);
if (primaryKeys.contains(columnName)) {
columnType = columnType.notNull();
}
builder.field(columnName, columnType);
}

if (!primaryKeys.isEmpty()) {
builder.primaryKey(primaryKeys.toArray(new String[0]));
}

rs.close();
stmt.close();

return builder.build();

} catch (Exception e) {
return createTableSchemaFromStandardMetaData(standardMetaData, databaseName, tableName, rs, stmt);
}

} catch (Exception e) {
e.printStackTrace();
throw new CatalogException(
String.format(
"Failed getting columns in catalog %s database %s table %s: %s",
getName(), databaseName, tableName, e.getMessage()),
e);
}
}

private TableSchema createTableSchemaFromStandardMetaData(
        ResultSetMetaData metaData, String databaseName, String tableName,
        ResultSet rs, PreparedStatement stmt) {
    // Fallback schema derivation using only the portable JDBC metadata API;
    // used when the Databend-specific metadata cannot be unwrapped.
    // Ownership of rs/stmt is handed to this method: they are closed once the
    // schema has been assembled.
    try {
        List<String> primaryKeys = getPrimaryKeys(databaseName, tableName);
        TableSchema.Builder builder = TableSchema.builder();

        for (int idx = 1; idx <= metaData.getColumnCount(); idx++) {
            String columnName = metaData.getColumnName(idx);
            String columnTypeName = metaData.getColumnTypeName(idx);
            int sqlType = metaData.getColumnType(idx);
            int precision = metaData.getPrecision(idx);
            int scale = metaData.getScale(idx);

            DataType flinkType = convertToFlinkType(columnTypeName, sqlType, precision, scale);

            // Primary-key columns must be declared NOT NULL in Flink.
            if (primaryKeys.contains(columnName)) {
                flinkType = flinkType.notNull();
            }
            builder.field(columnName, flinkType);
        }

        if (!primaryKeys.isEmpty()) {
            builder.primaryKey(primaryKeys.toArray(new String[0]));
        }

        rs.close();
        stmt.close();

        return builder.build();
    } catch (Exception e) {
        throw new CatalogException("Failed to create schema from standard metadata", e);
    }
}

private DataType convertToFlinkType(String typeName, int sqlType, int precision, int scale) {
    // Maps a JDBC column description to a Flink DataType. The driver-reported
    // type NAME is consulted first (Databend names such as INT64/FLOAT32 do
    // not all map onto java.sql.Types); the numeric SQL type is the fallback.
    if (typeName != null) {
        String upperTypeName = typeName.toUpperCase();
        // Order matters: narrower integer names are checked BEFORE the bare
        // "INT" substring. Previously "SMALLINT"/"TINYINT" matched the
        // contains("INT") branch first and were misclassified as INT().
        if (upperTypeName.contains("INT64") || upperTypeName.contains("BIGINT")) {
            return DataTypes.BIGINT();
        } else if (upperTypeName.contains("INT16") || upperTypeName.contains("SMALLINT")) {
            return DataTypes.SMALLINT();
        } else if (upperTypeName.contains("INT8") || upperTypeName.contains("TINYINT")) {
            return DataTypes.TINYINT();
        } else if (upperTypeName.contains("INT32") || upperTypeName.contains("INT")) {
            return DataTypes.INT();
        } else if (upperTypeName.contains("FLOAT64") || upperTypeName.contains("DOUBLE")) {
            return DataTypes.DOUBLE();
        } else if (upperTypeName.contains("FLOAT32") || upperTypeName.contains("FLOAT")) {
            return DataTypes.FLOAT();
        } else if (upperTypeName.contains("STRING") || upperTypeName.contains("VARCHAR")) {
            return DataTypes.STRING();
        } else if (upperTypeName.contains("TIMESTAMP")) {
            return DataTypes.TIMESTAMP(3);
        } else if (upperTypeName.contains("DATE")) {
            return DataTypes.DATE();
        } else if (upperTypeName.contains("BOOLEAN") || upperTypeName.contains("BOOL")) {
            return DataTypes.BOOLEAN();
        } else if (upperTypeName.contains("DECIMAL")) {
            // Guard against drivers reporting 0/negative precision or scale.
            return DataTypes.DECIMAL(precision > 0 ? precision : 10, scale >= 0 ? scale : 0);
        }
    }

    // Fallback: portable java.sql.Types mapping.
    switch (sqlType) {
        case Types.BOOLEAN:
            return DataTypes.BOOLEAN();
        case Types.TINYINT:
            return DataTypes.TINYINT();
        case Types.SMALLINT:
            return DataTypes.SMALLINT();
        case Types.INTEGER:
            return DataTypes.INT();
        case Types.BIGINT:
            return DataTypes.BIGINT();
        case Types.FLOAT:
        case Types.REAL:
            return DataTypes.FLOAT();
        case Types.DOUBLE:
            return DataTypes.DOUBLE();
        case Types.DECIMAL:
        case Types.NUMERIC:
            return DataTypes.DECIMAL(precision > 0 ? precision : 10, scale >= 0 ? scale : 0);
        case Types.CHAR:
            return DataTypes.CHAR(precision > 0 ? precision : 1);
        case Types.VARCHAR:
        case Types.LONGVARCHAR:
            return DataTypes.STRING();
        case Types.DATE:
            return DataTypes.DATE();
        case Types.TIME:
            return DataTypes.TIME();
        case Types.TIMESTAMP:
            return DataTypes.TIMESTAMP(3);
        case Types.BINARY:
        case Types.VARBINARY:
        case Types.LONGVARBINARY:
            return DataTypes.BYTES();
        default:
            // Last resort: STRING round-trips any value, if lossily typed.
            System.out.println("Unknown SQL type: " + sqlType + ", using STRING");
            return DataTypes.STRING();
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,38 @@ protected DatabendRowConverter.DeserializationConverter createToInternalConverte
? DecimalData.fromBigDecimal(new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case DATE:
return val -> (int) ((Date) val).toLocalDate().toEpochDay();
return val -> {
if (val == null) {
return null;
}
try {
if (val instanceof String) {
String dateStr = ((String) val).trim();
if (dateStr.contains("/")) {
dateStr = dateStr.replace("/", "-");
}
return (int) java.sql.Date.valueOf(dateStr).toLocalDate().toEpochDay();
} else if (val instanceof java.sql.Date) {
return (int) ((java.sql.Date) val).toLocalDate().toEpochDay();
} else if (val instanceof java.time.LocalDate) {
return (int) ((java.time.LocalDate) val).toEpochDay();
} else {
return (int) ((java.sql.Date) val).toLocalDate().toEpochDay();
}
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to convert date value: %s (type: %s)",
val, val.getClass().getName()), e);
}
};
case TIME_WITHOUT_TIME_ZONE:
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> TimestampData.fromTimestamp((Timestamp) val);
return val -> convertToTimestampData(val);

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> TimestampData.fromInstant(((Timestamp) val).toInstant());
return val -> convertToTimestampDataWithZone(val);
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
Expand All @@ -139,6 +163,47 @@ protected DatabendRowConverter.DeserializationConverter createToInternalConverte
}
}

private TimestampData convertToTimestampData(Object val) {
    // Converts a driver-supplied timestamp value (String, java.sql.Timestamp,
    // or epoch-millis Long) to Flink's internal TimestampData.
    // NOTE(review): string offsets are DROPPED, not applied -- the value is
    // interpreted as local wall-clock time; confirm this matches the sink's
    // expectations.
    if (val == null) {
        return null;
    }
    if (val instanceof Timestamp) {
        return TimestampData.fromTimestamp((Timestamp) val);
    }
    if (val instanceof Long) {
        return TimestampData.fromEpochMillis((Long) val);
    }
    if (val instanceof String) {
        String str = ((String) val).trim();
        try {
            // Normalize the ISO-8601 'T' separator to the JDBC space form.
            str = str.replace('T', ' ');
            // Strip a trailing zone designator 'Z'.
            int zulu = str.indexOf('Z');
            if (zulu >= 0) {
                str = str.substring(0, zulu);
            }
            // Strip a numeric UTC offset. The previous code only handled
            // positive offsets ("+HH:MM"); a '-' denotes an offset only when
            // it appears AFTER the time separator (date dashes come earlier),
            // so search from the first space onward.
            int space = str.indexOf(' ');
            if (space >= 0) {
                int plus = str.indexOf('+', space);
                if (plus >= 0) {
                    str = str.substring(0, plus);
                }
                int minus = str.indexOf('-', space);
                if (minus >= 0) {
                    str = str.substring(0, minus);
                }
            }
            return TimestampData.fromTimestamp(Timestamp.valueOf(str.trim()));
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse timestamp: " + str, e);
        }
    }
    throw new IllegalArgumentException("Unsupported timestamp type: " + val.getClass());
}

private TimestampData convertToTimestampDataWithZone(Object val) {
    // Zone-aware variant used for TIMESTAMP_WITH_LOCAL_TIME_ZONE columns:
    // Timestamp values go through Instant; strings are delegated to the
    // zone-less parser.
    if (val == null) {
        return null;
    }
    if (val instanceof Timestamp) {
        Timestamp ts = (Timestamp) val;
        return TimestampData.fromInstant(ts.toInstant());
    }
    if (val instanceof String) {
        return convertToTimestampData(val);
    }
    throw new IllegalArgumentException("Unsupported timestamp type: " + val.getClass());
}

protected DatabendRowConverter.SerializationConverter createToExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
Expand Down