Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public final class SystemSessionProperties
public static final String HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD = "hyperloglog_standard_error_warning_threshold";
public static final String PREFER_MERGE_JOIN_FOR_SORTED_INPUTS = "prefer_merge_join_for_sorted_inputs";
public static final String PREFER_SORT_MERGE_JOIN = "prefer_sort_merge_join";
public static final String ENABLE_SORTED_EXCHANGES = "enable_sorted_exchanges";
public static final String SEGMENTED_AGGREGATION_ENABLED = "segmented_aggregation_enabled";
public static final String USE_HISTORY_BASED_PLAN_STATISTICS = "use_history_based_plan_statistics";
public static final String TRACK_HISTORY_BASED_PLAN_STATISTICS = "track_history_based_plan_statistics";
Expand Down Expand Up @@ -1399,6 +1400,11 @@ public SystemSessionProperties(
"Prefer sort merge join for all joins. A SortNode is added if input is not already sorted.",
featuresConfig.isPreferSortMergeJoin(),
true),
booleanProperty(
ENABLE_SORTED_EXCHANGES,
"(Experimental) Enable pushing sort operations down to exchange nodes for distributed queries",
featuresConfig.isEnableSortedExchanges(),
false),
booleanProperty(
SEGMENTED_AGGREGATION_ENABLED,
"Enable segmented aggregation.",
Expand Down Expand Up @@ -2932,6 +2938,11 @@ public static boolean preferSortMergeJoin(Session session)
return session.getSystemProperty(PREFER_SORT_MERGE_JOIN, Boolean.class);
}

