Skip to content

Commit 4ec539d

Browse files
authored
fix(plugin-iceberg): Incorrect Iceberg columnHandle type in Prestissimo (#26411)
## Description <!---Describe your changes in detail--> **Problem** When writing to Iceberg tables with partition transforms (e.g., bucket(), year(), month(), day(), hour(), truncate()), the IcebergColumnHandle type was incorrectly set to `REGULAR` for partition source columns instead of `PARTITION_KEY`. This caused issues in Prestissimo (native execution) where the partition column information is needed to correctly construct partition data when writing to Iceberg tables. For example, for a table partitioned by bucket(10, user_id) or year(event_time): The source columns should be marked as `PARTITION_KEY` during INSERT operations. Previously, only identity partition columns were marked as `PARTITION_KEY` Non-identity partition transforms were ignored, causing the source columns to be incorrectly marked as `REGULAR`. The existing getColumns() method only identified identity partition columns using getPartitionFields(partitionSpec, IDENTITY), which filters out all non-identity transforms. This PR introduces a new getColumnsForWrite() method specifically for write operations (INSERT queries) that: Identifies all partition source columns by examining partitionSpec.fields() and extracting source field IDs. Marks them as `PARTITION_KEY` so Prestissimo can access the correct column type information during writes. The existing getColumns() method is preserved for read operations (SELECT queries) where only identity partition columns should be marked as `PARTITION_KEY` to ensure partition source columns are read from data files. This is binding to Hive implementation and because it doesn't read partition column from data file. But for iceberg there is no way to reverse a partitioned value to original column value. ## Motivation and Context <!---Why is this change required? What problem does it solve?--> <!---If it fixes an open issue, please link to the issue here.--> This is a preliminary PR for supporting partition transform of Iceberg native insertion. ## Impact <!---Describe any public API or user-facing feature change or any performance impact--> ## Test Plan <!---Please fill in how you tested your change--> ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. - [ ] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores). ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == NO RELEASE NOTE == ```
1 parent 5775ef3 commit 4ec539d

File tree

7 files changed

+34
-14
lines changed

7 files changed

+34
-14
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
158158
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
159159
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
160+
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
160161
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
161162
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
162163
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionFields;
@@ -518,7 +519,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
518519
table.getIcebergTableName(),
519520
toPrestoSchema(icebergTable.schema(), typeManager),
520521
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
521-
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
522+
getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
522523
icebergTable.location(),
523524
getFileFormat(icebergTable),
524525
getCompressionCodec(session),

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@
124124
import static com.facebook.presto.iceberg.IcebergTableProperties.getTableLocation;
125125
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
126126
import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties;
127-
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
127+
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
128128
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
129129
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
130130
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
@@ -390,7 +390,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
390390
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
391391
toPrestoSchema(metadata.schema(), typeManager),
392392
toPrestoPartitionSpec(metadata.spec(), typeManager),
393-
getColumns(metadata.schema(), metadata.spec(), typeManager),
393+
getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager),
394394
targetPath,
395395
fileFormat,
396396
getCompressionCodec(session),

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
6868
import static com.facebook.presto.iceberg.IcebergUtil.VIEW_OWNER;
6969
import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties;
70-
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
70+
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
7171
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
7272
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
7373
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
@@ -386,7 +386,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
386386
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
387387
toPrestoSchema(icebergTable.schema(), typeManager),
388388
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
389-
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
389+
getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
390390
icebergTable.location(),
391391
fileFormat,
392392
getCompressionCodec(session),

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
7575
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS;
7676
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
77-
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
77+
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
7878
import static com.facebook.presto.iceberg.PartitionTransforms.getColumnTransform;
7979
import static com.google.common.base.Preconditions.checkArgument;
8080
import static com.google.common.base.Verify.verify;
@@ -157,7 +157,7 @@ public IcebergPageSink(
157157
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
158158
String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files");
159159
this.tempDirectory = new Path(tempDirectoryPath);
160-
this.columnTypes = getColumns(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
160+
this.columnTypes = getColumnsForWrite(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
161161
.map(IcebergColumnHandle::getType)
162162
.collect(toImmutableList());
163163
this.sortParameters = sortParameters;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,25 @@ public static List<IcebergColumnHandle> getColumns(Stream<Integer> fields, Schem
365365
.collect(toImmutableList());
366366
}
367367

368+
public static List<IcebergColumnHandle> getColumnsForWrite(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
369+
{
370+
return getColumnsForWrite(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
371+
}
372+
373+
private static List<IcebergColumnHandle> getColumnsForWrite(Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
374+
{
375+
Set<Integer> partitionSourceIds = partitionSpec.fields().stream()
376+
.map(PartitionField::sourceId)
377+
.collect(toImmutableSet());
378+
379+
return fields
380+
.map(schema::findField)
381+
.map(column -> partitionSourceIds.contains(column.fieldId()) ?
382+
IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) :
383+
IcebergColumnHandle.create(column, typeManager, REGULAR))
384+
.collect(toImmutableList());
385+
}
386+
368387
public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec partitionSpec)
369388
{
370389
// TODO: expose transform information in Iceberg library

presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionTransformType.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
public enum PartitionTransformType
2121
{
2222
IDENTITY("identity", 0),
23-
YEAR("year", 1),
24-
MONTH("month", 2),
25-
DAY("day", 3),
26-
HOUR("hour", 4),
23+
HOUR("hour", 1),
24+
DAY("day", 2),
25+
MONTH("month", 3),
26+
YEAR("year", 4),
2727
BUCKET("bucket", 5),
2828
TRUNCATE("truncate", 6);
2929

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,10 @@ void from_json(const json& j, IcebergTableName& p);
131131
namespace facebook::presto::protocol::iceberg {
132132
enum class PartitionTransformType {
133133
IDENTITY,
134-
YEAR,
135-
MONTH,
136-
DAY,
137134
HOUR,
135+
DAY,
136+
MONTH,
137+
YEAR,
138138
BUCKET,
139139
TRUNCATE
140140
};

0 commit comments

Comments
 (0)