[DNR] feat(pos-sorted-exchange): Add sorted exchange support in presto-on-spark #26403
base: master
Conversation
Reviewer's Guide

This PR prototype adds support for sorted shuffle in Presto-on-Spark by pushing down sort operators into remote repartition exchanges for sort-merge joins, threading OrderingScheme through planner fragments, extending Spark partitioning to apply key ordering, introducing sorted ExchangeNode factories, enriching plan printing and logging with ordering information, and bolstering test coverage.

Sequence diagram for pushing sort into remote repartition exchange during sort-merge join

sequenceDiagram
participant "SortMergeJoinOptimizer"
participant "PlanNode (left/right)"
participant "ExchangeNode"
participant "SortNode"
"SortMergeJoinOptimizer"->>"PlanNode (left/right)": check if child is ExchangeNode
alt child is ExchangeNode and suitable
"SortMergeJoinOptimizer"->>"ExchangeNode": pushSortToExchangeIfPossible(orderingScheme)
"ExchangeNode"-->>"SortMergeJoinOptimizer": sortedPartitionedExchange
else not suitable
"SortMergeJoinOptimizer"->>"SortNode": create explicit SortNode
end
Class diagram for updated PlanFragment and related classes

classDiagram
class PlanFragment {
- Optional<OrderingScheme> outputOrderingScheme
+ getOutputOrderingScheme()
}
class FragmentProperties {
- Optional<OrderingScheme> outputOrderingScheme
+ setOutputOrderingScheme(Optional<OrderingScheme>)
+ getOutputOrderingScheme()
}
class OrderingScheme {
}
PlanFragment --> OrderingScheme : outputOrderingScheme
FragmentProperties --> OrderingScheme : outputOrderingScheme
PlanFragment <|-- FragmentProperties
Class diagram for new and updated ExchangeNode methods

classDiagram
class ExchangeNode {
+ static sortedPartitionedExchange(..., OrderingScheme)
+ static sortedSystemPartitionedExchange(..., OrderingScheme)
}
ExchangeNode --> OrderingScheme : uses
Class diagram for Spark-side partitioning and ordering changes

classDiagram
class AbstractPrestoSparkQueryExecution {
+ static JavaPairRDD partitionBy(..., PartitioningScheme, Optional<OrderingScheme>)
+ static final Ordering PartitionOrdering
}
class MutablePartitionIdOrdering {
}
AbstractPrestoSparkQueryExecution --> MutablePartitionIdOrdering : uses
Hey there - I've reviewed your changes - here's some feedback:
- The custom Scala Ordering implementation (PartitionOrdering) contains stubbed methods and null returns—either fully implement all methods correctly or replace it with a standard ordering utility to avoid runtime errors.
- There are numerous log.info statements in production code (e.g., in pushSortToExchangeIfPossible and settings verifiers) that will flood logs; please demote non-critical messages to debug level.
- pushSortToExchangeIfPossible currently only works for single-source REPARTITION exchanges—consider extending support (or explicitly guarding) for multi-source exchanges so we don’t silently fall back to a SortNode in unexpected cases.
## Individual Comments
### Comment 1
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java:319-328` </location>
<code_context>
+ public static final Ordering<MutablePartitionId> PartitionOrdering = new Ordering<MutablePartitionId>() {
</code_context>
<issue_to_address>
**issue (bug_risk):** PartitionOrdering implementation returns null for 'on' and 'mkOrderingOps', which may cause runtime issues.
Null returns in these methods may trigger NullPointerExceptions or unpredictable behavior. Please implement them correctly or throw UnsupportedOperationException if they should not be used.
</issue_to_address>
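For reference, here is a minimal standalone sketch of the comparison logic as a plain `java.util.Comparator`; the `MutablePartitionId` class below is a hypothetical stub, not the real Presto class. Scala's `Ordering.comparatorToOrdering` can typically wrap such a comparator, which avoids hand-implementing `on`/`mkOrderingOps` at all:

```java
import java.util.Comparator;

// Hypothetical stand-in for the real MutablePartitionId; only the field used for ordering is shown.
class MutablePartitionId {
    private final int partition;

    MutablePartitionId(int partition) {
        this.partition = partition;
    }

    int getPartition() {
        return partition;
    }
}

public class PartitionComparators {
    // Compares by partition id only, mirroring the compare() override in the PR.
    public static final Comparator<MutablePartitionId> PARTITION_COMPARATOR =
            Comparator.comparingInt(MutablePartitionId::getPartition);

    public static void main(String[] args) {
        // Prints a negative number: partition 1 sorts before partition 2.
        System.out.println(PARTITION_COMPARATOR.compare(new MutablePartitionId(1), new MutablePartitionId(2)));
    }
}
```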
### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java:148-149` </location>
<code_context>
orderingScheme.ifPresent(ordering -> {
PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle();
- checkArgument(!scope.isRemote() || partitioningHandle.equals(SINGLE_DISTRIBUTION), "remote merging exchange requires single distribution");
+ // This is no longer true for Presto-on-Spark which can support sorting on external shuffle systems
+ //checkArgument(!scope.isRemote() || partitioningHandle.equals(SINGLE_DISTRIBUTION), "remote merging exchange requires single distribution");
checkArgument(!scope.isLocal() || partitioningHandle.equals(FIXED_PASSTHROUGH_DISTRIBUTION), "local merging exchange requires passthrough distribution");
checkArgument(partitioningScheme.getOutputLayout().containsAll(ordering.getOrderByVariables()), "Partitioning scheme does not supply all required ordering symbols");
</code_context>
<issue_to_address>
**suggestion:** Commented-out checkArgument may lead to confusion about remote merging exchange requirements.
If the change is intentional for Presto-on-Spark, please either remove the commented-out check or document when it should be used to prevent confusion for future maintainers.
```suggestion
// For Presto-on-Spark, remote merging exchanges can support sorting on external shuffle systems,
// so the single distribution requirement is not enforced here. If running on classic Presto,
// ensure that remote merging exchanges use SINGLE_DISTRIBUTION.
```
</issue_to_address>
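If the invariant should remain enforceable outside Presto-on-Spark, one option is to make the condition explicit instead of commenting the check out. This is a sketch only; `runtimeSupportsSortedShuffle` is a hypothetical flag, not an existing field, and would need to be wired to an appropriate signal:

```java
// Sketch: keep the original invariant, but skip it when the runtime can sort during shuffle.
// "runtimeSupportsSortedShuffle" is hypothetical, not part of the current code.
if (scope.isRemote() && !runtimeSupportsSortedShuffle) {
    checkArgument(partitioningHandle.equals(SINGLE_DISTRIBUTION),
            "remote merging exchange requires single distribution");
}
```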
### Comment 3
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java:70-75` </location>
<code_context>
verify(executorCoresString != null, "%s must be set", SPARK_EXECUTOR_CORES_PROPERTY);
int taskCpus = parseInt(taskCpusString);
int executorCores = parseInt(executorCoresString);
- verify(
- taskCpus == executorCores,
- "%s (%s) must be equal to %s (%s)",
- SPARK_TASK_CPUS_PROPERTY,
- taskCpus,
- SPARK_EXECUTOR_CORES_PROPERTY,
- executorCores);
+ log.info("Task cpus: %s, executor cores: %s, tasksPerExecutor: %s", taskCpus, executorCores, executorCores / taskCpus);
</code_context>
<issue_to_address>
**question (bug_risk):** Replaced executor cores and task cpus equality check with logging, which may allow misconfiguration.
This change removes enforcement of configuration consistency, which could lead to resource issues. If this is intentional, please clarify the reasoning and document any risks.
</issue_to_address>
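If the intent is to relax rather than drop the check, one possible middle ground is shown in the standalone sketch below. It assumes the relaxed requirement is "executor cores must be a multiple of task cpus"; that assumption is illustrative, not something the PR enforces:

```java
// Minimal sketch, assuming "executor cores must be a multiple of task cpus" is the relaxed rule.
public class SparkCpuConfigCheck {
    static void checkCpuSettings(String taskCpusString, String executorCoresString) {
        int taskCpus = Integer.parseInt(taskCpusString);
        int executorCores = Integer.parseInt(executorCoresString);
        if (executorCores % taskCpus != 0) {
            throw new IllegalArgumentException(String.format(
                    "spark.executor.cores (%s) must be a multiple of spark.task.cpus (%s)",
                    executorCores, taskCpus));
        }
        System.out.printf("Task cpus: %d, executor cores: %d, tasksPerExecutor: %d%n",
                taskCpus, executorCores, executorCores / taskCpus);
    }

    public static void main(String[] args) {
        checkCpuSettings("2", "8"); // 4 tasks per executor
    }
}
```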
### Comment 4
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java:216` </location>
<code_context>
+ {
+ // Check if this is a suitable exchange node for sort push-down
+ if (!(plan instanceof ExchangeNode)) {
+ log.info("Unable to push sorting to exchange because child is not exchange node");
+ return Optional.empty();
+ }
+
</code_context>
<issue_to_address>
**suggestion:** Logging at info level for expected plan structure may be too verbose.
Consider changing the log level to debug unless this message indicates an unexpected plan structure.
```suggestion
log.debug("Unable to push sorting to exchange because child is not exchange node");
```
</issue_to_address>
### Comment 5
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java:258-217` </location>
<code_context>
+ planChanged = true;
+ return Optional.of(sortedExchange);
+ }
+ catch (Exception e) {
+ log.warn("Failed to create sorted exchange: " + e.getMessage());
+ // If creating sorted exchange fails, fall back to explicit sort
+ return Optional.empty();
+ }
+ }
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Catching generic Exception may mask underlying issues.
Instead of catching Exception, catch only the specific exceptions that ExchangeNode.sortedPartitionedExchange may throw, or log and rethrow unexpected ones to avoid hiding errors.
Suggested implementation:
```java
}
catch (IllegalArgumentException | IllegalStateException e) {
log.warn("Failed to create sorted exchange: " + e.getMessage());
// If creating sorted exchange fails, fall back to explicit sort
return Optional.empty();
}
catch (RuntimeException e) {
log.error("Unexpected error while creating sorted exchange", e);
throw e;
}
```
- You may need to adjust the specific exception types (`IllegalArgumentException`, `IllegalStateException`, `RuntimeException`) to match those actually thrown by `ExchangeNode.sortedPartitionedExchange`.
- If there are custom exceptions defined in your codebase for this method, include them in the specific catch block.
- Ensure that your logging framework supports `log.error` for the rethrown case.
</issue_to_address>
### Comment 6
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java:439` </location>
<code_context>
Joiner.on(", ").join(partitioningScheme.getPartitioning().getArguments()),
formatHash(partitioningScheme.getHashColumn())));
}
+ builder.append(indentString(1)).append(format("Output ordering: %s%n", fragment.getOutputOrderingScheme()));
builder.append(indentString(1)).append(format("Output encoding: %s%n", fragment.getPartitioningScheme().getEncoding()));
builder.append(indentString(1)).append(format("Stage Execution Strategy: %s%n", fragment.getStageExecutionDescriptor().getStageExecutionStrategy()));
</code_context>
<issue_to_address>
**nitpick:** Output ordering scheme is printed as a raw Optional, which may not be user-friendly.
Format the output to show the ordering scheme value or 'none' if absent, rather than printing the Optional wrapper.
</issue_to_address>
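A self-contained sketch of the suggested formatting, using `String` as a stand-in for `OrderingScheme`:

```java
import java.util.Optional;

public class OrderingPrintExample {
    // Renders the Optional's contents, or "none" when absent, instead of Optional's own toString().
    static String formatOrdering(Optional<String> orderingScheme) {
        return orderingScheme.map(Object::toString).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println("Output ordering: " + formatOrdering(Optional.empty()));           // none
        System.out.println("Output ordering: " + formatOrdering(Optional.of("custkey ASC"))); // custkey ASC
    }
}
```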
### Comment 7
<location> `presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/MutablePartitionIdOrdering.java:3` </location>
<code_context>
+package com.facebook.presto.spark.classloader_interface;
+
+public class MutablePartitionIdOrdering {
+}
</code_context>
<issue_to_address>
**suggestion:** MutablePartitionIdOrdering class is empty and may be unnecessary.
If this class is intended for future use, add a TODO comment; otherwise, consider removing it to keep the codebase clean.
</issue_to_address>
### Comment 8
<location> `presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSortMergeJoinQueries.java:166-175` </location>
<code_context>
+ public void testSortMergeJoinWithSortPushDownExecution()
</code_context>
<issue_to_address>
**suggestion (testing):** Test only checks ordering by one column; consider multi-column ordering.
The test should validate sorting by both 'custkey' and 'totalprice' to match the query's order clause.
</issue_to_address>
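A standalone sketch of a two-column ordering check along the lines suggested; the `Row` class is a hypothetical stand-in for the test's result rows:

```java
import java.util.Comparator;
import java.util.List;

public class MultiColumnOrderCheck {
    // Hypothetical holder for the two ORDER BY columns in the test.
    static class Row {
        final long custkey;
        final double totalprice;

        Row(long custkey, double totalprice) {
            this.custkey = custkey;
            this.totalprice = totalprice;
        }
    }

    // Returns true if rows are non-decreasing by custkey, then totalprice.
    static boolean isSorted(List<Row> rows) {
        Comparator<Row> expected = Comparator.<Row>comparingLong(r -> r.custkey)
                .thenComparingDouble(r -> r.totalprice);
        for (int i = 1; i < rows.size(); i++) {
            if (expected.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, 10.0), new Row(1, 20.0), new Row(2, 5.0));
        System.out.println(isSorted(rows)); // true
    }
}
```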
### Comment 9
<location> `presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSortMergeJoinQueries.java:207-216` </location>
<code_context>
+ public void testSortMergeJoinVsHashJoinPerformanceComparison()
</code_context>
<issue_to_address>
**suggestion (testing):** Performance comparison test does not account for variability or warm-up.
Run each query several times and use average or median timings to reduce the impact of JVM warm-up and environmental variability.
Suggested implementation:
```java
public void testSortMergeJoinVsHashJoinPerformanceComparison()
{
// Compare execution between sort merge join (with sort push-down) and hash join
final int NUM_RUNS = 5;
List<Long> sortMergeJoinTimings = new ArrayList<>();
List<Long> hashJoinTimings = new ArrayList<>();
Session sortMergeJoinSession = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(PREFER_SORT_MERGE_JOIN, "true")
.setSystemProperty("task_concurrency", "4")
.build();
Session hashJoinSession = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(PREFER_SORT_MERGE_JOIN, "false")
.setSystemProperty("task_concurrency", "4")
```
```java
Session hashJoinSession = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(PREFER_SORT_MERGE_JOIN, "false")
.setSystemProperty("task_concurrency", "4")
.build();
String query = "SELECT c.custkey, o.orderkey FROM customer c JOIN orders o ON c.custkey = o.custkey";
// Warm-up phase
getQueryRunner().execute(sortMergeJoinSession, query);
getQueryRunner().execute(hashJoinSession, query);
// Timed runs for sort merge join
for (int i = 0; i < NUM_RUNS; i++) {
long start = System.nanoTime();
getQueryRunner().execute(sortMergeJoinSession, query);
long end = System.nanoTime();
sortMergeJoinTimings.add(end - start);
}
// Timed runs for hash join
for (int i = 0; i < NUM_RUNS; i++) {
long start = System.nanoTime();
getQueryRunner().execute(hashJoinSession, query);
long end = System.nanoTime();
hashJoinTimings.add(end - start);
}
// Compute average timings
double sortMergeJoinAvgMs = sortMergeJoinTimings.stream().mapToLong(Long::longValue).average().orElse(0.0) / 1_000_000.0;
double hashJoinAvgMs = hashJoinTimings.stream().mapToLong(Long::longValue).average().orElse(0.0) / 1_000_000.0;
// Optionally, compute median timings
double sortMergeJoinMedianMs = getMedianMs(sortMergeJoinTimings);
double hashJoinMedianMs = getMedianMs(hashJoinTimings);
System.out.printf("Sort Merge Join: avg=%.2f ms, median=%.2f ms%n", sortMergeJoinAvgMs, sortMergeJoinMedianMs);
System.out.printf("Hash Join: avg=%.2f ms, median=%.2f ms%n", hashJoinAvgMs, hashJoinMedianMs);
}
private static double getMedianMs(List<Long> timings) {
List<Long> sorted = new ArrayList<>(timings);
Collections.sort(sorted);
int n = sorted.size();
if (n == 0) {
return 0.0;
}
if (n % 2 == 1) {
return sorted.get(n / 2) / 1_000_000.0;
} else {
return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2_000_000.0;
}
}
```
</issue_to_address>
```java
public static final Ordering<MutablePartitionId> PartitionOrdering = new Ordering<MutablePartitionId>() {
    @Override
    public Some<Object> tryCompare(MutablePartitionId mutablePartitionId, MutablePartitionId t1) {
        return new scala.Some(new Object());
    }

    @Override
    public int compare(MutablePartitionId p1, MutablePartitionId p2) {
        return Integer.compare(p1.getPartition(), p2.getPartition());
    }
```
issue (bug_risk): PartitionOrdering implementation returns null for 'on' and 'mkOrderingOps', which may cause runtime issues.
Null returns in these methods may trigger NullPointerExceptions or unpredictable behavior. Please implement them correctly or throw UnsupportedOperationException if they should not be used.
```java
verify(
        taskCpus == executorCores,
        "%s (%s) must be equal to %s (%s)",
        SPARK_TASK_CPUS_PROPERTY,
        taskCpus,
        SPARK_EXECUTOR_CORES_PROPERTY,
```
question (bug_risk): Replaced executor cores and task cpus equality check with logging, which may allow misconfiguration.
This change removes enforcement of configuration consistency, which could lead to resource issues. If this is intentional, please clarify the reasoning and document any risks.
```java
// Check if this is a suitable exchange node for sort push-down
if (!(plan instanceof ExchangeNode)) {
    log.info("Unable to push sorting to exchange because child is not exchange node");
    return Optional.empty();
```
suggestion (bug_risk): Catching generic Exception may mask underlying issues.
Instead of catching Exception, catch only the specific exceptions that ExchangeNode.sortedPartitionedExchange may throw, or log and rethrow unexpected ones to avoid hiding errors.
Suggested implementation:
```java
}
catch (IllegalArgumentException | IllegalStateException e) {
    log.warn("Failed to create sorted exchange: " + e.getMessage());
    // If creating sorted exchange fails, fall back to explicit sort
    return Optional.empty();
}
catch (RuntimeException e) {
    log.error("Unexpected error while creating sorted exchange", e);
    throw e;
}
```
- You may need to adjust the specific exception types (`IllegalArgumentException`, `IllegalStateException`, `RuntimeException`) to match those actually thrown by `ExchangeNode.sortedPartitionedExchange`.
- If there are custom exceptions defined in your codebase for this method, include them in the specific catch block.
- Ensure that your logging framework supports `log.error` for the rethrown case.
```java
            Joiner.on(", ").join(partitioningScheme.getPartitioning().getArguments()),
            formatHash(partitioningScheme.getHashColumn())));
}
builder.append(indentString(1)).append(format("Output ordering: %s%n", fragment.getOutputOrderingScheme()));
```
nitpick: Output ordering scheme is printed as a raw Optional, which may not be user-friendly.
Format the output to show the ordering scheme value or 'none' if absent, rather than printing the Optional wrapper.
This commit adds the orderingScheme field to the PlanFragment class in both Java and C++ to support sorted data exchanges in distributed query execution. This field allows plan fragments to declare and track sort ordering requirements for their output data.

Key changes:
- Add orderingScheme field to PlanFragment.java with getter method
- Add outputOrderingScheme field to C++ PlanFragment protocol struct
- Add JSON serialization/deserialization for outputOrderingScheme in C++
- Update protocol special includes for PlanFragment
- Integrate orderingScheme consumption in PrestoToVeloxQueryPlan.cpp
- Convert outputOrderingScheme to sorting keys/orders for PartitionAndSerializeNode
- Update all PlanFragment constructor call sites to pass outputOrderingScheme
- Remove backward compatibility constructor for cleaner API

All existing call sites have been updated to pass Optional.empty() for the outputOrderingScheme parameter, maintaining backward compatibility while using a single unified constructor interface.

The orderingScheme field contains the sort order specification that will be used to maintain data order across stage boundaries during shuffle operations. This is a prerequisite for implementing efficient sort-merge joins that can leverage pre-sorted partitions.
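As a rough, self-contained illustration of the shape this change adds (a sketch only, with `String` standing in for `OrderingScheme`; the real `PlanFragment` carries many more fields):

```java
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class FragmentSketch {
    // Optional output ordering declared by a fragment; empty means no ordering guarantee.
    private final Optional<String> outputOrderingScheme;

    public FragmentSketch(Optional<String> outputOrderingScheme) {
        this.outputOrderingScheme = requireNonNull(outputOrderingScheme, "outputOrderingScheme is null");
    }

    public Optional<String> getOutputOrderingScheme() {
        return outputOrderingScheme;
    }

    public static void main(String[] args) {
        System.out.println(new FragmentSketch(Optional.of("custkey ASC NULLS LAST")).getOutputOrderingScheme());
    }
}
```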
This commit implements the planner infrastructure to support sorted exchanges by extending the exchange node types. These changes enable the query planner to create sorted exchanges and propagate ordering information through plan fragments.

Key changes:
- Extend ExchangeNode to support SORTED partition type
- Update BasePlanFragmenter to populate and propagate orderingScheme
- Add PlanFragmenterUtils support for sorted exchanges
- Enhance PlanPrinter to display sorted exchange information

The sorted exchange infrastructure allows stages to maintain sort order during data shuffle operations, eliminating redundant sorting and enabling more efficient distributed query execution, particularly for sort-merge joins.
… tests
This commit introduces a new optimizer rule that pushes sort operations
down to exchange nodes where possible. This optimization is beneficial
for distributed queries where data needs to be sorted after shuffling,
as it allows sorting to happen during the shuffle operation itself
rather than requiring an explicit SortNode afterward.
Key changes:
- Create new SortedExchangeRule optimizer
- Add enable_sorted_exchanges feature flag (session property, experimental, default: false)
- Add optimizer.experimental.enable-sorted-exchanges config property in FeaturesConfig
- Integrate SortedExchangeRule into optimizer pipeline alongside
MergeJoinForSortedInputOptimizer and SortMergeJoinOptimizer
- Look for SortNode → ExchangeNode patterns
- Merge them into single sorted exchange nodes when beneficial
- Only applies to REMOTE REPARTITION exchanges
- Validates ordering variables are available in exchange output
The rule transforms plans like:
SortNode(orderBy: [a, b])
└─ ExchangeNode(type: REPARTITION, scope: REMOTE)
Into:
ExchangeNode(type: REPARTITION, scope: REMOTE, orderingScheme: [a, b])
Test coverage:
- Added TestSortedExchangeRule with 10 test cases covering:
* Basic sort push-down to remote repartition exchanges
* Feature flag enable/disable behavior
* Multi-column sorting
* DESC ordering
* NULLS FIRST/LAST handling
* Interaction with JOINs, aggregations, and LIMITs
* Multiple sorts in a single query
Tests use programmatic plan inspection with PlanNodeSearcher to verify
that sorted exchanges are created with the correct ordering schemes.
By leveraging sorted exchanges, queries can avoid redundant sorting
operations and improve overall query performance, particularly for
distributed joins and aggregations that require sorted inputs.
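To make the rewrite concrete, here is a self-contained toy sketch of the transformation the rule performs. The record types are simplified stand-ins for Presto's plan nodes, not the real API:

```java
import java.util.List;
import java.util.Optional;

// Toy sketch: when a Sort sits directly on a remote repartition Exchange and the exchange's
// output covers the ordering columns, fold the ordering into the exchange itself.
public class SortPushdownSketch {
    record Exchange(String scope, String type, List<String> outputColumns, Optional<List<String>> ordering) {}

    record Sort(List<String> orderBy, Exchange source) {}

    static Optional<Exchange> pushSortIntoExchange(Sort sort) {
        Exchange exchange = sort.source();
        boolean eligible = "REMOTE".equals(exchange.scope())
                && "REPARTITION".equals(exchange.type())
                && exchange.outputColumns().containsAll(sort.orderBy());
        if (!eligible) {
            // Fall back to keeping the explicit Sort above the exchange.
            return Optional.empty();
        }
        return Optional.of(new Exchange(exchange.scope(), exchange.type(),
                exchange.outputColumns(), Optional.of(sort.orderBy())));
    }

    public static void main(String[] args) {
        Exchange exchange = new Exchange("REMOTE", "REPARTITION", List.of("a", "b"), Optional.empty());
        System.out.println(pushSortIntoExchange(new Sort(List.of("a", "b"), exchange)));
    }
}
```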
This commit integrates the sorted shuffle functionality into the Presto-on-Spark execution engine, enabling Spark-based query execution to leverage sorted data exchanges for improved sort-merge join performance.

Key changes:
- Update AbstractPrestoSparkQueryExecution to handle sorted exchanges
- Add MutablePartitionIdOrdering to track partition ordering in Spark
- Update PrestoSparkRddFactory to preserve sort order during shuffles
- Fix checkstyle violations (unused imports, brace formatting)

This provides the core infrastructure for efficient sort-merge joins in Presto-on-Spark deployments.
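For orientation, the sketch below is a generic Spark example, not the PR's actual `PrestoSparkRddFactory` wiring: `repartitionAndSortWithinPartitions` is one standard Spark API that combines repartitioning with per-partition key ordering, which is the general idea being leveraged here.

```java
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;

public class SortedShuffleSketch {
    // The comparator must be Serializable so it can be shipped to executors.
    static class IntKeyComparator implements Comparator<Integer>, Serializable {
        @Override
        public int compare(Integer a, Integer b) {
            return Integer.compare(a, b);
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sorted-shuffle-sketch").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<Integer, String> rows = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b")));
            // Shuffle by the partitioner and sort rows within each partition by key,
            // so no separate sort stage is needed after the shuffle.
            JavaPairRDD<Integer, String> shuffledAndSorted =
                    rows.repartitionAndSortWithinPartitions(new HashPartitioner(2), new IntKeyComparator());
            System.out.println(shuffledAndSorted.collect());
        }
    }
}
```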
Summary
This PR introduces sorted exchange functionality to Presto, enabling efficient sort-merge joins by allowing data to be sorted
during shuffle operations rather than requiring separate sort steps. This optimization eliminates redundant sorting, reduces
memory pressure, and improves query performance for distributed joins and aggregations that require sorted inputs.
Motivation
Currently, when Presto needs to perform a sort-merge join in a distributed query, it must:
1. Repartition the data on the join keys through a remote exchange (shuffle).
2. Apply an explicit SortNode on each partition after the data has been moved.
This approach is inefficient because sorting happens as a separate operation after data movement. By pushing the sort operation into the exchange itself, we can sort data during the shuffle, eliminating the redundant SortNode and improving overall query performance.
High-Level Changes
- Add `orderingScheme` field to `PlanFragment` class (Java)
- Add `outputOrderingScheme` field to C++ PlanFragment protocol
- Update `PrestoToVeloxQueryPlan.cpp` to consume ordering scheme and convert to sorting keys
- Update all `PlanFragment` constructor call sites to support the new field
- Extend `ExchangeNode` to support SORTED partition type
- Update `BasePlanFragmenter` to populate and propagate orderingScheme between fragments
- Add `PlanFragmenterUtils` support for sorted exchanges
- Enhance `PlanPrinter` to display sorted exchange information in EXPLAIN output
- Add `enable_sorted_exchanges` session property (experimental, default: false)
- Add `optimizer.experimental.enable-sorted-exchanges` configuration property
- Update `AbstractPrestoSparkQueryExecution` to handle sorted exchanges
- Add `MutablePartitionIdOrdering` class to track partition ordering in Spark
- Update `PrestoSparkRddFactory` to preserve sort order during shuffles
Before:
SortNode(orderBy: [a, b])
└─ ExchangeNode(type: REPARTITION, scope: REMOTE)
After:
ExchangeNode(type: REPARTITION, scope: REMOTE, orderingScheme: [a, b])
Configuration
The feature is controlled by:
- the `enable_sorted_exchanges` session property (experimental, default: false)
- the `optimizer.experimental.enable-sorted-exchanges` configuration property in `FeaturesConfig`
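For example, a hedged sketch of toggling the session property from a test, mirroring the `Session.builder` pattern already shown in this PR's tests (snippet only; it assumes the surrounding query-runner test class provides `getQueryRunner()`):

```java
// Snippet, not a complete test class: enables the experimental feature for one session.
Session sortedExchangeSession = Session.builder(getQueryRunner().getDefaultSession())
        .setSystemProperty("enable_sorted_exchanges", "true")
        .build();
getQueryRunner().execute(sortedExchangeSession,
        "SELECT c.custkey, o.orderkey FROM customer c JOIN orders o ON c.custkey = o.custkey");
```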
Testing
Performance Benefits
Backward Compatibility
Release Notes
Please follow release notes guidelines and fill in the release notes below.