public static boolean isEnableSortedExchanges(Session session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about rename:
enable_sorted_exchanges ->sorted_exchanges_enabled
isEnableSortedExchanges -> isSortedExchangesEnabled

{
return session.getSystemProperty(ENABLE_SORTED_EXCHANGES, Boolean.class);
}

public static boolean isSegmentedAggregationEnabled(Session session)
{
return session.getSystemProperty(SEGMENTED_AGGREGATION_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ private static StageInfo pruneStatsFromStageInfo(StageInfo stage)
plan.getPartitioning(),
plan.getTableScanSchedulingOrder(),
plan.getPartitioningScheme(),
plan.getOutputOrderingScheme(),
plan.getStageExecutionDescriptor(),
plan.isOutputTableWriterFragment(),
plan.getStatsAndCosts().map(QueryStateMachine::pruneHistogramsFromStatsAndCosts),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ private Optional<PlanFragment> performRuntimeOptimizations(StreamingSubPlan subP
fragment.getPartitioning(),
scheduleOrder(newRoot),
fragment.getPartitioningScheme(),
fragment.getOutputOrderingScheme(),
fragment.getStageExecutionDescriptor(),
fragment.isOutputTableWriterFragment(),
estimatedStatsAndCosts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public class FeaturesConfig
private boolean streamingForPartialAggregationEnabled;
private boolean preferMergeJoinForSortedInputs;
private boolean preferSortMergeJoin;
private boolean enableSortedExchanges;
private boolean segmentedAggregationEnabled;

private int maxStageCountForEagerScheduling = 25;
Expand Down Expand Up @@ -2290,6 +2291,19 @@ public FeaturesConfig setPreferSortMergeJoin(boolean preferSortMergeJoin)
return this;
}

public boolean isEnableSortedExchanges()
{
return enableSortedExchanges;
}

@Config("optimizer.experimental.enable-sorted-exchanges")
@ConfigDescription("(Experimental) Enable pushing sort operations down to exchange nodes for distributed queries")
public FeaturesConfig setEnableSortedExchanges(boolean enableSortedExchanges)
{
this.enableSortedExchanges = enableSortedExchanges;
return this;
}

public boolean isSegmentedAggregationEnabled()
{
return segmentedAggregationEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.facebook.presto.sql.planner;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.metadata.Metadata;
Expand All @@ -26,6 +27,7 @@
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.MetadataDeleteNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningHandle;
Expand Down Expand Up @@ -93,6 +95,7 @@
public abstract class BasePlanFragmenter
extends SimplePlanRewriter<FragmentProperties>
{
private static final Logger log = Logger.get(BasePlanFragmenter.class);
private final Session session;
private final Metadata metadata;
private final PlanNodeIdAllocator idAllocator;
Expand Down Expand Up @@ -158,6 +161,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
properties.getPartitioningHandle(),
schedulingOrder,
properties.getPartitioningScheme(),
properties.getOutputOrderingScheme(),
StageExecutionDescriptor.ungroupedExecution(),
outputTableWriterFragment,
Optional.of(statsAndCosts.getForSubplan(root)),
Expand Down Expand Up @@ -300,7 +304,21 @@ private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteCon

ImmutableList.Builder<SubPlan> builder = ImmutableList.builder();
for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) {
FragmentProperties childProperties = new FragmentProperties(translateOutputLayout(partitioningScheme, exchange.getInputs().get(sourceIndex)));
PartitioningScheme childPartitioningScheme = translateOutputLayout(partitioningScheme, exchange.getInputs().get(sourceIndex));
FragmentProperties childProperties = new FragmentProperties(childPartitioningScheme);

// If the exchange has ordering requirements, translate them for the child fragment
Optional<OrderingScheme> childOutputOrderingScheme = Optional.empty();
if (exchange.getOrderingScheme().isPresent()) {
log.info("Found ordering scheme on ExchangeNode %s. Transferring to child", exchange.getId());
childOutputOrderingScheme = exchange.getOrderingScheme();
}
else {
log.info("No ordering scheme on ExchangeNode %s", exchange.getId());
}

// Set the output ordering scheme for the child fragment
childProperties.setOutputOrderingScheme(childOutputOrderingScheme);
builder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context));
}

Expand Down Expand Up @@ -435,11 +453,24 @@ public static class FragmentProperties
private Optional<PartitioningHandle> partitioningHandle = Optional.empty();
private final Set<PlanNodeId> partitionedSources = new HashSet<>();

// Output ordering scheme for the fragment - this gets transferred to the PlanFragment
private Optional<OrderingScheme> outputOrderingScheme = Optional.empty();

public FragmentProperties(PartitioningScheme partitioningScheme)
{
this.partitioningScheme = partitioningScheme;
}

public void setOutputOrderingScheme(Optional<OrderingScheme> outputOrderingScheme)
{
this.outputOrderingScheme = requireNonNull(outputOrderingScheme, "outputOrderingScheme is null");
}

public Optional<OrderingScheme> getOutputOrderingScheme()
{
return outputOrderingScheme;
}

public List<SubPlan> getChildren()
{
return children;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.airlift.json.Codec;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanFragmentId;
Expand Down Expand Up @@ -54,6 +55,10 @@ public class PlanFragment
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;

// Describes the ordering of the fragment's output data
// This is separate from partitioningScheme as ordering is orthogonal to partitioning
private final Optional<OrderingScheme> outputOrderingScheme;

// Only true for output table writer and false for temporary table writers
private final boolean outputTableWriterFragment;
private final Optional<StatsAndCosts> statsAndCosts;
Expand All @@ -73,6 +78,7 @@ public PlanFragment(
@JsonProperty("partitioning") PartitioningHandle partitioning,
@JsonProperty("tableScanSchedulingOrder") List<PlanNodeId> tableScanSchedulingOrder,
@JsonProperty("partitioningScheme") PartitioningScheme partitioningScheme,
@JsonProperty("outputOrderingScheme") Optional<OrderingScheme> outputOrderingScheme,
@JsonProperty("stageExecutionDescriptor") StageExecutionDescriptor stageExecutionDescriptor,
@JsonProperty("outputTableWriterFragment") boolean outputTableWriterFragment,
@JsonProperty("statsAndCosts") Optional<StatsAndCosts> statsAndCosts,
Expand All @@ -84,6 +90,7 @@ public PlanFragment(
this.partitioning = requireNonNull(partitioning, "partitioning is null");
this.tableScanSchedulingOrder = ImmutableList.copyOf(requireNonNull(tableScanSchedulingOrder, "tableScanSchedulingOrder is null"));
this.stageExecutionDescriptor = requireNonNull(stageExecutionDescriptor, "stageExecutionDescriptor is null");
this.outputOrderingScheme = requireNonNull(outputOrderingScheme, "outputOrderingScheme is null");
this.outputTableWriterFragment = outputTableWriterFragment;
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null");
this.jsonRepresentation = requireNonNull(jsonRepresentation, "jsonRepresentation is null");
Expand Down Expand Up @@ -156,6 +163,12 @@ public Optional<StatsAndCosts> getStatsAndCosts()
return statsAndCosts;
}

@JsonProperty
public Optional<OrderingScheme> getOutputOrderingScheme()
{
return outputOrderingScheme;
}

@JsonProperty
public Optional<String> getJsonRepresentation()
{
Expand Down Expand Up @@ -187,6 +200,7 @@ private PlanFragment forTaskSerialization()
id, root, variables, partitioning,
tableScanSchedulingOrder,
partitioningScheme,
outputOrderingScheme,
stageExecutionDescriptor,
outputTableWriterFragment,
Optional.empty(),
Expand Down Expand Up @@ -246,6 +260,7 @@ public PlanFragment withBucketToPartition(Optional<int[]> bucketToPartition)
partitioning,
tableScanSchedulingOrder,
partitioningScheme.withBucketToPartition(bucketToPartition),
outputOrderingScheme,
stageExecutionDescriptor,
outputTableWriterFragment,
statsAndCosts,
Expand All @@ -261,6 +276,7 @@ public PlanFragment withFixedLifespanScheduleGroupedExecution(List<PlanNodeId> c
partitioning,
tableScanSchedulingOrder,
partitioningScheme,
outputOrderingScheme,
StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes, totalLifespans),
outputTableWriterFragment,
statsAndCosts,
Expand All @@ -276,6 +292,7 @@ public PlanFragment withDynamicLifespanScheduleGroupedExecution(List<PlanNodeId>
partitioning,
tableScanSchedulingOrder,
partitioningScheme,
outputOrderingScheme,
StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes, totalLifespans),
outputTableWriterFragment,
statsAndCosts,
Expand All @@ -291,6 +308,7 @@ public PlanFragment withRecoverableGroupedExecution(List<PlanNodeId> capableTabl
partitioning,
tableScanSchedulingOrder,
partitioningScheme,
outputOrderingScheme,
StageExecutionDescriptor.recoverableGroupedExecution(capableTableScanNodes, totalLifespans),
outputTableWriterFragment,
statsAndCosts,
Expand All @@ -306,6 +324,7 @@ public PlanFragment withSubPlan(PlanNode subPlan)
partitioning,
tableScanSchedulingOrder,
partitioningScheme,
outputOrderingScheme,
stageExecutionDescriptor,
outputTableWriterFragment,
statsAndCosts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta
outputPartitioningScheme.isScaleWriters(),
outputPartitioningScheme.getEncoding(),
outputPartitioningScheme.getBucketToPartition()),
fragment.getOutputOrderingScheme(),
fragment.getStageExecutionDescriptor(),
fragment.isOutputTableWriterFragment(),
fragment.getStatsAndCosts(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
import com.facebook.presto.sql.planner.optimizations.ShardJoins;
import com.facebook.presto.sql.planner.optimizations.SimplifyPlanWithEmptyInput;
import com.facebook.presto.sql.planner.optimizations.SortMergeJoinOptimizer;
import com.facebook.presto.sql.planner.optimizations.SortedExchangeRule;
import com.facebook.presto.sql.planner.optimizations.StatsRecordingPlanOptimizer;
import com.facebook.presto.sql.planner.optimizations.TransformQuantifiedComparisonApplyToLateralJoin;
import com.facebook.presto.sql.planner.optimizations.UnaliasSymbolReferences;
Expand Down Expand Up @@ -947,8 +948,10 @@ public PlanOptimizers(
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation
// Should be placed after AddExchanges, but before AddLocalExchange
// To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange
// SortedExchangeRule pushes sorts down to exchange nodes for distributed queries
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()),
new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));
new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()),
new SortedExchangeRule(metadata));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we gate this optimizer only for Presto-on-Spark? we can leverage featuresConfig.isPrestoSparkExecutionEnvironment().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new feature flag we have added. Like mentioned above so far we have refrained from adding presto-on-spark specific code into presto main.


// Optimizers above this don't understand local exchanges, so be careful moving this.
builder.add(new AddLocalExchanges(metadata, featuresConfig.isNativeExecutionEnabled()));
Expand Down
Loading
Loading