Skip to content

Commit 9ad7bcf

Browse files
add field-id for array and map
1 parent cda7ce2 commit 9ad7bcf

File tree

4 files changed

+180
-49
lines changed

4 files changed

+180
-49
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,10 @@ class AvroReader::Impl {
108108
if (options.name_mapping) {
109109
ICEBERG_ASSIGN_OR_RAISE(
110110
auto new_root_node,
111-
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
112-
113-
// Create a new schema with the updated root node
114-
auto new_schema = ::avro::ValidSchema(new_root_node);
111+
MakeAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
115112

116113
// Update the file schema to use the new schema with field IDs
117-
file_schema = new_schema;
114+
file_schema = ::avro::ValidSchema(new_root_node);
118115
} else {
119116
return InvalidSchema(
120117
"Avro file schema has no field IDs and no name mapping provided");

src/iceberg/avro/avro_schema_util.cc

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -798,10 +798,6 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
798798
original_node->names());
799799
}
800800
const std::string& field_name = original_node->nameAt(i);
801-
if (i >= original_node->leaves()) {
802-
return InvalidSchema("Index {} is out of bounds for leaves (size: {})", i,
803-
original_node->leaves());
804-
}
805801
::avro::NodePtr field_node = original_node->leafAt(i);
806802

807803
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
@@ -831,6 +827,7 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
831827
const auto& original_attrs = original_node->customAttributesAt(i);
832828
const auto& existing_attrs = original_attrs.attributes();
833829
for (const auto& attr_pair : existing_attrs) {
830+
// Copy each existing attribute to preserve original metadata
834831
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
835832
}
836833
}
@@ -843,7 +840,7 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
843840

844841
// Recursively apply field IDs to nested fields
845842
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
846-
CreateAvroNodeWithFieldIds(field_node, *nested_field));
843+
MakeAvroNodeWithFieldIds(field_node, *nested_field));
847844
new_record_node->addName(field_name);
848845
new_record_node->addLeaf(new_nested_node);
849846
} else {
@@ -866,7 +863,7 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi
866863
// Check if this is a map represented as array
867864
if (HasMapLogicalType(original_node)) {
868865
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
869-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), field));
866+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), field));
870867
new_array_node->addLeaf(new_element_node);
871868
return new_array_node;
872869
}
@@ -891,8 +888,14 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi
891888

892889
ICEBERG_ASSIGN_OR_RAISE(
893890
auto new_element_node,
894-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
891+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
895892
new_array_node->addLeaf(new_element_node);
893+
894+
// Add element field ID as custom attribute
895+
::avro::CustomAttributes element_attributes;
896+
element_attributes.addAttribute(std::string(kFieldIdProp),
897+
std::to_string(*element_field->field_id), false);
898+
new_array_node->addCustomAttributesForField(element_attributes);
896899
} else {
897900
// If no element field found, this is an error
898901
return InvalidSchema("Element field not found in nested mapping for array");
@@ -909,32 +912,77 @@ Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& origina
909912

910913
auto new_map_node = std::make_shared<::avro::NodeMap>();
911914

912-
// For map types, we use fixed field IDs for key and value
913-
// Key field gets field ID 0, value field gets field ID 1
914-
constexpr int32_t kMapKeyFieldId = 0;
915-
constexpr int32_t kMapValueFieldId = 1;
915+
// For map types, we need to extract key and value field mappings from the nested
916+
// mapping
917+
if (!field.nested_mapping) {
918+
return InvalidSchema("Map type requires nested mapping for key and value fields");
919+
}
920+
921+
// Find key and value field mappings by name
922+
std::optional<int32_t> key_id = field.nested_mapping->Id("key");
923+
std::optional<int32_t> value_id = field.nested_mapping->Id("value");
924+
925+
if (!key_id || !value_id) {
926+
return InvalidSchema("Map type requires both 'key' and 'value' field mappings");
927+
}
928+
929+
std::optional<MappedFieldConstRef> key_field_ref = field.nested_mapping->Field(*key_id);
930+
std::optional<MappedFieldConstRef> value_field_ref =
931+
field.nested_mapping->Field(*value_id);
932+
933+
if (!key_field_ref || !value_field_ref) {
934+
return InvalidSchema("Map type requires both key and value field mappings");
935+
}
936+
937+
const auto& key_mapped_field = key_field_ref->get();
938+
const auto& value_mapped_field = value_field_ref->get();
916939

917-
// Create key field with fixed field ID
940+
if (!key_mapped_field.field_id || !value_mapped_field.field_id) {
941+
return InvalidSchema("Map key and value fields must have field IDs");
942+
}
943+
944+
// Create key field with mapped field ID
918945
MappedField key_field;
919-
key_field.field_id = kMapKeyFieldId;
920-
key_field.nested_mapping =
921-
field.nested_mapping; // Pass through nested mapping for complex key types
946+
key_field.field_id = *key_mapped_field.field_id;
947+
key_field.nested_mapping = key_mapped_field.nested_mapping;
922948

923-
// Create value field with fixed field ID
949+
// Create value field with mapped field ID
924950
MappedField value_field;
925-
value_field.field_id = kMapValueFieldId;
926-
value_field.nested_mapping =
927-
field.nested_mapping; // Pass through nested mapping for complex value types
951+
value_field.field_id = *value_mapped_field.field_id;
952+
value_field.nested_mapping = value_mapped_field.nested_mapping;
928953

929954
// Add key and value nodes
930-
ICEBERG_ASSIGN_OR_RAISE(
931-
auto new_key_node, CreateAvroNodeWithFieldIds(original_node->leafAt(0), key_field));
955+
ICEBERG_ASSIGN_OR_RAISE(auto new_key_node,
956+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_field));
932957
ICEBERG_ASSIGN_OR_RAISE(
933958
auto new_value_node,
934-
CreateAvroNodeWithFieldIds(original_node->leafAt(1), value_field));
959+
MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_field));
935960
new_map_node->addLeaf(new_key_node);
936961
new_map_node->addLeaf(new_value_node);
937962

