Skip to content

Commit 94cee21

Browse files
add field-id for array and map
1 parent bd8638f commit 94cee21

File tree

4 files changed

+180
-49
lines changed

4 files changed

+180
-49
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,10 @@ class AvroReader::Impl {
108108
if (options.name_mapping) {
109109
ICEBERG_ASSIGN_OR_RAISE(
110110
auto new_root_node,
111-
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
112-
113-
// Create a new schema with the updated root node
114-
auto new_schema = ::avro::ValidSchema(new_root_node);
111+
MakeAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
115112

116113
// Update the file schema to use the new schema with field IDs
117-
file_schema = new_schema;
114+
file_schema = ::avro::ValidSchema(new_root_node);
118115
} else {
119116
return InvalidSchema(
120117
"Avro file schema has no field IDs and no name mapping provided");

src/iceberg/avro/avro_schema_util.cc

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -788,10 +788,6 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
788788
original_node->names());
789789
}
790790
const std::string& field_name = original_node->nameAt(i);
791-
if (i >= original_node->leaves()) {
792-
return InvalidSchema("Index {} is out of bounds for leaves (size: {})", i,
793-
original_node->leaves());
794-
}
795791
::avro::NodePtr field_node = original_node->leafAt(i);
796792

797793
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
@@ -821,6 +817,7 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
821817
const auto& original_attrs = original_node->customAttributesAt(i);
822818
const auto& existing_attrs = original_attrs.attributes();
823819
for (const auto& attr_pair : existing_attrs) {
820+
// Copy each existing attribute to preserve original metadata
824821
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
825822
}
826823
}
@@ -833,7 +830,7 @@ Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& orig
833830

834831
// Recursively apply field IDs to nested fields
835832
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
836-
CreateAvroNodeWithFieldIds(field_node, *nested_field));
833+
MakeAvroNodeWithFieldIds(field_node, *nested_field));
837834
new_record_node->addName(field_name);
838835
new_record_node->addLeaf(new_nested_node);
839836
} else {
@@ -856,7 +853,7 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi
856853
// Check if this is a map represented as array
857854
if (HasMapLogicalType(original_node)) {
858855
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
859-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), field));
856+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), field));
860857
new_array_node->addLeaf(new_element_node);
861858
return new_array_node;
862859
}
@@ -881,8 +878,14 @@ Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& origi
881878

882879
ICEBERG_ASSIGN_OR_RAISE(
883880
auto new_element_node,
884-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
881+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
885882
new_array_node->addLeaf(new_element_node);
883+
884+
// Add element field ID as custom attribute
885+
::avro::CustomAttributes element_attributes;
886+
element_attributes.addAttribute(std::string(kFieldIdProp),
887+
std::to_string(*element_field->field_id), false);
888+
new_array_node->addCustomAttributesForField(element_attributes);
886889
} else {
887890
// If no element field found, this is an error
888891
return InvalidSchema("Element field not found in nested mapping for array");
@@ -899,32 +902,77 @@ Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& origina
899902

900903
auto new_map_node = std::make_shared<::avro::NodeMap>();
901904

902-
// For map types, we use fixed field IDs for key and value
903-
// Key field gets field ID 0, value field gets field ID 1
904-
constexpr int32_t kMapKeyFieldId = 0;
905-
constexpr int32_t kMapValueFieldId = 1;
905+
// For map types, we need to extract key and value field mappings from the nested
906+
// mapping
907+
if (!field.nested_mapping) {
908+
return InvalidSchema("Map type requires nested mapping for key and value fields");
909+
}
910+
911+
// Find key and value field mappings by name
912+
std::optional<int32_t> key_id = field.nested_mapping->Id("key");
913+
std::optional<int32_t> value_id = field.nested_mapping->Id("value");
914+
915+
if (!key_id || !value_id) {
916+
return InvalidSchema("Map type requires both 'key' and 'value' field mappings");
917+
}
918+
919+
std::optional<MappedFieldConstRef> key_field_ref = field.nested_mapping->Field(*key_id);
920+
std::optional<MappedFieldConstRef> value_field_ref =
921+
field.nested_mapping->Field(*value_id);
922+
923+
if (!key_field_ref || !value_field_ref) {
924+
return InvalidSchema("Map type requires both key and value field mappings");
925+
}
926+
927+
const auto& key_mapped_field = key_field_ref->get();
928+
const auto& value_mapped_field = value_field_ref->get();
906929

907-
// Create key field with fixed field ID
930+
if (!key_mapped_field.field_id || !value_mapped_field.field_id) {
931+
return InvalidSchema("Map key and value fields must have field IDs");
932+
}
933+
934+
// Create key field with mapped field ID
908935
MappedField key_field;
909-
key_field.field_id = kMapKeyFieldId;
910-
key_field.nested_mapping =
911-
field.nested_mapping; // Pass through nested mapping for complex key types
936+
key_field.field_id = *key_mapped_field.field_id;
937+
key_field.nested_mapping = key_mapped_field.nested_mapping;
912938

913-
// Create value field with fixed field ID
939+
// Create value field with mapped field ID
914940
MappedField value_field;
915-
value_field.field_id = kMapValueFieldId;
916-
value_field.nested_mapping =
917-
field.nested_mapping; // Pass through nested mapping for complex value types
941+
value_field.field_id = *value_mapped_field.field_id;
942+
value_field.nested_mapping = value_mapped_field.nested_mapping;
918943

919944
// Add key and value nodes
920-
ICEBERG_ASSIGN_OR_RAISE(
921-
auto new_key_node, CreateAvroNodeWithFieldIds(original_node->leafAt(0), key_field));
945+
ICEBERG_ASSIGN_OR_RAISE(auto new_key_node,
946+
MakeAvroNodeWithFieldIds(original_node->leafAt(0), key_field));
922947
ICEBERG_ASSIGN_OR_RAISE(
923948
auto new_value_node,
924-
CreateAvroNodeWithFieldIds(original_node->leafAt(1), value_field));
949+
MakeAvroNodeWithFieldIds(original_node->leafAt(1), value_field));
925950
new_map_node->addLeaf(new_key_node);
926951
new_map_node->addLeaf(new_value_node);
927952

