@shrinidhijoshi shrinidhijoshi commented Oct 22, 2025

Summary

This PR introduces sorted exchange functionality to Presto, enabling efficient sort-merge joins by allowing data to be sorted
during shuffle operations rather than requiring separate sort steps. This optimization eliminates redundant sorting, reduces
memory pressure, and improves query performance for distributed joins and aggregations that require sorted inputs.

Motivation

Currently, when Presto needs to perform a sort-merge join in a distributed query, it must:

  1. Shuffle data across workers (ExchangeNode)
  2. Explicitly sort the shuffled data (SortNode)

This approach is inefficient because sorting happens as a separate operation after data movement. By pushing the sort operation into the exchange itself, we can sort data during the shuffle, eliminating the redundant SortNode and improving overall query performance.

High-Level Changes

  1. Core Infrastructure (3161b24)
  • Add orderingScheme field to PlanFragment class (Java)
  • Add outputOrderingScheme field to C++ PlanFragment protocol
  • Implement JSON serialization/deserialization for C++ integration
  • Update PrestoToVeloxQueryPlan.cpp to consume ordering scheme and convert to sorting keys
  • Update all PlanFragment constructor call sites to support the new field
  2. Planner Support (130b14f)
  • Extend ExchangeNode to support SORTED partition type
  • Update BasePlanFragmenter to populate and propagate orderingScheme between fragments
  • Add PlanFragmenterUtils support for sorted exchanges
  • Enhance PlanPrinter to display sorted exchange information in EXPLAIN output
  3. Optimizer Rule (6951cab)
  • Introduce SortedExchangeRule optimizer that identifies and transforms Sort→Exchange patterns
  • Add enable_sorted_exchanges session property (experimental, default: false)
  • Add optimizer.experimental.enable-sorted-exchanges configuration property
  • Integrate into optimizer pipeline alongside existing join optimizers
  • Only applies to REMOTE REPARTITION exchanges
  • Validates ordering variables are available in exchange output
  4. Spark Integration (960bc93)
  • Update AbstractPrestoSparkQueryExecution to handle sorted exchanges
  • Add MutablePartitionIdOrdering class to track partition ordering in Spark
  • Update PrestoSparkRddFactory to preserve sort order during shuffles
  • Enable Spark-based queries to leverage sorted exchanges

Plan Transformation Example

Before:

  SortNode(orderBy: [a, b])
    └─ ExchangeNode(type: REPARTITION, scope: REMOTE)

After:

  ExchangeNode(type: REPARTITION, scope: REMOTE, orderingScheme: [a, b])
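
The transformation above can be sketched on a toy plan tree. This is a minimal illustration, not the SortedExchangeRule from this PR: the `Node`, `Sort`, and `Exchange` classes are hypothetical stand-ins for Presto's plan nodes, and the guard that skips exchanges that already carry an ordering is an assumption added for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Toy plan nodes for illustration only; these are not Presto's SortNode or ExchangeNode.
final class SortedExchangeSketch
{
    static class Node {}

    static final class Exchange
            extends Node
    {
        final String type;                      // for example "REPARTITION"
        final String scope;                     // for example "REMOTE"
        final List<String> outputs;             // variables the exchange produces
        final Optional<List<String>> ordering;  // empty for an unsorted exchange

        Exchange(String type, String scope, List<String> outputs, Optional<List<String>> ordering)
        {
            this.type = type;
            this.scope = scope;
            this.outputs = outputs;
            this.ordering = ordering;
        }
    }

    static final class Sort
            extends Node
    {
        final List<String> orderBy;
        final Node source;

        Sort(List<String> orderBy, Node source)
        {
            this.orderBy = orderBy;
            this.source = source;
        }
    }

    // Folds Sort -> Exchange into a single sorted exchange when every guard holds;
    // otherwise returns the input node unchanged.
    static Node rewrite(Node node, boolean sortedExchangesEnabled)
    {
        if (!sortedExchangesEnabled || !(node instanceof Sort)) {
            return node;
        }
        Sort sort = (Sort) node;
        if (!(sort.source instanceof Exchange)) {
            return node;
        }
        Exchange exchange = (Exchange) sort.source;
        boolean eligible = exchange.type.equals("REPARTITION")
                && exchange.scope.equals("REMOTE")
                && !exchange.ordering.isPresent()                // assumption of this sketch
                && exchange.outputs.containsAll(sort.orderBy);   // ordering variables must be in the output
        if (!eligible) {
            return node;
        }
        return new Exchange(exchange.type, exchange.scope, exchange.outputs, Optional.of(sort.orderBy));
    }

    public static void main(String[] args)
    {
        Node plan = new Sort(
                Arrays.asList("a", "b"),
                new Exchange("REPARTITION", "REMOTE", Arrays.asList("a", "b", "c"), Optional.empty()));
        Node rewritten = rewrite(plan, true);
        System.out.println("rewritten to exchange: " + (rewritten instanceof Exchange));
    }
}
```

Returning the original node whenever a guard fails keeps the explicit sort in place, which matches the fallback behavior described for the disabled flag.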

Configuration

The feature is controlled by:

  • Session property: enable_sorted_exchanges (experimental, default: false)
  • Config property: optimizer.experimental.enable-sorted-exchanges

Testing

  • Added TestSortedExchangeRule with test cases covering:
    • Basic sort push-down scenarios
    • Feature flag enable/disable behavior
    • Multi-column sorting with DESC ordering
    • NULLS FIRST/LAST handling
    • Interaction with JOINs, aggregations, and LIMITs
    • Multiple sorts in a single query
  • Updated TestFeaturesConfig to validate configuration properties
  • All existing tests pass with backward compatibility maintained

Performance Benefits

  • Reduced sorting overhead: Eliminates redundant SortNode operations
  • Lower memory usage: Avoids buffering data for explicit sorting

Backward Compatibility

  • Feature is disabled by default (experimental flag)
  • All existing queries continue to work without modification
  • No breaking changes to public APIs
  • Graceful degradation when feature is disabled

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
  * Add experimental support for sorted exchanges to improve sort-merge join performance. When enabled via the
  `enable_sorted_exchanges` session property or `optimizer.experimental.enable-sorted-exchanges` configuration property, the query
  planner will push sort operations into exchange nodes, eliminating redundant sorting steps and reducing memory usage for
  distributed queries with sort-merge joins. This feature is disabled by default.

sourcery-ai bot commented Oct 22, 2025

Reviewer's Guide

This PR prototype adds support for sorted shuffle in Presto-on-Spark by pushing down sort operators into remote repartition exchanges for sort-merge joins, threading OrderingScheme through planner fragments, extending Spark partitioning to apply key ordering, introducing sorted ExchangeNode factories, enriching plan printing and logging with ordering information, and bolstering test coverage.
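
To make the idea of a sorted shuffle concrete, here is a conceptual model in plain Java. It is only a sketch: in the PR the ordering is applied by Spark's shuffle (see the `ShuffledRDD.setKeyOrdering` entry under File-Level Changes), whereas this toy version routes integer keys to partitions by hash and sorts each partition in memory. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Conceptual model of a sorted shuffle; it does not use Spark or Presto APIs.
final class SortedShuffleSketch
{
    // Routes each key to a partition by hash, then sorts every partition,
    // so a downstream consumer sees each partition already ordered.
    static List<List<Integer>> shuffleAndSort(List<Integer> keys, int partitionCount)
    {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        for (Integer key : keys) {
            partitions.get(Math.floorMod(key.hashCode(), partitionCount)).add(key);
        }
        for (List<Integer> partition : partitions) {
            partition.sort(Comparator.<Integer>naturalOrder());
        }
        return partitions;
    }

    public static void main(String[] args)
    {
        System.out.println(shuffleAndSort(Arrays.asList(5, 2, 8, 1, 4, 3), 2));
    }
}
```

Because both sides of a join would be partitioned on the same key and sorted within each partition, a merge join could consume matching partitions without a separate sort step, which is the effect this PR aims for.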

Sequence diagram for pushing sort into remote repartition exchange during sort-merge join

sequenceDiagram
participant "SortMergeJoinOptimizer"
participant "PlanNode (left/right)"
participant "ExchangeNode"
participant "SortNode"
"SortMergeJoinOptimizer"->>"PlanNode (left/right)": check if child is ExchangeNode
alt child is ExchangeNode and suitable
    "SortMergeJoinOptimizer"->>"ExchangeNode": pushSortToExchangeIfPossible(orderingScheme)
    "ExchangeNode"-->>"SortMergeJoinOptimizer": sortedPartitionedExchange
else not suitable
    "SortMergeJoinOptimizer"->>"SortNode": create explicit SortNode
end

Class diagram for updated PlanFragment and related classes

classDiagram
class PlanFragment {
  - Optional<OrderingScheme> outputOrderingScheme
  + getOutputOrderingScheme()
}
class FragmentProperties {
  - Optional<OrderingScheme> outputOrderingScheme
  + setOutputOrderingScheme(Optional<OrderingScheme>)
  + getOutputOrderingScheme()
}
class OrderingScheme {
}
PlanFragment --> OrderingScheme : outputOrderingScheme
FragmentProperties --> OrderingScheme : outputOrderingScheme
PlanFragment <|-- FragmentProperties

Class diagram for new and updated ExchangeNode methods

classDiagram
class ExchangeNode {
  + static sortedPartitionedExchange(..., OrderingScheme)
  + static sortedSystemPartitionedExchange(..., OrderingScheme)
}
ExchangeNode --> OrderingScheme : uses

Class diagram for Spark-side partitioning and ordering changes

classDiagram
class AbstractPrestoSparkQueryExecution {
  + static JavaPairRDD partitionBy(..., PartitioningScheme, Optional<OrderingScheme>)
  + static final Ordering PartitionOrdering
}
class MutablePartitionIdOrdering {
}
AbstractPrestoSparkQueryExecution --> MutablePartitionIdOrdering : uses

File-Level Changes

Change Details Files
Push sort into remote exchanges in the planner
  • Added pushSortToExchangeIfPossible helper
  • Wrapped SortNode creation to attempt sort push-down
  • Introduced logging for unsupported exchange conditions
SortMergeJoinOptimizer.java
Apply ordering in Spark shuffle
  • Extended partitionBy to accept Optional<OrderingScheme>
  • Called ShuffledRDD.setKeyOrdering when ordering is present
  • Implemented MutablePartitionId comparator and updated partitionBy invocations
AbstractPrestoSparkQueryExecution.java
Propagate ordering through plan fragmentation
  • Added outputOrderingScheme to FragmentProperties
  • Passed ordering to child fragments in BasePlanFragmenter
  • Enhanced PlanFragment constructors and JSON handling
  • Updated PlanFragmenterUtils to carry orderingScheme
BasePlanFragmenter.java
PlanFragment.java
PlanFragmenterUtils.java
Support creation of sorted exchanges
  • Added ExchangeNode.sortedPartitionedExchange and sortedSystemPartitionedExchange factories
  • Relaxed remote exchange distribution check to allow ordering
ExchangeNode.java
Surface ordering in plan printing
  • Appended outputOrderingScheme in fragment header
  • Changed visitRemoteSource to label and display ordering
PlanPrinter.java
Enhance execution and client logging for ordering
  • Logged orderingScheme in PrestoSparkHttpTaskClient
  • Injected FunctionAndTypeManager and logged plan text in NativeTaskExecutorFactory
  • Logged spark executor/task CPU info instead of strict validation
PrestoSparkHttpTaskClient.java
PrestoSparkNativeTaskExecutorFactory.java
PrestoSparkSettingsRequirements.java
Remove ordering restriction in RDD factory
  • Removed exception blocking order-sensitive exchanges
PrestoSparkRddFactory.java
Add end-to-end tests for sort-pushdown
  • Refined AbstractTestNativeJoinQueries to use sort-merge join
  • Added TestPrestoSparkNativeSortMergeJoinQueries for Spark cluster run
  • Added TestPrestoSparkSortMergeJoinQueries for local plan checks
AbstractTestNativeJoinQueries.java
TestPrestoSparkNativeSortMergeJoinQueries.java
TestPrestoSparkSortMergeJoinQueries.java

Possibly linked issues

  • #Need a way to schedule splits evenly for a task: The PR's sorted shuffle optimizes distributed processing, directly addressing the root cause of uneven split scheduling and performance skew.

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • The custom Scala Ordering implementation (PartitionOrdering) contains stubbed methods and null returns—either fully implement all methods correctly or replace it with a standard ordering utility to avoid runtime errors.
  • There are numerous log.info statements in production code (e.g., in pushSortToExchangeIfPossible and settings verifiers) that will flood logs; please demote non-critical messages to debug level.
  • pushSortToExchangeIfPossible currently only works for single-source REPARTITION exchanges—consider extending support (or explicitly guarding) for multi-source exchanges so we don’t silently fall back to a SortNode in unexpected cases.

## Individual Comments

### Comment 1
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java:319-328` </location>
<code_context>
+    public static final Ordering<MutablePartitionId> PartitionOrdering = new Ordering<MutablePartitionId>() {
</code_context>

<issue_to_address>
**issue (bug_risk):** PartitionOrdering implementation returns null for 'on' and 'mkOrderingOps', which may cause runtime issues.

Null returns in these methods may trigger NullPointerExceptions or unpredictable behavior. Please implement them correctly or throw UnsupportedOperationException if they should not be used.
</issue_to_address>
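
One way to follow this advice is sketched below in plain Java, without the Scala `Ordering` interface: the comparison is expressed as a `Comparator`, and any method that is not meant to be called fails loudly instead of returning null. `PartitionId` is a hypothetical stand-in for `MutablePartitionId` (only `getPartition()` is taken from the snippet under review), and the `unsupported` helper is an invented name.

```java
import java.util.Comparator;

// Sketch of a fail-fast ordering; it does not implement scala.math.Ordering.
final class PartitionOrderingSketch
{
    // Hypothetical stand-in for MutablePartitionId.
    static final class PartitionId
    {
        private final int partition;

        PartitionId(int partition)
        {
            this.partition = partition;
        }

        int getPartition()
        {
            return partition;
        }
    }

    // Orders partition ids by their integer partition number.
    static final Comparator<PartitionId> BY_PARTITION = Comparator.comparingInt(PartitionId::getPartition);

    // Body for methods that should never be invoked: throw rather than return null.
    static <T> T unsupported(String method)
    {
        throw new UnsupportedOperationException(method + " is not supported by this ordering");
    }
}
```

An overridden method that should not be used could then consist of a single `return unsupported("on");` line, so a stray call surfaces immediately rather than as a later NullPointerException.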

### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java:148-149` </location>
<code_context>
         orderingScheme.ifPresent(ordering -> {
             PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle();
-            checkArgument(!scope.isRemote() || partitioningHandle.equals(SINGLE_DISTRIBUTION), "remote merging exchange requires single distribution");
+            // This is no longer true for Presto-on-Spark which can support sorting on external shuffle systems
+            //checkArgument(!scope.isRemote() || partitioningHandle.equals(SINGLE_DISTRIBUTION), "remote merging exchange requires single distribution");
             checkArgument(!scope.isLocal() || partitioningHandle.equals(FIXED_PASSTHROUGH_DISTRIBUTION), "local merging exchange requires passthrough distribution");
             checkArgument(partitioningScheme.getOutputLayout().containsAll(ordering.getOrderByVariables()), "Partitioning scheme does not supply all required ordering symbols");
</code_context>

<issue_to_address>
**suggestion:** Commented-out checkArgument may lead to confusion about remote merging exchange requirements.

If the change is intentional for Presto-on-Spark, please either remove the commented-out check or document when it should be used to prevent confusion for future maintainers.

```suggestion
            // For Presto-on-Spark, remote merging exchanges can support sorting on external shuffle systems,
            // so the single distribution requirement is not enforced here. If running on classic Presto,
            // ensure that remote merging exchanges use SINGLE_DISTRIBUTION.
```
</issue_to_address>

### Comment 3
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java:70-75` </location>
<code_context>
         verify(executorCoresString != null, "%s must be set", SPARK_EXECUTOR_CORES_PROPERTY);
         int taskCpus = parseInt(taskCpusString);
         int executorCores = parseInt(executorCoresString);
-        verify(
-                taskCpus == executorCores,
-                "%s (%s) must be equal to %s (%s)",
-                SPARK_TASK_CPUS_PROPERTY,
-                taskCpus,
-                SPARK_EXECUTOR_CORES_PROPERTY,
-                executorCores);
+        log.info("Task cpus: %s, executor cores: %s, tasksPerExecutor: %s", taskCpus, executorCores, executorCores / taskCpus);
</code_context>

<issue_to_address>
**question (bug_risk):** Replaced executor cores and task cpus equality check with logging, which may allow misconfiguration.

This change removes enforcement of configuration consistency, which could lead to resource issues. If this is intentional, please clarify the reasoning and document any risks.
</issue_to_address>
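
If the strict equality check is too restrictive, a middle ground between it and pure logging might look like the sketch below. Requiring that executor cores be a positive multiple of task CPUs is an assumption made here for illustration, not a rule stated in this PR, and the class and method names are hypothetical. The sketch also avoids the division by zero that the logged expression `executorCores / taskCpus` would hit when task CPUs is 0.

```java
// Sketch of a relaxed settings check; not the PrestoSparkSettingsRequirements code.
final class SparkCpuSettingsSketch
{
    // Returns how many tasks fit on one executor, rejecting values that do not divide evenly.
    static int tasksPerExecutor(int taskCpus, int executorCores)
    {
        if (taskCpus <= 0) {
            throw new IllegalArgumentException("task cpus must be positive: " + taskCpus);
        }
        if (executorCores < taskCpus || executorCores % taskCpus != 0) {
            throw new IllegalArgumentException(
                    "executor cores (" + executorCores + ") must be a positive multiple of task cpus (" + taskCpus + ")");
        }
        return executorCores / taskCpus;
    }

    public static void main(String[] args)
    {
        System.out.println(tasksPerExecutor(2, 8));
    }
}
```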

### Comment 4
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java:216` </location>
<code_context>
+        {
+            // Check if this is a suitable exchange node for sort push-down
+            if (!(plan instanceof ExchangeNode)) {
+                log.info("Unable to push sorting to exchange because child is not exchange node");
+                return Optional.empty();
+            }
+
</code_context>

<issue_to_address>
**suggestion:** Logging at info level for expected plan structure may be too verbose.

Consider changing the log level to debug unless this message indicates an unexpected plan structure.

```suggestion
                log.debug("Unable to push sorting to exchange because child is not exchange node");
```
</issue_to_address>

### Comment 5
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java:258-217` </location>
<code_context>
+                planChanged = true;
+                return Optional.of(sortedExchange);
+            }
+            catch (Exception e) {
+                log.warn("Failed to create sorted exchange: " + e.getMessage());
+                // If creating sorted exchange fails, fall back to explicit sort
+                return Optional.empty();
+            }
+        }
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Catching generic Exception may mask underlying issues.

Instead of catching Exception, catch only the specific exceptions that ExchangeNode.sortedPartitionedExchange may throw, or log and rethrow unexpected ones to avoid hiding errors.

Suggested implementation:

```java
            }
            catch (IllegalArgumentException | IllegalStateException e) {
                log.warn("Failed to create sorted exchange: " + e.getMessage());
                // If creating sorted exchange fails, fall back to explicit sort
                return Optional.empty();
            }
            catch (RuntimeException e) {
                log.error("Unexpected error while creating sorted exchange", e);
                throw e;
            }

```

- You may need to adjust the specific exception types (`IllegalArgumentException`, `IllegalStateException`, `RuntimeException`) to match those actually thrown by `ExchangeNode.sortedPartitionedExchange`.
- If there are custom exceptions defined in your codebase for this method, include them in the specific catch block.
- Ensure that your logging framework supports `log.error` for the rethrown case.
</issue_to_address>

### Comment 6
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java:439` </location>
<code_context>
                     Joiner.on(", ").join(partitioningScheme.getPartitioning().getArguments()),
                     formatHash(partitioningScheme.getHashColumn())));
         }
+        builder.append(indentString(1)).append(format("Output ordering: %s%n", fragment.getOutputOrderingScheme()));
         builder.append(indentString(1)).append(format("Output encoding: %s%n", fragment.getPartitioningScheme().getEncoding()));
         builder.append(indentString(1)).append(format("Stage Execution Strategy: %s%n", fragment.getStageExecutionDescriptor().getStageExecutionStrategy()));
</code_context>

<issue_to_address>
**nitpick:** Output ordering scheme is printed as a raw Optional, which may not be user-friendly.

Format the output to show the ordering scheme value or 'none' if absent, rather than printing the Optional wrapper.
</issue_to_address>
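
A small helper along these lines would avoid printing the `Optional` wrapper. This is a sketch with a hypothetical method name; it relies on the value's own `toString()` for the present case.

```java
import java.util.Optional;

// Sketch of formatting an optional ordering for plan output.
final class OrderingFormatSketch
{
    // Prints the wrapped value when present and the literal "none" otherwise.
    static <T> String formatOrdering(Optional<T> ordering)
    {
        return ordering.map(Object::toString).orElse("none");
    }

    public static void main(String[] args)
    {
        System.out.println("Output ordering: " + formatOrdering(Optional.of("[a, b]")));
        System.out.println("Output ordering: " + formatOrdering(Optional.empty()));
    }
}
```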

### Comment 7
<location> `presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/MutablePartitionIdOrdering.java:3` </location>
<code_context>
+package com.facebook.presto.spark.classloader_interface;
+
+public class MutablePartitionIdOrdering {
+}
</code_context>

<issue_to_address>
**suggestion:** MutablePartitionIdOrdering class is empty and may be unnecessary.

If this class is intended for future use, add a TODO comment; otherwise, consider removing it to keep the codebase clean.
</issue_to_address>

### Comment 8
<location> `presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSortMergeJoinQueries.java:166-175` </location>
<code_context>
+    public void testSortMergeJoinWithSortPushDownExecution()
</code_context>

<issue_to_address>
**suggestion (testing):** Test only checks ordering by one column; consider multi-column ordering.

The test should validate sorting by both 'custkey' and 'totalprice' to match the query's order clause.
</issue_to_address>
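
A check over both columns could be sketched as follows. The `Row` class is a hypothetical stand-in for the test's result rows, and ascending order on both `custkey` and `totalprice` is assumed because the query's ORDER BY clause is not shown here.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of a two-column sortedness check; not the assertion used in the PR's test.
final class MultiColumnOrderCheckSketch
{
    // Hypothetical result row with the two ordering columns.
    static final class Row
    {
        final long custkey;
        final double totalprice;

        Row(long custkey, double totalprice)
        {
            this.custkey = custkey;
            this.totalprice = totalprice;
        }
    }

    // True when rows are ordered by custkey, then totalprice (both ascending, an assumption).
    static boolean isSorted(List<Row> rows)
    {
        Comparator<Row> order = Comparator
                .comparingLong((Row row) -> row.custkey)
                .thenComparingDouble(row -> row.totalprice);
        for (int i = 1; i < rows.size(); i++) {
            if (order.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println(isSorted(Arrays.asList(new Row(1, 10.0), new Row(1, 20.0), new Row(2, 5.0))));
    }
}
```

Comparing adjacent pairs with the full comparator catches rows that are ordered on `custkey` but out of order on `totalprice`, which a single-column check would miss.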

### Comment 9
<location> `presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSortMergeJoinQueries.java:207-216` </location>
<code_context>
+    public void testSortMergeJoinVsHashJoinPerformanceComparison()
</code_context>

<issue_to_address>
**suggestion (testing):** Performance comparison test does not account for variability or warm-up.

Run each query several times and use average or median timings to reduce the impact of JVM warm-up and environmental variability.

Suggested implementation:

```java
    public void testSortMergeJoinVsHashJoinPerformanceComparison()
    {
        // Compare execution between sort merge join (with sort push-down) and hash join
        final int NUM_RUNS = 5;
        List<Long> sortMergeJoinTimings = new ArrayList<>();
        List<Long> hashJoinTimings = new ArrayList<>();

        Session sortMergeJoinSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PREFER_SORT_MERGE_JOIN, "true")
                .setSystemProperty("task_concurrency", "4")
                .build();

        Session hashJoinSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PREFER_SORT_MERGE_JOIN, "false")
                .setSystemProperty("task_concurrency", "4")

```

```java
        Session hashJoinSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PREFER_SORT_MERGE_JOIN, "false")
                .setSystemProperty("task_concurrency", "4")
                .build();

        String query = "SELECT c.custkey, o.orderkey FROM customer c JOIN orders o ON c.custkey = o.custkey";

        // Warm-up phase
        getQueryRunner().execute(sortMergeJoinSession, query);
        getQueryRunner().execute(hashJoinSession, query);

        // Timed runs for sort merge join
        for (int i = 0; i < NUM_RUNS; i++) {
            long start = System.nanoTime();
            getQueryRunner().execute(sortMergeJoinSession, query);
            long end = System.nanoTime();
            sortMergeJoinTimings.add(end - start);
        }

        // Timed runs for hash join
        for (int i = 0; i < NUM_RUNS; i++) {
            long start = System.nanoTime();
            getQueryRunner().execute(hashJoinSession, query);
            long end = System.nanoTime();
            hashJoinTimings.add(end - start);
        }

        // Compute average timings
        double sortMergeJoinAvgMs = sortMergeJoinTimings.stream().mapToLong(Long::longValue).average().orElse(0.0) / 1_000_000.0;
        double hashJoinAvgMs = hashJoinTimings.stream().mapToLong(Long::longValue).average().orElse(0.0) / 1_000_000.0;

        // Optionally, compute median timings
        double sortMergeJoinMedianMs = getMedianMs(sortMergeJoinTimings);
        double hashJoinMedianMs = getMedianMs(hashJoinTimings);

        System.out.printf("Sort Merge Join: avg=%.2f ms, median=%.2f ms%n", sortMergeJoinAvgMs, sortMergeJoinMedianMs);
        System.out.printf("Hash Join: avg=%.2f ms, median=%.2f ms%n", hashJoinAvgMs, hashJoinMedianMs);
    }

    private static double getMedianMs(List<Long> timings) {
        List<Long> sorted = new ArrayList<>(timings);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n == 0) {
            return 0.0;
        }
        if (n % 2 == 1) {
            return sorted.get(n / 2) / 1_000_000.0;
        } else {
            return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2_000_000.0;
        }
    }

```
</issue_to_address>


Comment on lines 319 to 328
    public static final Ordering<MutablePartitionId> PartitionOrdering = new Ordering<MutablePartitionId>() {
        @Override
        public Some<Object> tryCompare(MutablePartitionId mutablePartitionId, MutablePartitionId t1) {
            return new scala.Some(new Object());
        }

        @Override
        public int compare(MutablePartitionId p1, MutablePartitionId p2) {
            return Integer.compare(p1.getPartition(), p2.getPartition());
        }

@shrinidhijoshi shrinidhijoshi force-pushed the sorted-shuffle branch 11 times, most recently from 31b5838 to bdef4b3 Compare October 23, 2025 19:44
This commit adds the orderingScheme field to the PlanFragment class in
both Java and C++ to support sorted data exchanges in distributed query
execution. This field allows plan fragments to declare and track sort
ordering requirements for their output data.

Key changes:
- Add orderingScheme field to PlanFragment.java with getter method
- Add outputOrderingScheme field to C++ PlanFragment protocol struct
- Add JSON serialization/deserialization for outputOrderingScheme in C++
- Update protocol special includes for PlanFragment
- Integrate orderingScheme consumption in PrestoToVeloxQueryPlan.cpp
- Convert outputOrderingScheme to sorting keys/orders for PartitionAndSerializeNode
- Update all PlanFragment constructor call sites to pass outputOrderingScheme
- Remove backward compatibility constructor for cleaner API

All existing call sites have been updated to pass Optional.empty() for
the outputOrderingScheme parameter, maintaining backward compatibility
while using a single unified constructor interface.

The orderingScheme field contains the sort order specification that will
be used to maintain data order across stage boundaries during shuffle
operations. This is a prerequisite for implementing efficient sort-merge
joins that can leverage pre-sorted partitions.

This commit implements the planner infrastructure to support sorted
exchanges by extending the exchange node types. These changes enable the
query planner to create sorted exchanges and propagate ordering
information through plan fragments.

Key changes:
- Extend ExchangeNode to support SORTED partition type
- Update BasePlanFragmenter to populate and propagate orderingScheme
- Add PlanFragmenterUtils support for sorted exchanges
- Enhance PlanPrinter to display sorted exchange information

The sorted exchange infrastructure allows stages to maintain sort order
during data shuffle operations, eliminating redundant sorting and
enabling more efficient distributed query execution, particularly for
sort-merge joins.
@shrinidhijoshi shrinidhijoshi force-pushed the sorted-shuffle branch 2 times, most recently from 3916697 to 6bea08f Compare October 27, 2025 16:46
… tests

This commit introduces a new optimizer rule that pushes sort operations
down to exchange nodes where possible. This optimization is beneficial
for distributed queries where data needs to be sorted after shuffling,
as it allows sorting to happen during the shuffle operation itself
rather than requiring an explicit SortNode afterward.

Key changes:
- Create new SortedExchangeRule optimizer
- Add enable_sorted_exchanges feature flag (session property, experimental, default: false)
- Add optimizer.experimental.enable-sorted-exchanges config property in FeaturesConfig
- Integrate SortedExchangeRule into optimizer pipeline alongside
  MergeJoinForSortedInputOptimizer and SortMergeJoinOptimizer
- Look for SortNode → ExchangeNode patterns
- Merge them into single sorted exchange nodes when beneficial
- Only applies to REMOTE REPARTITION exchanges
- Validates ordering variables are available in exchange output

The rule transforms plans like:
  SortNode(orderBy: [a, b])
    └─ ExchangeNode(type: REPARTITION, scope: REMOTE)

Into:
  ExchangeNode(type: REPARTITION, scope: REMOTE, orderingScheme: [a, b])

Test coverage:
- Added TestSortedExchangeRule with 10 test cases covering:
  * Basic sort push-down to remote repartition exchanges
  * Feature flag enable/disable behavior
  * Multi-column sorting
  * DESC ordering
  * NULLS FIRST/LAST handling
  * Interaction with JOINs, aggregations, and LIMITs
  * Multiple sorts in a single query

Tests use programmatic plan inspection with PlanNodeSearcher to verify
that sorted exchanges are created with the correct ordering schemes.

By leveraging sorted exchanges, queries can avoid redundant sorting
operations and improve overall query performance, particularly for
distributed joins and aggregations that require sorted inputs.

This commit integrates the sorted shuffle functionality into the
Presto-on-Spark execution engine, enabling Spark-based query execution
to leverage sorted data exchanges for improved sort-merge join
performance.

Key changes:
- Update AbstractPrestoSparkQueryExecution to handle sorted exchanges
- Add MutablePartitionIdOrdering to track partition ordering in Spark
- Update PrestoSparkRddFactory to preserve sort order during shuffles
- Fix checkstyle violations (unused imports, brace formatting)

This provides the core infrastructure for efficient sort-merge joins
in Presto-on-Spark deployments.
@shrinidhijoshi shrinidhijoshi changed the title [DNR] proof-of-concept sorted shuffle in presto-on-spark [DNR] feat(sorted-exchange): Add sorted exchange support in presto-on-spark Oct 28, 2025
@shrinidhijoshi shrinidhijoshi changed the title [DNR] feat(sorted-exchange): Add sorted exchange support in presto-on-spark [DNR] feat(pos-sorted-exchange): Add sorted exchange support in presto-on-spark Oct 28, 2025