Skip to content

Commit 74f6d7a

Browse files
zhoujinsong, jinsongzhou, xiaosefeng, czy006, baiyangtx
authored
[AMORO-2204] Support Spark 3.5 for mixed format tables (#3428)
* add tencent-dlc profile * Add amoro-dlc Dockerfile * Change derby url * Fix amoro-dlc Dockerfile * Fix ams start cmd * Rollback change in entrypoint.sh * Remove lake fs dependencies * Add lake fs dependencies * Change hive dependency to dlc version * Remove hive version in tencent-dlc profile * Simplify amoro-dlc Dockerfile * Copy entrypoint.sh * Fix amoro-dlc Dockerfile * Fix amoro-dlc Dockerfile * Fix amoro-dlc Dockerfile * Add some debug info for amoro-dlc Dockerfile * Fix amoro-dlc Dockerfile * Add optimizer task class loader * Exclude not needed classes from spark optimizer * Rollback unnecessary changes * Solve the problem of orc package conflict and thrift size limit. * Optimize database configuration for mysql5.6. * Modify thrift's maxMessageSize to 1000m. * Full support AMORO_CONF_DIR environment variable * Fix a load-config.sh export error * Add conf dir to class path * Fix ams.sh classpath environment value error * Change hive version to 2.3.9 for dlc profile * Determine change store by name * Remove not needed list database calls * Allow configure catalog-impl for hive type * Support catalog-impl property for spark mixed-format * Fix checkstyle errors * Optimize catalog creation API, no need to upload files first (merge request !14) Squash merge branch 'dev/xiaosefeng-catalog' into 'tencent-master' Optimize catalog creation API, no need to upload files first * Fixed display error when switching catalog authentication mode * Fix tencent-dlc docker file build error * Modify cos related dependency packages and dependency sources * Rewrite pos delete files not written by optimizing * Fix log4j2 info log rollover issue * Add amoro conf into classpath * init mixed spark 3.5 modules * adapt mixed format for spark3.5 * Rollback some unnecessary changes * Rollback unnecessary changes * Rollback unnecessary changes * Fixed some unit test issues * Fixed some unit test issues * Fix some comments --------- Co-authored-by: jinsongzhou <jinsongzhou@tencent.com> 
Co-authored-by: xiaosefeng <xiaosefeng@tencent.com> Co-authored-by: ConradJam <jam.gzczy@gmail.com> Co-authored-by: baiyangtx <xiangnebula@163.com>
1 parent 647478e commit 74f6d7a

File tree

139 files changed

+45012
-58
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

139 files changed

+45012
-58
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalTableConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.amoro.server.table.internal;
2020

2121
import org.apache.amoro.mixed.InternalMixedIcebergCatalog;
22+
import org.apache.amoro.table.MixedTable;
2223

2324
/** Constant definitions for internal tables. */
2425
public class InternalTableConstants {
@@ -37,6 +38,5 @@ public class InternalTableConstants {
3738
public static final String OSS_PROTOCOL_PREFIX = "oss://";
3839

3940
public static final String CHANGE_STORE_TABLE_NAME_SUFFIX =
40-
InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR
41-
+ InternalMixedIcebergCatalog.CHANGE_STORE_NAME;
41+
InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR + MixedTable.CHANGE_STORE_IDENTIFIER;
4242
}

amoro-ams/src/main/resources/mysql/upgrade-0.4.1-to-0.5.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ CREATE TABLE `table_identifier`
5959
`table_name` varchar(128) NOT NULL COMMENT 'Table name',
6060
PRIMARY KEY (`table_id`),
6161
UNIQUE KEY `table_name_index` (`catalog_name`,`db_name`,`table_name`)
62-
) ROW_FORMAT=DYNAMIC;
62+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Table identifier for AMS' ROW_FORMAT=DYNAMIC;
6363
INSERT INTO `table_identifier` (`catalog_name`, `db_name`, `table_name`) SELECT `catalog_name`, `db_name`, `table_name` FROM `table_metadata`;
6464

6565
-- table_metadata

amoro-ams/src/test/java/org/apache/amoro/server/util/TestIcebergTableUtil.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.amoro.io.IcebergDataTestHelpers;
2222
import org.apache.amoro.io.MixedDataTestHelpers;
2323
import org.apache.amoro.mixed.MixedTables;
24+
import org.apache.amoro.properties.CatalogMetaProperties;
2425
import org.apache.amoro.server.utils.IcebergTableUtil;
2526
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
2627
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -123,7 +124,12 @@ private static Table newIcebergTable(Catalog catalog) {
123124
}
124125

125126
private static MixedTable newMixedTable(Catalog catalog, boolean withKey) {
126-
MixedTables mixedTables = new MixedTables(TableMetaStore.EMPTY, Maps.newHashMap(), catalog);
127+
MixedTables mixedTables =
128+
new MixedTables(
129+
TableMetaStore.EMPTY,
130+
Maps.newHashMap(),
131+
catalog,
132+
CatalogMetaProperties.MIXED_FORMAT_TABLE_STORE_SEPARATOR_DEFAULT);
127133
return mixedTables.createTable(
128134
org.apache.amoro.table.TableIdentifier.of("cata", "db", "table"),
129135
schema,

amoro-common/src/main/java/org/apache/amoro/client/PoolConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void setMaxReconnects(int maxReconnects) {
8888

8989
public static PoolConfig<?> forUrl(String url) {
9090
PoolConfig<?> poolConfig = new PoolConfig<>();
91-
URLEncodedUtils.parse(URI.create(url), Charset.defaultCharset())
91+
URLEncodedUtils.parse(URI.create(url), String.valueOf(Charset.defaultCharset()))
9292
.forEach(
9393
pair -> {
9494
try {

amoro-common/src/main/java/org/apache/amoro/client/ThriftClientPool.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class ThriftClientPool<
4040
private static final Logger LOG = LoggerFactory.getLogger(ThriftClientPool.class);
4141
private static final int RECONNECT_INTERVAL = 2000;
4242
private static final int BORROW_ATTEMPTS = 5;
43-
4443
private final ThriftClientFactory clientFactory;
4544
private final ThriftPingFactory pingFactory;
4645
private final GenericObjectPool<ThriftClient<T>> pool;

amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ public class CatalogMetaProperties {
2525
public static final String STORAGE_CONFIGS_KEY_HDFS_SITE = "hadoop.hdfs.site";
2626
public static final String STORAGE_CONFIGS_KEY_CORE_SITE = "hadoop.core.site";
2727
public static final String STORAGE_CONFIGS_KEY_HIVE_SITE = "hive.site";
28+
2829
public static final String STORAGE_CONFIGS_KEY_REGION = "storage.s3.region";
2930
public static final String STORAGE_CONFIGS_KEY_S3_ENDPOINT = "storage.s3.endpoint";
3031
public static final String STORAGE_CONFIGS_KEY_OSS_ENDPOINT = "storage.oss.endpoint";
31-
3232
public static final String STORAGE_CONFIGS_VALUE_TYPE_HDFS_LEGACY = "hdfs";
3333
public static final String STORAGE_CONFIGS_VALUE_TYPE_HADOOP = "Hadoop";
3434
public static final String STORAGE_CONFIGS_VALUE_TYPE_S3 = "S3";

amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class BasicMixedIcebergCatalog implements MixedFormatCatalog {
6565
private AmsClient client;
6666
private MixedTables tables;
6767
private SupportsNamespaces asNamespaceCatalog;
68+
private String separator;
6869

6970
@Override
7071
public String name() {
@@ -93,6 +94,7 @@ public void initialize(String name, Map<String, String> properties, TableMetaSto
9394
}
9495
this.databaseFilterPattern = databaseFilterPattern;
9596
this.catalogProperties = properties;
97+
this.separator = tableStoreSeparator();
9698
this.tables = newMixedTables(metaStore, properties, icebergCatalog());
9799
if (properties.containsKey(CatalogMetaProperties.AMS_URI)) {
98100
this.client = new PooledAmsClient(properties.get(CatalogMetaProperties.AMS_URI));
@@ -221,7 +223,13 @@ protected MixedTables newMixedTables(
221223
TableMetaStore metaStore,
222224
Map<String, String> catalogProperties,
223225
org.apache.iceberg.catalog.Catalog icebergCatalog) {
224-
return new MixedTables(metaStore, catalogProperties, icebergCatalog);
226+
return new MixedTables(metaStore, catalogProperties, icebergCatalog, separator);
227+
}
228+
229+
protected String tableStoreSeparator() {
230+
return catalogProperties.getOrDefault(
231+
CatalogMetaProperties.MIXED_FORMAT_TABLE_STORE_SEPARATOR,
232+
CatalogMetaProperties.MIXED_FORMAT_TABLE_STORE_SEPARATOR_DEFAULT);
225233
}
226234

227235
private org.apache.iceberg.catalog.TableIdentifier toIcebergTableIdentifier(

amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/InternalMixedIcebergCatalog.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.amoro.TableFormat;
2222
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
2323
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
24+
import org.apache.amoro.table.MixedTable;
2425
import org.apache.amoro.table.PrimaryKeySpec;
2526
import org.apache.amoro.table.TableMetaStore;
2627
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,6 @@
4344
public class InternalMixedIcebergCatalog extends BasicMixedIcebergCatalog {
4445

4546
public static final String CHANGE_STORE_SEPARATOR = "@";
46-
public static final String CHANGE_STORE_NAME = "change";
4747

4848
public static final String HTTP_HEADER_LIST_TABLE_FILTER = "LIST-TABLE-FILTER";
4949

@@ -81,29 +81,23 @@ public List<org.apache.amoro.table.TableIdentifier> listTables(String database)
8181
@Override
8282
protected MixedTables newMixedTables(
8383
TableMetaStore metaStore, Map<String, String> catalogProperties, Catalog icebergCatalog) {
84-
return new InternalMixedTables(metaStore, catalogProperties, icebergCatalog);
84+
return new InternalMixedTables(
85+
metaStore, catalogProperties, icebergCatalog, tableStoreSeparator());
86+
}
87+
88+
@Override
89+
protected String tableStoreSeparator() {
90+
return CHANGE_STORE_SEPARATOR;
8591
}
8692

8793
static class InternalMixedTables extends MixedTables {
8894

8995
public InternalMixedTables(
90-
TableMetaStore tableMetaStore, Map<String, String> catalogProperties, Catalog catalog) {
91-
super(tableMetaStore, catalogProperties, catalog);
92-
}
93-
94-
/**
95-
* For internal tables, {table-name}@change is used as the change store identifier; this identifier
96-
* can be recognized by AMS. Because '@' is an invalid character in a table name, the change
97-
* store identifier will never conflict with any other table name.
98-
*
99-
* @param baseIdentifier base store table identifier.
100-
* @return change store iceberg table identifier.
101-
*/
102-
@Override
103-
protected TableIdentifier generateChangeStoreIdentifier(TableIdentifier baseIdentifier) {
104-
return TableIdentifier.of(
105-
baseIdentifier.namespace(),
106-
baseIdentifier.name() + CHANGE_STORE_SEPARATOR + CHANGE_STORE_NAME);
96+
TableMetaStore tableMetaStore,
97+
Map<String, String> catalogProperties,
98+
Catalog catalog,
99+
String separator) {
100+
super(tableMetaStore, catalogProperties, catalog, separator);
107101
}
108102

109103
/**
@@ -121,6 +115,13 @@ protected Table createChangeStore(
121115
return tableMetaStore.doAs(() -> icebergCatalog.loadTable(changeIdentifier));
122116
}
123117

118+
@Override
119+
protected TableIdentifier generateChangeStoreIdentifier(TableIdentifier baseIdentifier) {
120+
return TableIdentifier.of(
121+
baseIdentifier.namespace(),
122+
baseIdentifier.name() + CHANGE_STORE_SEPARATOR + MixedTable.CHANGE_STORE_IDENTIFIER);
123+
}
124+
124125
/**
125126
* The change store will be dropped automatically by AMS when dropping the base store, so we do
126127
* nothing here

amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/MixedTables.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.amoro.TableFormat;
2222
import org.apache.amoro.io.AuthenticatedFileIO;
2323
import org.apache.amoro.io.AuthenticatedFileIOs;
24-
import org.apache.amoro.properties.CatalogMetaProperties;
2524
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
2625
import org.apache.amoro.table.BaseTable;
2726
import org.apache.amoro.table.BasicKeyedTable;
@@ -32,6 +31,7 @@
3231
import org.apache.amoro.table.TableMetaStore;
3332
import org.apache.amoro.table.UnkeyedTable;
3433
import org.apache.amoro.utils.MixedFormatCatalogUtil;
34+
import org.apache.amoro.utils.MixedTableUtil;
3535
import org.apache.amoro.utils.TablePropertyUtil;
3636
import org.apache.iceberg.PartitionSpec;
3737
import org.apache.iceberg.Schema;
@@ -47,16 +47,20 @@
4747
public class MixedTables {
4848
private static final Logger LOG = LoggerFactory.getLogger(MixedTables.class);
4949

50-
protected TableMetaStore tableMetaStore;
51-
protected Catalog icebergCatalog;
52-
53-
protected Map<String, String> catalogProperties;
50+
protected final TableMetaStore tableMetaStore;
51+
protected final Catalog icebergCatalog;
52+
protected final Map<String, String> catalogProperties;
53+
protected final String separator;
5454

5555
public MixedTables(
56-
TableMetaStore tableMetaStore, Map<String, String> catalogProperties, Catalog catalog) {
56+
TableMetaStore tableMetaStore,
57+
Map<String, String> catalogProperties,
58+
Catalog catalog,
59+
String separator) {
5760
this.tableMetaStore = tableMetaStore;
5861
this.icebergCatalog = catalog;
5962
this.catalogProperties = catalogProperties;
63+
this.separator = separator;
6064
}
6165

6266
/**
@@ -86,12 +90,9 @@ public TableIdentifier parseChangeIdentifier(Table base) {
8690
* @return change store table identifier.
8791
*/
8892
protected TableIdentifier generateChangeStoreIdentifier(TableIdentifier baseIdentifier) {
89-
String separator =
90-
catalogProperties.getOrDefault(
91-
CatalogMetaProperties.MIXED_FORMAT_TABLE_STORE_SEPARATOR,
92-
CatalogMetaProperties.MIXED_FORMAT_TABLE_STORE_SEPARATOR_DEFAULT);
9393
return TableIdentifier.of(
94-
baseIdentifier.namespace(), baseIdentifier.name() + separator + "change" + separator);
94+
baseIdentifier.namespace(),
95+
MixedTableUtil.changeStoreName(baseIdentifier.name(), separator));
9596
}
9697

9798
/**

amoro-format-iceberg/src/main/java/org/apache/amoro/table/MixedTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
/** Represents a mixed-format table. */
3232
public interface MixedTable extends Serializable {
3333

34+
String CHANGE_STORE_IDENTIFIER = "change";
35+
3436
/** Returns the {@link TableIdentifier} of this table */
3537
TableIdentifier id();
3638

amoro-format-iceberg/src/main/java/org/apache/amoro/utils/MixedTableUtil.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,27 @@ public static PartitionSpec getMixedTablePartitionSpecById(MixedTable mixedTable
190190
return spec;
191191
}
192192
}
193+
194+
/**
195+
* Generate change store table name for mixed format tables.
196+
*
197+
* @param tableName mixed format table name.
198+
* @param separator change store separator.
199+
* @return change store table name.
200+
*/
201+
public static String changeStoreName(String tableName, String separator) {
202+
return tableName + separator + MixedTable.CHANGE_STORE_IDENTIFIER + separator;
203+
}
204+
205+
/**
206+
* Determine if it is a change store of a mixed format table by the table name.
207+
*
208+
* @param tableName table name.
209+
* @param separator change store separator.
210+
* @return if it is a change store.
211+
*/
212+
public static boolean isChangeStore(String tableName, String separator) {
213+
return tableName != null
214+
&& tableName.endsWith(separator + MixedTable.CHANGE_STORE_IDENTIFIER + separator);
215+
}
193216
}

amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,16 @@ protected void createHiveSource(
220220
}
221221

222222
protected void createViewSource(Schema schema, List<Record> data) {
223+
createViewSource(schema, data, Double.NaN);
224+
}
225+
226+
protected void createViewSource(Schema schema, List<Record> data, double version) {
223227
Dataset<Row> ds =
224228
spark()
225229
.createDataFrame(
226-
data.stream().map(TestTableUtil::recordToRow).collect(Collectors.toList()),
230+
data.stream()
231+
.map(r -> TestTableUtil.recordToRow(r, version))
232+
.collect(Collectors.toList()),
227233
SparkSchemaUtil.convert(schema));
228234

229235
ds.createOrReplaceTempView(sourceTable);

amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/utils/TestTableUtil.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,17 @@
7272
import java.util.stream.Collectors;
7373

7474
public class TestTableUtil {
75-
76-
private static Object[] recordToObjects(Record record) {
75+
private static Object[] recordToObjects(Record record, double sparkVersion) {
7776
Object[] values = new Object[record.size()];
7877
for (int i = 0; i < values.length; i++) {
7978
Object v = record.get(i);
8079
if (v instanceof LocalDateTime) {
81-
Timestamp ts =
82-
Timestamp.valueOf(((LocalDateTime) v).atZone(ZoneOffset.UTC).toLocalDateTime());
83-
Timestamp tsUTC = Timestamp.valueOf((LocalDateTime) v);
84-
values[i] = ts;
85-
continue;
80+
if (Double.isNaN(sparkVersion) || sparkVersion < 3.4) {
81+
Timestamp ts =
82+
Timestamp.valueOf(((LocalDateTime) v).atZone(ZoneOffset.UTC).toLocalDateTime());
83+
values[i] = ts;
84+
continue;
85+
}
8686
} else if (v instanceof OffsetDateTime) {
8787
v = new Timestamp(((OffsetDateTime) v).toInstant().toEpochMilli());
8888
}
@@ -92,7 +92,11 @@ private static Object[] recordToObjects(Record record) {
9292
}
9393

9494
public static Row recordToRow(Record record) {
95-
Object[] values = recordToObjects(record);
95+
return recordToRow(record, Double.NaN);
96+
}
97+
98+
public static Row recordToRow(Record record, double sparkVersion) {
99+
Object[] values = recordToObjects(record, sparkVersion);
96100
return RowFactory.create(values);
97101
}
98102

@@ -103,6 +107,10 @@ public static InternalRow recordToInternalRow(Schema schema, Record record) {
103107
}
104108

105109
public static Record rowToRecord(Row row, Types.StructType type) {
110+
return rowToRecord(row, type, Double.NaN);
111+
}
112+
113+
public static Record rowToRecord(Row row, Types.StructType type, double sparkVersion) {
106114
Record record = GenericRecord.create(type);
107115
for (int i = 0; i < type.fields().size(); i++) {
108116
Object v = row.get(i);
@@ -114,9 +122,14 @@ public static Record rowToRecord(Row row, Types.StructType type) {
114122
record.set(i, offsetDateTime);
115123
continue;
116124
} else if (field.type().equals(Types.TimestampType.withoutZone())) {
117-
Preconditions.checkArgument(v instanceof Timestamp);
118-
Object localDatetime = ((Timestamp) v).toLocalDateTime();
119-
record.set(i, localDatetime);
125+
if (Double.isNaN(sparkVersion) || sparkVersion < 3.4) {
126+
Preconditions.checkArgument(v instanceof Timestamp);
127+
Object localDatetime = ((Timestamp) v).toLocalDateTime();
128+
record.set(i, localDatetime);
129+
} else {
130+
Preconditions.checkArgument(v instanceof LocalDateTime);
131+
record.set(i, v);
132+
}
120133
continue;
121134
}
122135
record.set(i, v);

amoro-format-mixed/amoro-mixed-spark/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,7 @@
3737
<module>v3.2/amoro-mixed-spark-runtime-3.2</module>
3838
<module>v3.3/amoro-mixed-spark-3.3</module>
3939
<module>v3.3/amoro-mixed-spark-runtime-3.3</module>
40+
<module>v3.5/amoro-mixed-spark-3.5</module>
41+
<module>v3.5/amoro-mixed-spark-runtime-3.5</module>
4042
</modules>
4143
</project>

0 commit comments

Comments
 (0)