953+
// Preserve existing custom attributes from the original node and add field ID
954+
// attributes Copy existing attributes from the original node (if any)
955+
if (original_node->customAttributes() > 0) {
956+
const auto& original_attrs = original_node->customAttributesAt(0);
957+
const auto& existing_attrs = original_attrs.attributes();
958+
for (const auto& attr_pair : existing_attrs) {
959+
// Copy each existing attribute to preserve original metadata
960+
::avro::CustomAttributes attributes;
961+
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
962+
new_map_node->addCustomAttributesForField(attributes);
963+
}
964+
}
965+
966+
::avro::CustomAttributes key_attributes;
967+
key_attributes.addAttribute(std::string(kFieldIdProp),
968+
std::to_string(*key_mapped_field.field_id), false);
969+
new_map_node->addCustomAttributesForField(key_attributes);
970+
971+
::avro::CustomAttributes value_attributes;
972+
value_attributes.addAttribute(std::string(kFieldIdProp),
973+
std::to_string(*value_mapped_field.field_id), false);
974+
new_map_node->addCustomAttributesForField(value_attributes);
975+
928976
return new_map_node;
929977
}
930978

@@ -942,16 +990,14 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
942990

943991
if (branch_0_is_null && !branch_1_is_null) {
944992
// branch_0 is null, branch_1 is not null
945-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
946-
CreateAvroNodeWithFieldIds(branch_1, field));
993+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1, MakeAvroNodeWithFieldIds(branch_1, field));
947994
auto new_union_node = std::make_shared<::avro::NodeUnion>();
948995
new_union_node->addLeaf(branch_0); // null branch
949996
new_union_node->addLeaf(new_branch_1);
950997
return new_union_node;
951998
} else if (!branch_0_is_null && branch_1_is_null) {
952999
// branch_0 is not null, branch_1 is null
953-
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
954-
CreateAvroNodeWithFieldIds(branch_0, field));
1000+
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0, MakeAvroNodeWithFieldIds(branch_0, field));
9551001
auto new_union_node = std::make_shared<::avro::NodeUnion>();
9561002
new_union_node->addLeaf(new_branch_0);
9571003
new_union_node->addLeaf(branch_1); // null branch
@@ -967,8 +1013,8 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
9671013

9681014
} // namespace
9691015

970-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
971-
const MappedField& mapped_field) {
1016+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1017+
const MappedField& mapped_field) {
9721018
switch (original_node->type()) {
9731019
case ::avro::AVRO_RECORD:
9741020
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
@@ -996,11 +1042,11 @@ Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& origin
9961042
}
9971043
}
9981044

999-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1000-
const NameMapping& mapping) {
1045+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
1046+
const NameMapping& mapping) {
10011047
MappedField mapped_field;
10021048
mapped_field.nested_mapping = std::make_shared<MappedFields>(mapping.AsMappedFields());
1003-
return CreateAvroNodeWithFieldIds(original_node, mapped_field);
1049+
return MakeAvroNodeWithFieldIds(original_node, mapped_field);
10041050
}
10051051

10061052
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,14 @@ bool HasMapLogicalType(const ::avro::NodePtr& node);
153153
/// \param original_node The original Avro node to copy.
154154
/// \param mapped_field The mapped field to apply field IDs from.
155155
/// \return A new Avro node with field IDs applied, or an error.
156-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
157-
const MappedField& mapped_field);
156+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
157+
const MappedField& mapped_field);
158158

159159
/// \brief Create a new Avro node with field IDs from name mapping.
160160
/// \param original_node The original Avro node to copy.
161161
/// \param mapping The name mapping to apply field IDs from.
162162
/// \return A new Avro node with field IDs applied, or an error.
163-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
164-
const NameMapping& mapping);
163+
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
164+
const NameMapping& mapping);
165165

166166
} // namespace iceberg::avro

0 commit comments

Comments
 (0)