perf: Add HoodieROPathFilter support during file listing to prevent driver OOM#18136

Open
suryaprasanna wants to merge 3 commits into apache:master from suryaprasanna:path-filter-during-listing

Conversation

@suryaprasanna (Contributor)

Describe the issue this Pull Request addresses

This PR addresses OOM (Out of Memory) issues on the Spark driver when querying large Hudi datasets that have multiple versions of files in the same partition. When file listing is performed without filtering, all file versions are loaded into memory, which can cause the driver to run out of memory.

Summary and Changelog

Users can now enable path filtering during file listing to avoid loading multiple file versions into memory on the driver. This is controlled by the new config hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter.

Changes:

  • Added new config FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER (default: false) to enable HoodieROPathFilter during file listing
  • Extended HoodieTableMetadata, BaseTableMetadata, and FileSystemBackedTableMetadata to support optional StoragePathFilter parameter
  • Added FSUtils.getAllDataFilesInPartition overload that accepts path filter option
  • Created HoodieROTableStoragePathFilter wrapper to adapt Hadoop PathFilter to Hudi's StoragePathFilter interface
  • Updated BaseHoodieTableFileIndex to use path filter when enabled, constructing FileSlices directly from filtered files
  • Modified SparkHoodieTableFileIndex to wrap and apply HoodieROTablePathFilter during partition file listing

Impact

Config Changes:

  • New config: hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter (default: false)
    • When enabled, applies HoodieROTablePathFilter during file listing to filter out older file versions
    • Helps prevent OOM issues on driver for large tables with multiple file versions
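
For illustration, a Spark read that opts into the filter might look like the following sketch (the option key is the one added in this PR; `spark` and `basePath` are placeholders, and this fragment assumes a Spark session with Hudi on the classpath):

```java
// Hypothetical usage sketch: opt in to RO path filtering during file listing.
// The flag defaults to false, so existing behavior is unchanged unless set.
Dataset<Row> df = spark.read().format("hudi")
    .option("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter", "true")
    .load(basePath);
```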

Performance:

  • Reduces memory pressure on Spark driver for datasets with multiple file versions per partition
  • Enables successful queries on very large tables that previously failed with OOM errors

Risk Level

Low - The feature is behind a config flag (default: false) and does not change existing behavior unless explicitly enabled.

Verification:

  • Existing unit tests pass
  • Tested on large production datasets with OOM issues - queries now succeed with filter enabled

Documentation Update

Config documentation is included in the withDocumentation method of the new config property.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Feb 8, 2026
@suryaprasanna suryaprasanna force-pushed the path-filter-during-listing branch 2 times, most recently from 95fda80 to 2dabf5d Compare February 9, 2026 16:16
@suryaprasanna (Contributor Author)

@nsivabalan seems like the checks are not running on the PR. Can you please check?

…on the driver

Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers, pwason, jingli, meenalb, singh.sumit

Reviewed By: O955 Project Hoodie Project Reviewer: Add blocking reviewers, pwason

Tags: #has_java

JIRA Issues: HUDI-6646

Differential Revision: https://code.uberinternal.com/D17441111

Fix build failures

Fix checkstyle

Refactor code

Create unit tests
@suryaprasanna suryaprasanna force-pushed the path-filter-during-listing branch from 2dabf5d to 9b8395e Compare February 9, 2026 18:23
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Feb 9, 2026
@nsivabalan (Contributor)

Can we consider both table types and all query types, and ensure we wire in the config only where applicable?

@apache apache deleted a comment from hudi-bot Feb 10, 2026
@@ -143,6 +146,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
* @param configProperties unifying configuration (in the form of generic properties)
* @param queryType target query type
* @param queryPaths target DFS paths being queried
Contributor Author:

Add MOR condition.

@hudi-bot (Collaborator)

CI report:

Bot commands: @hudi-bot run azure re-runs the last Azure build.

.fromProperties(configProperties)
.enable(configProperties.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS)
&& HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
&& HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)
Contributor:

Hey @yihua: let's chat about this as well tomorrow.


if (useROPathFilterForListing && !shouldIncludePendingCommits) {
// Group files by partition path, then by file group ID
Map<String, PartitionPath> partitionsMap = new HashMap<>();
Contributor:

Can we move this to a private method, e.g. generatePartitionFileSlicesPostROTablePathFilter?

* By passing metaClient and completedTimeline, we can sync the view seen from this class against HoodieFileIndex class
*/
public HoodieROTablePathFilter(Configuration conf,
public HoodieROTablePathFilter(StorageConfiguration conf,
Contributor:

Hey @yihua: can you review the changes in this patch?

" them (if possible).")

val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER: ConfigProperty[Boolean] =
ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter")
Contributor:

Suggested config key rename: hoodie.datasource.read.file.index.optimize.listing.using.path.filter

properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}

var hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
Contributor:

Once we fix the config key, let's fix these vars as well.

val result = spark.sql(s"select id, name, price, ts from $tableName order by id").collect()
// Should have deleted records where id % 3 = 0 (3, 6, 9)
// Should have doubled price for even ids (2, 4, 8, 10)
assert(result.length == 7) // 10 - 3 deleted = 7
Contributor:

Do you think the below assertion would work?

We can rename one of the earlier versions of a file slice so that HoodieBaseFile parsing will fail. If the RO table path filter works as intended, listing files from a given partition should not fail, since we won't even try to parse the renamed file. But if the RO table path filter did not work, the listing would fail.

@yihua (Contributor) left a comment:

Nice work on adding an opt-in path filter to avoid driver OOM during file listing. The feature is well-motivated and the config is cleanly gated behind a default-off flag. The main concerns are correctness issues in the new loadFileSlicesForPartitions fast path: the partition path passed to FileSlice appears to be absolute rather than relative, and the partition map lookup can NPE if the key doesn't match. It's also worth clarifying MOR table compatibility and fixing the shared Hadoop config mutation in getPartitionPathFilter before merging.

Map<String, PartitionPath> partitionsMap = new HashMap<>();
partitions.forEach(p -> partitionsMap.put(p.path, p));
Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();

Contributor:

The partitionPathStr here is the absolute path (pathInfo.getPath().getParent().toString()), but FileSlice expects a relative partition path. The existing code path via HoodieTableFileSystemView always uses relative paths, so this would cause mismatches downstream wherever FileSlice.getPartitionPath() is used. Should this be relPartitionPath instead?

// Create FileSlice obj from StoragePathInfo.
String partitionPathStr = pathInfo.getPath().getParent().toString();
String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, pathInfo.getPath().getParent());
HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
Contributor:

If relPartitionPath doesn't exactly match a key in partitionsMap, partitionPathObj will be null and the computeIfAbsent call below will throw an NPE. This could happen with path normalization differences (trailing slashes, scheme differences). Could you add a null check, or use getRelativePartitionPath consistently with how PartitionPath.path was originally set?
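
To illustrate the suggested null check, here is a minimal, self-contained sketch; PartitionPath and FileSlice below are hypothetical, stripped-down stand-ins for the real classes, and the trailing-slash mismatch is an invented example of a normalization difference:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types (illustrative only, not the real Hudi classes).
class PartitionPath { final String path; PartitionPath(String p) { this.path = p; } }
class FileSlice { final String fileId; FileSlice(String id) { this.fileId = id; } }

public class PartitionLookupSketch {
  public static void main(String[] args) {
    Map<String, PartitionPath> partitionsMap = new HashMap<>();
    partitionsMap.put("2026/02/09", new PartitionPath("2026/02/09"));

    Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();

    // Simulate a listing result whose parent does not normalize to a known key.
    String relPartitionPath = "2026/02/09/";  // trailing slash: no exact match

    PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
    if (partitionPathObj == null) {
      // Guard instead of letting a null key flow downstream; real code
      // might normalize the path first, or log and skip the file.
      System.out.println("skipped unmatched partition: " + relPartitionPath);
    } else {
      partitionToFileSlices
          .computeIfAbsent(partitionPathObj, k -> new ArrayList<>())
          .add(new FileSlice("f1"));
    }
    System.out.println("slices=" + partitionToFileSlices.size());
  }
}
```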

List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
timer.endTimer(), partitions.size());
Contributor:

Have you considered what happens with MOR tables here? HoodieROTablePathFilter only returns base files (it calls fsView.getLatestBaseFiles()), so this path constructs FileSlices without log files. The !shouldIncludePendingCommits guard doesn't prevent MOR tables from reaching this code. It might be worth adding a table-type check (COW only) or documenting this limitation.
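
The suggested table-type guard could be sketched as follows; the enum and flag names below are illustrative stand-ins, not the PR's exact fields:

```java
// Hypothetical guard: only take the filtered fast path for COW tables,
// since the RO path filter yields base files without log files.
enum HoodieTableType { COPY_ON_WRITE, MERGE_ON_READ }

public class TableTypeGuardSketch {
  static boolean useFilteredListing(HoodieTableType tableType,
                                    boolean useROPathFilterForListing,
                                    boolean shouldIncludePendingCommits) {
    return useROPathFilterForListing
        && !shouldIncludePendingCommits
        && tableType == HoodieTableType.COPY_ON_WRITE;
  }

  public static void main(String[] args) {
    // COW with the filter enabled takes the fast path; MOR does not.
    System.out.println(useFilteredListing(HoodieTableType.COPY_ON_WRITE, true, false));
    System.out.println(useFilteredListing(HoodieTableType.MERGE_ON_READ, true, false));
  }
}
```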

@@ -146,6 +147,12 @@ public List<StoragePathInfo> getAllFilesInPartition(StoragePath partitionPath) t
}

@Override
Contributor:

It looks like the @Override annotation that belonged to getAllFilesInPartitions(Collection<String>) has been absorbed by the new method insertion. In the diff, the @Override on line 148 now applies to the new two-arg overload, while the original single-arg method (which is the actual interface abstract method) loses its @Override. Could you add @Override back to the original method?

val conf = HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration)
if (specifiedQueryInstant.isDefined) {
conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), specifiedQueryInstant.get)
}
Contributor:

HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration) wraps the shared Hadoop config without copying it. The subsequent conf.set(TIMESTAMP_AS_OF, ...) would mutate the global Spark session config, which could affect other queries in the same session. Could you use getStorageConfWithCopy instead?
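
The hazard can be illustrated with plain java.util.Properties standing in for the shared Hadoop Configuration: mutating the shared object leaks the per-query setting to everyone, while a defensive copy keeps it local. The key and value below are illustrative:

```java
import java.util.Properties;

public class ConfigCopySketch {
  public static void main(String[] args) {
    // Shared session-wide configuration (analogy for the Hadoop conf).
    Properties shared = new Properties();
    shared.setProperty("existing.key", "value");

    // Copy first, then set the per-query timestamp on the copy only.
    Properties perQuery = new Properties();
    perQuery.putAll(shared);
    perQuery.setProperty("hoodie.read.timestamp.as.of", "20260209182300000");

    // The shared config is untouched; only the copy carries the setting.
    System.out.println(shared.getProperty("hoodie.read.timestamp.as.of"));
    System.out.println(perQuery.getProperty("hoodie.read.timestamp.as.of"));
  }
}
```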

hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
"spark." + DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, null)
if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
Contributor:

nit: the comment says "For 0.14 rollout", which looks copied from the HMS listing config block. This is a new 1.2.0 config, so the comment is misleading.

return getAllFilesInPartitions(partitions);
}

public Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitions)
Contributor:

nit: Have this call getAllFilesInPartitions(partitions, Option.empty()) so it's easier to read?

Comment on lines 157 to +164
Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths)
throws IOException;

default Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths,
Option<StoragePathFilter> pathFilterOption)
throws IOException {
return getAllFilesInPartitions(partitionPaths);
}
Contributor:

Make getAllFilesInPartitions(Collection&lt;String&gt; partitionPaths) a default method that delegates to getAllFilesInPartitions(partitionPaths, Option.empty()), so subclasses avoid repeating code. Then getAllFilesInPartitions(Collection&lt;String&gt; partitionPaths, Option&lt;StoragePathFilter&gt; pathFilterOption) becomes the abstract method.
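
The suggested reshaping can be sketched with simplified stand-in types (java.util.Optional replaces Hudi's Option, and String replaces StoragePathInfo/StoragePathFilter; the interface and class names are hypothetical):

```java
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// The one-arg variant is a default that delegates with an empty filter;
// the two-arg overload is the single abstract method implementers write.
interface TableMetadataSketch {
  default Map<String, List<String>> getAllFilesInPartitions(Collection<String> partitionPaths)
      throws IOException {
    return getAllFilesInPartitions(partitionPaths, Optional.empty());
  }

  Map<String, List<String>> getAllFilesInPartitions(Collection<String> partitionPaths,
                                                    Optional<String> pathFilterOption)
      throws IOException;
}

public class DefaultMethodSketch implements TableMetadataSketch {
  @Override
  public Map<String, List<String>> getAllFilesInPartitions(Collection<String> partitionPaths,
                                                           Optional<String> pathFilterOption) {
    Map<String, List<String>> out = new HashMap<>();
    for (String p : partitionPaths) {
      out.put(p, pathFilterOption.map(f -> List.of(p + "/" + f)).orElse(List.of(p + "/all")));
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    TableMetadataSketch md = new DefaultMethodSketch();
    // Callers of the one-arg variant still work without duplicated code.
    System.out.println(md.getAllFilesInPartitions(List.of("p1")));
  }
}
```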

HoodieTimer timer = HoodieTimer.start();
List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
Contributor:

nit: the .map(instant -> instant) in queryInstant.map(instant -> instant).orElse("N/A") is a no-op; you can simplify to queryInstant.orElse("N/A").
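
A quick demonstration of the no-op, using java.util.Optional in place of Hudi's Option (the instant string is a made-up example):

```java
import java.util.Optional;

public class OptionalIdentitySketch {
  public static void main(String[] args) {
    Optional<String> queryInstant = Optional.of("20260209182300000");
    Optional<String> empty = Optional.empty();

    // Mapping with an identity lambda changes nothing, so both forms agree.
    System.out.println(queryInstant.map(i -> i).orElse("N/A")
        .equals(queryInstant.orElse("N/A")));
    // The empty case falls through to the default either way.
    System.out.println(empty.map(i -> i).orElse("N/A"));
  }
}
```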


public HoodieROTablePathFilter() {
this(new Configuration());
this(HadoopFSUtils.getStorageConf());
Contributor:

HoodieROTablePathFilter and BaseFileOnlyRelation should no longer be used based on the latest master; instead, HoodieCopyOnWriteSnapshotHadoopFsRelationFactory is used.

Contributor:

+1

Let's take a look at all implementations extending from HoodieBaseHadoopFsRelationFactory and wire them in.

@yihua (Contributor) left a comment:

A better and general approach would be adding a file system view based on the latest snapshot only to limit the size of file slices in memory, which is used by the file index. That should solve the problem with better layering.

return getAllDataFilesInPartition(storage, partitionPath, Option.empty());
}

public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage storage,
Contributor:

We might need to change the naming, now that it's not all files.

@nsivabalan (Contributor) left a comment:

Let's add tests for time-travel queries as well.

// Update data in first partition
spark.sql(s"update $tableName set price = 15.0 where id = 1")

// Query single partition with ROPathFilter
Contributor Author:

Add a time-travel test case, and use a different name for getAllFilesInPartitions.

// Add the FileSlice to partitionToFileSlices
PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
List<FileSlice> fileSlices = partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
fileSlices.add(fileSlice);
Contributor:

Can we avoid this special handling and route all the files into the FSV, so that we maintain one flow for all cases? The input files could either already be filtered (if the path filter is applied) or refer to all files (if no path filter). That is much simpler from a maintainability standpoint.



@nsivabalan (Contributor)

A better and general approach would be adding a file system view based on the latest snapshot only to limit the size of file slices in memory, which is used by the file index. That should solve the problem with better layering.

Hey @yihua: based on the latest state of the patch, I feel it nicely sits within HoodieTableMetadata, so we can leverage this with any FSV. I just see some special handling of FileIndex after filtering which we can avoid (feedback shared above); otherwise, the current layering seems OK to me.
