
Commit dd4ba58

Avoid deserialization of record type field
1 parent 7447f71 commit dd4ba58

6 files changed: +31 -16 lines changed
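
Before the per-file hunks, the shape of the change in one hedged sketch: every call site that already knows which metadata partition it is reading derives the record type from the partition path and hands it to HoodieMetadataPayload, so the payload no longer has to deserialize the record just to read its type field. The helper class and method names below are illustrative only (not from the repository); the Hudi calls are the ones that appear in the diff.

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.MetadataPartitionType;

class RecordTypeFromPartitionSketch {
  // Derive the record type once from the metadata partition path, then reuse it
  // for every record read from that partition.
  static HoodieMetadataPayload toPayload(String partitionName, GenericRecord metadataRecord) {
    int recordType = MetadataPartitionType.fromPartitionPath(partitionName).getRecordType();
    return new HoodieMetadataPayload(Option.of(metadataRecord), recordType);
  }
}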

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java

Lines changed: 2 additions & 1 deletion
@@ -227,10 +227,11 @@ private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient metadataMetaC
         .withProps(new TypedProperties())
         .withEnableOptimizedLogBlockScan(hoodieWriteConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
         .build();
+    int recordType = MetadataPartitionType.fromPartitionPath(fileSlice.getPartitionPath()).getRecordType();
     try (ClosableIterator<HoodieRecord<IndexedRecord>> records = fileGroupReader.getClosableHoodieRecordIterator()) {
       Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap = new HashMap<>();
       records.forEachRemaining(record -> {
-        HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of((GenericRecord) record.getData()));
+        HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of((GenericRecord) record.getData()), recordType);
         actualMdtRecordMap.put(record.getRecordKey(), new HoodieAvroRecord<>(record.getKey(), payload));
       });
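
The same idea from the test's side, as a small hedged fragment (the partition path literals are assumptions for illustration, not values asserted by this diff): because each metadata file slice belongs to exactly one metadata partition, the type can be computed once per slice instead of being read out of every record.

// Hypothetical illustration: one lookup per partition path, reused for all records in the slice.
int filesRecordType = MetadataPartitionType.fromPartitionPath("files").getRecordType();
int colStatsRecordType = MetadataPartitionType.fromPartitionPath("column_stats").getRecordType();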

hudi-common/src/main/java/org/apache/hudi/common/model/SerializableMetadataIndexedRecord.java

Lines changed: 10 additions & 3 deletions
@@ -42,23 +42,28 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
 
 public class SerializableMetadataIndexedRecord implements GenericRecord, KryoSerializable, Serializable {
   private static final long serialVersionUID = 1L;
   private static final ConcurrentHashMap<Schema, GenericDatumReader<GenericRecord>> CACHED_DATUM_READER_MAP = new ConcurrentHashMap<>();
   private static final ConcurrentHashMap<Schema, Schema.Field> CACHED_KEY_SCHEMA_MAP = new ConcurrentHashMap<>();
   private IndexedRecord record;
   private Schema schema;
+  // TODO(yihua): need to have a better way of referencing this reader
+  private GenericDatumReader<GenericRecord> datumReader;
   String key;
   private byte[] keyValueBytes;
   int valueOffset;
   int valueLength;
 
   private SerializableMetadataIndexedRecord(Schema schema,
+                                            GenericDatumReader<GenericRecord> datumReader,
                                             String key,
                                             byte[] keyValueBytes,
                                             int valueOffset,
                                             int valueLength) {
+    this.datumReader = datumReader;
     this.key = key;
     this.keyValueBytes = keyValueBytes;
     this.valueOffset = valueOffset;
@@ -68,11 +73,13 @@ private SerializableMetadataIndexedRecord(Schema schema,
   }
 
   public static SerializableMetadataIndexedRecord fromHFileKeyValueBytes(Schema schema,
+                                                                         GenericDatumReader<GenericRecord> datumReader,
                                                                          Schema.Field keyFieldSchema,
                                                                          KeyValue hfileKeyValue) {
     CACHED_KEY_SCHEMA_MAP.computeIfAbsent(schema, k -> keyFieldSchema);
     return new SerializableMetadataIndexedRecord(
         schema,
+        datumReader,
         fromUTF8Bytes(hfileKeyValue.getBytes(), hfileKeyValue.getKeyContentOffset(), hfileKeyValue.getKeyContentLength()),
         hfileKeyValue.getBytes(), hfileKeyValue.getValueOffset(),
         hfileKeyValue.getValueLength());
@@ -114,8 +121,8 @@ void decodeRecord(Schema schema) {
   public IndexedRecord getData() {
     if (record == null) {
       try {
-        GenericDatumReader<GenericRecord> datumReader = CACHED_DATUM_READER_MAP.computeIfAbsent(
-            schema, GenericDatumReader::new);
+        //GenericDatumReader<GenericRecord> datumReader = CACHED_DATUM_READER_MAP.computeIfAbsent(
+        //    schema, GenericDatumReader::new);
         record = HoodieAvroHFileReaderImplBase.deserialize(
             key, keyValueBytes, valueOffset, valueLength, datumReader,
             CACHED_KEY_SCHEMA_MAP.get(schema));
@@ -163,7 +170,7 @@ public void put(String key, Object v) {
 
   @Override
   public Object get(String key) {
-    if (CACHED_KEY_SCHEMA_MAP.get(schema).name().equals(key)) {
+    if (KEY_FIELD_NAME.equals(key)) {
       return key;
     }
     getData();
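
A hedged sketch of how a caller now supplies the Avro reader instead of relying on this class's static per-schema cache (variable names are illustrative; GenericDatumReader is the standard Avro reader class, and schema, keyFieldSchema, and keyValue are assumed to be in scope at the call site): the reader is created once per file/schema and threaded through fromHFileKeyValueBytes, so getData() can deserialize lazily with the injected instance.

// Create one Avro datum reader for the file's schema and reuse it for every key-value
// handed to fromHFileKeyValueBytes.
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
SerializableMetadataIndexedRecord next =
    SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(schema, datumReader, keyFieldSchema, keyValue);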

hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java

Lines changed: 3 additions & 3 deletions
@@ -361,7 +361,7 @@ public boolean hasNext() {
           return false;
         }
 
-        this.next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(schema, keyFieldSchema, reader.getKeyValue().get());
+        this.next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(schema, datumReader, keyFieldSchema, reader.getKeyValue().get());
         return true;
       } catch (IOException io) {
         throw new HoodieIOException("unable to read next record from hfile ", io);
@@ -434,7 +434,7 @@ public boolean hasNext() {
           if (reader.seekTo(key) == HFileReader.SEEK_TO_FOUND) {
             // Key is found
             KeyValue keyValue = reader.getKeyValue().get();
-            next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(schema, keyFieldSchema, keyValue);
+            next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(schema, datumReader, keyFieldSchema, keyValue);
             return true;
           }
         }
@@ -586,7 +586,7 @@ public boolean hasNext() {
         if (!isPrefixOfKey(lookUpKeyPrefix, keyValue.getKey())) {
           return false;
         }
-        next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(writerSchema, keyFieldSchema, keyValue);
+        next = SerializableMetadataIndexedRecord.fromHFileKeyValueBytes(writerSchema, datumReader, keyFieldSchema, keyValue);
         // In case scanner is not able to advance, it means we reached EOF
         eof = !reader.next();
       } catch (IOException e) {
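
All three iterator call sites now pass the reader's own datumReader field. Where that field gets initialized is not part of this diff; a plausible sketch, stated as an assumption rather than repository code, is a single construction per reader instance:

// Assumed initialization site (not shown in this commit): one Avro reader per schema,
// shared by every record produced by this HFile reader's iterators.
this.datumReader = new GenericDatumReader<>(schema);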

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java

Lines changed: 8 additions & 5 deletions
@@ -245,14 +245,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(
     List<FileSlice> partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, getMetadataFileSystemView(), partitionName));
     checkState(!partitionFileSlices.isEmpty(), () -> "Number of file slices for partition " + partitionName + " should be > 0");
-
+    int recordType = MetadataPartitionType.fromPartitionPath(partitionName).getRecordType();
     return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
         getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice ->
                 readSliceAndFilterByKeysIntoList(partitionName, sortedKeyPrefixes, fileSlice,
                     metadataRecord -> {
-                      HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+                      HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord), recordType);
                       String rowKey = payload.key != null ? payload.key : metadataRecord.get(KEY_FIELD_NAME).toString();
                       HoodieKey key = new HoodieKey(rowKey, partitionName);
                       return new HoodieAvroRecord<>(key, payload);
@@ -587,10 +587,12 @@ private HoodieData<HoodieRecord<HoodieMetadataPayload>> readSliceAndFilterByKeys
   private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> readSliceAndFilterByKeys(String partitionName,
                                                                                                        List<String> sortedKeys,
                                                                                                        FileSlice fileSlice) {
-    boolean isSecondaryIndex = MetadataPartitionType.fromPartitionPath(partitionName).equals(MetadataPartitionType.SECONDARY_INDEX);
+    MetadataPartitionType metadataPartitionType = MetadataPartitionType.fromPartitionPath(partitionName);
+    boolean isSecondaryIndex = metadataPartitionType.equals(MetadataPartitionType.SECONDARY_INDEX);
+    int recordType = metadataPartitionType.getRecordType();
     return new CloseableFilterIterator<>(
         readSliceAndFilterByKeysIntoList(partitionName, sortedKeys, fileSlice, metadataRecord -> {
-          HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+          HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord), recordType);
           String rowKey = payload.key != null ? payload.key : metadataRecord.get(KEY_FIELD_NAME).toString();
           HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
           return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
@@ -602,9 +604,10 @@ private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> lookupRecordsItr(S
                                                                                 Collection<String> keys,
                                                                                 FileSlice fileSlice,
                                                                                 boolean isFullKey) {
+    int recordType = MetadataPartitionType.fromPartitionPath(partitionName).getRecordType();
     return new CloseableFilterIterator<>(
         readSliceAndFilterByKeysIntoList(partitionName, keys, fileSlice, metadataRecord -> {
-          HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+          HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord), recordType);
          return new HoodieAvroRecord<>(new HoodieKey(payload.key, partitionName), payload);
         }, isFullKey),
         r -> !r.getData().isDeleted());
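
One design note: in the parallelized path the per-record mapping closure now captures a primitive recordType computed up front, so payload construction never has to inspect the record's type field. A hedged fragment of that shape (variable names are illustrative, and java.util.function.Function stands in for the actual record-mapping functional interface):

// recordType is a captured primitive; the per-record mapping builds payloads
// without reading the type field out of the Avro record.
int recordType = MetadataPartitionType.fromPartitionPath(partitionName).getRecordType();
java.util.function.Function<GenericRecord, HoodieMetadataPayload> toPayload =
    metadataRecord -> new HoodieMetadataPayload(Option.of(metadataRecord), recordType);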

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

Lines changed: 5 additions & 3 deletions
@@ -205,7 +205,9 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private boolean isDeletedRecord = false;
 
   public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable<?> orderingVal) {
-    this(Option.ofNullable(record));
+    // TODO(yihua): can record be null for metadata payload? fix the record type to be derived without deserialization
+    // TODO(yihua): a new merger class might be the way to go?
+    this(Option.ofNullable(record), record != null ? (int) record.get(SCHEMA_FIELD_NAME_TYPE) : 0);
   }
 
   public HoodieMetadataPayload(Option<GenericRecord> recordOpt, int type) {
@@ -305,7 +307,7 @@ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(Str
     String partitionIdentifier = getPartitionIdentifierForFilesPartition(partition);
     HoodieKey key = new HoodieKey(partitionIdentifier, MetadataPartitionType.FILES.getPartitionPath());
     if (isPartitionDeleted) {
-      return new HoodieAvroRecord<>(key, new HoodieMetadataPayload(Option.empty()));
+      return new HoodieAvroRecord<>(key, new HoodieMetadataPayload(Option.empty(), MetadataPartitionType.FILES.getRecordType()));
     }
 
     int size = filesAdded.size() + filesDeleted.size();
@@ -394,7 +396,7 @@ public static Option<HoodieRecord<HoodieMetadataPayload>> combineSecondaryIndexR
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema, Properties properties) throws IOException {
-    HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord));
+    HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord), type);
     HoodieRecordPayload combinedPayload = preCombine(anotherPayload);
     return combinedPayload.getInsertValue(schema, properties);
   }
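
What the constructor changes amount to, as a hedged fragment (record and orderingVal are placeholders, not repository variables): callers that know the partition supply the type directly, while the legacy (record, orderingVal) constructor keeps working by reading the SCHEMA_FIELD_NAME_TYPE field from the record, per the TODOs in the hunk above.

// Preferred after this commit: the partition is known, so the type is known up front.
HoodieMetadataPayload withKnownType =
    new HoodieMetadataPayload(Option.of(record), MetadataPartitionType.FILES.getRecordType());

// Legacy path kept for compatibility: the type is still deserialized from the record itself.
HoodieMetadataPayload fallback = new HoodieMetadataPayload(record, orderingVal);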

hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java

Lines changed: 3 additions & 1 deletion
@@ -34,6 +34,7 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.stats.HoodieColumnRangeMetadata;
 import org.apache.hudi.stats.ValueMetadata;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -304,7 +305,8 @@ public void testSerHoodieMetadataPayload() throws IOException {
     byte[] recordToBytes = HoodieAvroUtils.avroToBytes(record);
     GenericRecord genericRecord = HoodieAvroUtils.bytesToAvro(recordToBytes, record.getSchema());
 
-    HoodieMetadataPayload genericRecordHoodieMetadataPayload = new HoodieMetadataPayload(Option.of(genericRecord));
+    HoodieMetadataPayload genericRecordHoodieMetadataPayload = new HoodieMetadataPayload(
+        Option.of(genericRecord), MetadataPartitionType.COLUMN_STATS.getRecordType());
     byte[] bytes = SerializationUtils.serialize(genericRecordHoodieMetadataPayload);
     HoodieMetadataPayload deserGenericRecordHoodieMetadataPayload = SerializationUtils.deserialize(bytes);
 
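
The test still round-trips the payload through Hudi's SerializationUtils; only the construction changes. A condensed, hedged fragment (genericRecord is assumed to be the column-stats record built earlier in the test method):

HoodieMetadataPayload payload = new HoodieMetadataPayload(
    Option.of(genericRecord), MetadataPartitionType.COLUMN_STATS.getRecordType());
byte[] bytes = SerializationUtils.serialize(payload);
HoodieMetadataPayload roundTripped = SerializationUtils.deserialize(bytes);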