963+
// Preserve existing custom attributes from the original node and add field ID
964+
// attributes Copy existing attributes from the original node (if any)
965+
if (original_node->customAttributes() > 0) {
966+
const auto& original_attrs = original_node->customAttributesAt(0);
967+
const auto& existing_attrs = original_attrs.attributes();
968+
for (const auto& attr_pair : existing_attrs) {
969+
// Copy each existing attribute to preserve original metadata
970+
::avro::CustomAttributes attributes;
971+
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
972+
new_map_node->addCustomAttributesForField(attributes);
973+
}
974+
}
975+
976+
::avro::CustomAttributes key_attributes;
977+
key_attributes.addAttribute(std::string(kFieldIdProp),
978+
std::to_string(*key_mapped_field.field_id), false);
979+
new_map_node->addCustomAttributesForField(key_attributes);
980+
981+
::avro::CustomAttributes value_attributes;
982+
value_attributes.addAttribute(std::string(kFieldIdProp),
983+
std::to_string(*value_mapped_field.field_id), false);
984+
new_map_node->addCustomAttributesForField(value_attributes);
985+
938986
return new_map_node;
939987
}
940988

@@ -952,16 +1000,14 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
9521000

9531001
if (branch_0_is_null && !branch_1_is_null) {
9541002
// branch_0 is null, branch_1 is not null
955-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
956-
CreateAvroNodeWithFieldIds(branch_1, field));
1003+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field));
9571004
auto new_union_node = std::make_shared<::avro::NodeUnion>();
9581005
new_union_node->addLeaf(branch_0); // null branch
9591006
new_union_node->addLeaf(new_branch_1);
9601007
return new_union_node;
9611008
} else if (!branch_0_is_null && branch_1_is_null) {
9621009
// branch_0 is not null, branch_1 is null
963-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
964-
CreateAvroNodeWithFieldIds(branch_0, field));
1010+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field));
9651011
auto new_union_node = std::make_shared<::avro::NodeUnion>();
9661012
new_union_node->addLeaf(new_branch_0);
9671013
new_union_node->addLeaf(branch_1); // null branch
@@ -977,8 +1023,8 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
9771023

9781024
} // namespace
9791025

980-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
981-
const MappedField& mapped_field) {
1026+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1027+
const MappedField& mapped_field) {
9821028
switch (original_node->type()) {
9831029
case ::avro::AVRO_RECORD:
9841030
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
@@ -1006,11 +1052,11 @@ Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& origin
10061052
}
10071053
}
10081054

1009-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1010-
const NameMapping& mapping) {
1055+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1056+
const NameMapping& mapping) {
10111057
MappedField mapped_field;
10121058
mapped_field.nested_mapping = std::make_shared<MappedFields>(mapping.AsMappedFields());
1013-
return CreateAvroNodeWithFieldIds(original_node, mapped_field);
1059+
return MakeAvroNodeWithFieldIds(original_node, mapped_field);
10141060
}
10151061

10161062
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,14 @@ bool HasMapLogicalType(const ::avro::NodePtr& node);
149149
/// \param original_node The original Avro node to copy.
150150
/// \param mapped_field The mapped field to apply field IDs from.
151151
/// \return A new Avro node with field IDs applied, or an error.
152-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
153-
const MappedField& mapped_field);
152+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
153+
const MappedField& mapped_field);
154154

155155
/// \brief Create a new Avro node with field IDs from name mapping.
156156
/// \param original_node The original Avro node to copy.
157157
/// \param mapping The name mapping to apply field IDs from.
158158
/// \return A new Avro node with field IDs applied, or an error.
159-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
160-
const NameMapping& mapping);
159+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
160+
const NameMapping& mapping);
161161

162162
} // namespace iceberg::avro

0 commit comments

Comments
 (0)