perf: Add HoodieROPathFilter support during file listing to prevent driver OOM (#18136)
suryaprasanna wants to merge 3 commits into apache:master
Conversation
Force-pushed from 95fda80 to 2dabf5d
@nsivabalan seems like the checks are not running on the PR. Can you please check?
…on the driver

Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers, pwason, jingli, meenalb, singh.sumit
Reviewed By: O955 Project Hoodie Project Reviewer: Add blocking reviewers, pwason
Tags: #has_java
JIRA Issues: HUDI-6646
Differential Revision: https://code.uberinternal.com/D17441111

Fix build failures. Fix checkstyle. Refactor code. Create unit tests.
Force-pushed from 2dabf5d to 9b8395e
Can we consider both table types and all query types, and ensure we wire in the config only wherever applicable?
@@ -143,6 +146,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 * @param configProperties unifying configuration (in the form of generic properties)
 * @param queryType        target query type
 * @param queryPaths       target DFS paths being queried
Add MOR condition.
 .fromProperties(configProperties)
 .enable(configProperties.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS)
-    && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
+    && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)
Hey @yihua: let's chat about this as well tomorrow.
if (useROPathFilterForListing && !shouldIncludePendingCommits) {
  // Group files by partition path, then by file group ID
  Map<String, PartitionPath> partitionsMap = new HashMap<>();
Can we move this to a private method, e.g. generatePartitionFileSlicesPostROTablePathFilter?
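A minimal sketch of the suggested extraction, assuming the locals from the diff above (the queried partitions and the filtered listing) become parameters; everything except the suggested method name is hypothetical:

```java
// Hypothetical signature; groups the RO-path-filtered listing into
// file slices per queried partition.
private Map<PartitionPath, List<FileSlice>> generatePartitionFileSlicesPostROTablePathFilter(
    List<PartitionPath> partitions,
    List<StoragePathInfo> filteredFiles) {
  Map<String, PartitionPath> partitionsMap = new HashMap<>();
  partitions.forEach(p -> partitionsMap.put(p.path, p));
  Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();
  // ...the existing grouping loop from the inline block moves here unchanged...
  return partitionToFileSlices;
}
```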
  * By passing metaClient and completedTimeline, we can sync the view seen from this class against HoodieFileIndex class
  */
-public HoodieROTablePathFilter(Configuration conf,
+public HoodieROTablePathFilter(StorageConfiguration conf,
Hey @yihua: can you review the changes in this patch?
| " them (if possible).") | ||
|
|
||
| val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER: ConfigProperty[Boolean] = | ||
| ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter") |
Suggest renaming to hoodie.datasource.read.file.index.optimize.listing.using.path.filter.
  properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}

var hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
Once we fix the config key, let's fix these vars as well.
val result = spark.sql(s"select id, name, price, ts from $tableName order by id").collect()
// Should have deleted records where id % 3 = 0 (3, 6, 9)
// Should have doubled price for even ids (2, 4, 8, 10)
assert(result.length == 7) // 10 - 3 deleted = 7
Do you think the below assertion would work? We can rename one of the earlier versions of a file slice so that HoodieBaseFile parsing will fail. If the RO table path filter works as intended, listing files from a given partition should not fail, since we won't even try to parse the file; but if the RO table path filter did not work, it would fail.
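A rough Java sketch of that assertion against the Hadoop FileSystem API; fs, partitionPath, oldBaseFileName, and the listing call are illustrative names, not the patch's code:

```java
// Rename an older base-file version to a name HoodieBaseFile cannot parse.
// With the RO path filter working, listing never touches the stale file and
// succeeds; with the filter broken, parsing the bad name fails the listing.
Path stale = new Path(partitionPath, oldBaseFileName);               // illustrative
Path corrupted = new Path(partitionPath, "not-a-hudi-file.parquet"); // unparseable name
assertTrue(fs.rename(stale, corrupted));
// Hypothetical listing call; should not throw when the filter is effective.
List<FileSlice> slices = fileIndex.getLatestFileSlices(partitionPath);
assertFalse(slices.isEmpty());
```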
yihua left a comment:
Nice work on adding an opt-in path filter to avoid driver OOM during file listing — the feature is well-motivated and the config is cleanly gated behind a default-off flag. The main concerns are correctness issues in the new loadFileSlicesForPartitions fast path: the partition path passed to FileSlice appears to be absolute rather than relative, and the partition map lookup can NPE if the key doesn't match. It's also worth clarifying MOR table compatibility and fixing the shared Hadoop config mutation in getPartitionPathFilter before merging.
| Map<String, PartitionPath> partitionsMap = new HashMap<>(); | ||
| partitions.forEach(p -> partitionsMap.put(p.path, p)); | ||
| Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>(); | ||
|
|
There was a problem hiding this comment.
The partitionPathStr here is the absolute path (pathInfo.getPath().getParent().toString()), but FileSlice expects a relative partition path. The existing code path via HoodieTableFileSystemView always uses relative paths. This would cause mismatches downstream wherever FileSlice.getPartitionPath() is used. Should this be relPartitionPath instead?
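If the suggestion is taken, the construction could look roughly like this; the three-arg FileSlice constructor and setBaseFile are assumed from the common model, so treat this as a sketch rather than the patch's code:

```java
// Build the slice with the relative partition path so that downstream
// FileSlice.getPartitionPath() matches what HoodieTableFileSystemView returns.
String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, pathInfo.getPath().getParent());
HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
FileSlice fileSlice = new FileSlice(relPartitionPath, baseFile.getCommitTime(), baseFile.getFileId());
fileSlice.setBaseFile(baseFile);
```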
// Create FileSlice obj from StoragePathInfo.
String partitionPathStr = pathInfo.getPath().getParent().toString();
String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, pathInfo.getPath().getParent());
HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
If relPartitionPath doesn't exactly match a key in partitionsMap, partitionPathObj will be null and the computeIfAbsent call below will throw NPE. This could happen with path normalization differences (trailing slashes, scheme differences). Could you add a null check or use getRelativePartitionPath consistently with how PartitionPath.path was originally set?
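A guard along these lines would avoid the NPE; the skip-and-warn policy is just one option, and the sketch assumes the code runs inside a plain for loop over the listing:

```java
PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
if (partitionPathObj == null) {
  // A normalization mismatch (trailing slash, scheme) should not crash the
  // listing; skip the file (or fail fast with a descriptive error) instead.
  log.warn("No queried partition matches {} for file {}", relPartitionPath, pathInfo.getPath());
  continue;
}
partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>()).add(fileSlice);
```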
| List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline); | ||
| log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions", | ||
| metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"), | ||
| timer.endTimer(), partitions.size()); |
There was a problem hiding this comment.
Have you considered what happens with MOR tables here? The HoodieROTablePathFilter only returns base files (it calls fsView.getLatestBaseFiles()), so this path constructs FileSlices without log files. The !shouldIncludePendingCommits guard doesn't prevent MOR tables from reaching this code. It might be worth adding a table-type check (COW only) or documenting this limitation.
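If the fast path should stay COW-only for now, one possible guard could look like this; the exact wiring of the flag into BaseHoodieTableFileIndex is an assumption:

```java
// Restrict the RO-path-filter fast path to copy-on-write tables: the filter
// only surfaces base files, so MOR log files would silently be dropped.
boolean useROPathFilterForListing =
    configProperties.getBoolean(FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key(), false)
        && metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE
        && !shouldIncludePendingCommits;
```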
@@ -146,6 +147,12 @@ public List<StoragePathInfo> getAllFilesInPartition(StoragePath partitionPath) t
 }

 @Override
It looks like the @Override annotation that belonged to getAllFilesInPartitions(Collection<String>) has been absorbed by the new method insertion. In the diff, the @Override on line 148 now applies to the new two-arg overload, while the original single-arg method (which is the actual interface abstract method) loses its @Override. Could you add @Override back to the original method?
val conf = HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration)
if (specifiedQueryInstant.isDefined) {
  conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), specifiedQueryInstant.get)
}
HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration) wraps the shared Hadoop config without copying it. The subsequent conf.set(TIMESTAMP_AS_OF, ...) would mutate the global Spark session config, which could affect other queries in the same session. Could you use getStorageConfWithCopy instead?
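For illustration in Java terms, the copying variant the comment asks for would look like this (jsc is a hypothetical JavaSparkContext handle):

```java
// Copy the Hadoop configuration before mutating it, so the session-wide
// config is not polluted by the TIMESTAMP_AS_OF override.
StorageConfiguration<Configuration> conf =
    HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
if (specifiedQueryInstant.isPresent()) {
  conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), specifiedQueryInstant.get());
}
```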
hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
  "spark." + DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, null)
if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
  properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
nit: the comment says "For 0.14 rollout" — looks like this was copied from the HMS listing config block. This is a new 1.2.0 config, so the comment is misleading.
  return getAllFilesInPartitions(partitions);
}

public Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitions)
nit: Let this call getAllFilesInPartitions(partitions, Option.empty()) to be easier to read?
| Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths) | ||
| throws IOException; | ||
|
|
||
| default Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths, | ||
| Option<StoragePathFilter> pathFilterOption) | ||
| throws IOException { | ||
| return getAllFilesInPartitions(partitionPaths); | ||
| } |
There was a problem hiding this comment.
Make Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths) to have default implementation of getAllFilesInPartitions(partitionPaths, Option.empty()) so subclasses can avoid the repeating code? Then getAllFilesInPartitions(Collection<String> partitionPaths, Option<StoragePathFilter> pathFilterOption) becomes an abstract method.
HoodieTimer timer = HoodieTimer.start();
List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
    metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
nit: queryInstant.map(instant -> instant).orElse("N/A") — the .map(instant -> instant) is a no-op. You can simplify to queryInstant.orElse("N/A").
 public HoodieROTablePathFilter() {
-  this(new Configuration());
+  this(HadoopFSUtils.getStorageConf());
HoodieROTablePathFilter and BaseFileOnlyRelation should no longer be used based on the latest master; instead, HoodieCopyOnWriteSnapshotHadoopFsRelationFactory is used.
+1. Let's take a look at all implementations extending from HoodieBaseHadoopFsRelationFactory and ensure we wire them in.
yihua left a comment:
A better and general approach would be adding a file system view based on the latest snapshot only to limit the size of file slices in memory, which is used by the file index. That should solve the problem with better layering.
  return getAllDataFilesInPartition(storage, partitionPath, Option.empty());
}

public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage storage,
We might need to change the naming, now that it's not all files.
nsivabalan left a comment:
Let's add tests for the time travel query as well.
// Update data in first partition
spark.sql(s"update $tableName set price = 15.0 where id = 1")

// Query single partition with ROPathFilter
Add a time travel test case. And use a different name for getAllFilesInPartitions.
// Add the FileSlice to partitionToFileSlices
PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
List<FileSlice> fileSlices = partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
fileSlices.add(fileSlice);
Can we avoid this special handling and route all the files into the FSV, so that we maintain one flow for all cases? The input files could either have already been filtered (if the path filter is applied) or refer to all files (if no path filter is applied). Much simpler from a maintainability standpoint.
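A sketch of that single flow; the HoodieTableFileSystemView constructor shape is assumed from the existing listing path:

```java
// One flow for both modes: the listing may already be RO-path-filtered or may
// contain all files, but file-slice grouping always goes through the FSV.
List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
HoodieTableFileSystemView fileSystemView =
    new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
```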
Hey @yihua: based on the latest state of the patch, I feel it sits nicely within HoodieTableMetadata, and so we can leverage this with any FSV.
Describe the issue this Pull Request addresses
This PR addresses OOM (Out of Memory) issues on the Spark driver when querying large Hudi datasets that have multiple versions of files in the same partition. When file listing is performed without filtering, all file versions are loaded into memory, which can cause the driver to run out of memory.
Summary and Changelog
Users can now enable path filtering during file listing to avoid loading multiple file versions into memory on the driver. This is controlled by the new config hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter.

Changes:
- New config FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER (default: false) to enable HoodieROPathFilter during file listing
- HoodieTableMetadata, BaseTableMetadata, and FileSystemBackedTableMetadata extended to support an optional StoragePathFilter parameter
- FSUtils.getAllDataFilesInPartition overload that accepts a path filter option
- HoodieROTableStoragePathFilter wrapper to adapt Hadoop PathFilter to Hudi's StoragePathFilter interface
- BaseHoodieTableFileIndex updated to use the path filter when enabled, constructing FileSlices directly from filtered files
- SparkHoodieTableFileIndex updated to wrap and apply HoodieROTablePathFilter during partition file listing

Impact
Config Changes:
- New config: hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter (default: false)

Performance:
Risk Level
Low - The feature is behind a config flag (default: false) and does not change existing behavior unless explicitly enabled.
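For reference, a minimal read-side sketch of enabling the flag (Java Spark API; the table path is hypothetical):

```java
// Opt in to RO-path-filtered listing for this read; the default stays false.
Dataset<Row> df = spark.read().format("hudi")
    .option("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter", "true")
    .load("/tmp/hudi_trips");  // hypothetical table path
df.count();  // file listing now applies HoodieROTablePathFilter on the driver
```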
Verification:
Documentation Update
Config documentation is included in the withDocumentation method of the new config property.

Contributor's checklist