From 759aab4ef3cc714a169abe1e17a3c83c0217d7d7 Mon Sep 17 00:00:00 2001 From: xinyual Date: Fri, 26 Sep 2025 13:57:12 +0800 Subject: [PATCH 01/22] add demos Signed-off-by: xinyual --- .../org/opensearch/sql/analysis/Analyzer.java | 6 ++ .../sql/ast/AbstractNodeVisitor.java | 5 ++ .../opensearch/sql/ast/tree/AppendPipe.java | 48 ++++++++++++++ .../sql/calcite/CalciteRelNodeVisitor.java | 62 +++++++++++++++++++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 1 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 10 +++ .../opensearch/sql/ppl/parser/AstBuilder.java | 22 +++++++ 7 files changed, 154 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 3efdc3fd514..53f8e4376b1 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -60,6 +60,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; @@ -803,6 +804,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) { throw getOnlyForCalciteException("Appendcol"); } + @Override + public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) { + throw getOnlyForCalciteException("AppendPipe"); + } + @Override public LogicalPlan visitAppend(Append node, AnalysisContext context) { throw getOnlyForCalciteException("Append"); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 84f7bdbd4a6..d2022d41a5e 100644 --- 
a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -48,6 +48,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; @@ -136,6 +137,10 @@ public T visitSearch(Search node, C context) { return visitChildren(node, context); } + public T visitAppendPipe(AppendPipe node, C context) { + return visitChildren(node, context); + } + public T visitFilter(Filter node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java new file mode 100644 index 00000000000..adf78108748 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +public class AppendPipe extends UnresolvedPlan { + + private UnresolvedPlan subQuery; + + private UnresolvedPlan child; + + public AppendPipe(UnresolvedPlan subQuery) { + this.subQuery = subQuery; + } + + @Override + public AppendPipe attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + + @Override + public List getChild() { + return this.child == null ? 
ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitAppendPipe(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index cf2fe0c9b2b..4511b7e98ba 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -46,7 +47,9 @@ import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexInputRef; @@ -93,6 +96,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; @@ -209,6 +213,21 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { return context.relBuilder.peek(); } + @Override + public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context){ + visitChildren(node, context); + final RelNode base = context.relBuilder.peek(); + node.getSubQuery().accept(this, context); + RelNode sub = context.relBuilder.peek(); + + sub = alignToBaseSchemaOrThrow(base, sub, context); + + 
context.relBuilder.push(base); + context.relBuilder.union(true); + + return context.relBuilder.peek(); + } + @Override public RelNode visitRegex(Regex node, CalcitePlanContext context) { visitChildren(node, context); @@ -2575,4 +2594,47 @@ private RexNode createOptimizedTransliteration( throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e); } } + + private RelNode alignToBaseSchemaOrThrow(RelNode base, RelNode sub, CalcitePlanContext context) { + final List baseFields = base.getRowType().getFieldList(); + final Map subFieldByName = sub.getRowType().getFieldList().stream() + .collect(Collectors.toMap(RelDataTypeField::getName, f -> f, (a, b) -> a, LinkedHashMap::new)); + + final RexBuilder rex = context.rexBuilder; + final RelBuilder b = context.relBuilder; + + // 把 builder 栈顶替换成 sub,便于用 field(name) 构造投影 + // (此时栈顶本来就是 sub;如果你在其它地方用过 build() 清栈,则先 push(sub)) + // 这里稳妥起见,确保一下: + if (b.peek() != sub) { + b.push(sub); + } + + List projects = new ArrayList<>(baseFields.size()); + List names = new ArrayList<>(baseFields.size()); + + for (RelDataTypeField bf : baseFields) { + final String name = bf.getName(); + final RelDataType targetType = bf.getType(); + RexNode expr; + RelDataTypeField sf = subFieldByName.get(name); + if (sf != null) { + // 有同名列:取之并必要时 CAST 到 base 的类型 + expr = b.field(name); + if (!expr.getType().equals(targetType)) { + expr = rex.makeCast(targetType, expr, /* matchNullability */ true); + } + } else { + // 缺列:补 NULL 并 CAST 到目标类型 + RexNode nullLit = rex.makeNullLiteral(targetType); + expr = nullLit; + } + projects.add(expr); + names.add(name); + } + + b.project(projects, names); + return b.peek(); + } + } diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 48b051e456b..77572eb8b22 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -59,6 +59,7 @@ OFFSET_FIELD: 'OFFSET_FIELD'; BUFFER_LIMIT: 'BUFFER_LIMIT'; LABEL: 'LABEL'; 
AGGREGATION: 'AGGREGATION'; +APPENDPIPE: 'APPENDPIPE'; //Native JOIN KEYWORDS JOIN: 'JOIN'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 0bc7b784338..9023dbb164d 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -19,6 +19,10 @@ pplStatement | queryStatement ; +subPipeline + : commands (PIPE commands)* + ; + queryStatement : pplCommands (PIPE commands)* ; @@ -78,6 +82,7 @@ commands | regexCommand | timechartCommand | rexCommand + | appendPipeCommand ; commandName @@ -115,6 +120,7 @@ commandName | REGEX | APPEND | REX + | APPENDPIPE ; searchCommand @@ -204,6 +210,10 @@ statsCommand : STATS statsArgs statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (dedupSplitArg)? ; +appendPipeCommand + : APPENDPIPE LT_SQR_PRTHS subPipeline RT_SQR_PRTHS + ; + statsArgs : (partitionsArg | allnumArg | delimArg | bucketNullableArg)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 774eb73dff6..7f519f46b11 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -70,6 +70,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.DefaultBin; @@ -156,6 +157,19 @@ public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementCont .reduce(pplCommand, (r, e) -> e.attach(e instanceof Join ? 
projectExceptMeta(r) : r)); } + @Override + public UnresolvedPlan visitSubPipeline(OpenSearchPPLParser.SubPipelineContext ctx) { + List cmds = ctx.commands(); + if (cmds.isEmpty()) { + throw new IllegalArgumentException("appendpipe [] is empty"); + } + UnresolvedPlan seed = visit(cmds.getFirst()); + return cmds.stream() + .skip(1) + .map(this::visit) + .reduce(seed, (left, op) -> op.attach(left)); + } + @Override public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) { UnresolvedPlan searchCommand = visit(ctx.searchCommand()); @@ -164,6 +178,8 @@ public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) { ctx.commands().stream().map(this::visit).reduce(searchCommand, (r, e) -> e.attach(r))); } + + /** Search command. */ @Override public UnresolvedPlan visitSearchFrom(SearchFromContext ctx) { @@ -227,6 +243,12 @@ public UnresolvedPlan visitWhereCommand(WhereCommandContext ctx) { return new Filter(internalVisitExpression(ctx.logicalExpression())); } + @Override + public UnresolvedPlan visitAppendPipeCommand(OpenSearchPPLParser.AppendPipeCommandContext ctx) { + UnresolvedPlan plan = visit(ctx.subPipeline()); + return new AppendPipe(plan); + } + @Override public UnresolvedPlan visitJoinCommand(OpenSearchPPLParser.JoinCommandContext ctx) { // a sql-like syntax if join criteria existed From 00a1563919ec39f1072535d0de0e6f0446d9f774 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 29 Sep 2025 11:30:52 +0800 Subject: [PATCH 02/22] add missing column Signed-off-by: xinyual --- .../sql/calcite/CalciteRelNodeVisitor.java | 99 ++++++++++++------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 4511b7e98ba..f06364996ff 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ 
b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -39,6 +39,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.ViewExpanders; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -217,15 +218,14 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context){ visitChildren(node, context); final RelNode base = context.relBuilder.peek(); - node.getSubQuery().accept(this, context); - RelNode sub = context.relBuilder.peek(); - sub = alignToBaseSchemaOrThrow(base, sub, context); + CalcitePlanContext subqueryContext = context.clone(); + node.getSubQuery().accept(this, subqueryContext); - context.relBuilder.push(base); - context.relBuilder.union(true); + RelNode sub = subqueryContext.relBuilder.peek(); - return context.relBuilder.peek(); + RelNode out = unionAllPreserveAllColumns(base, sub, context, subqueryContext); + return out; } @Override @@ -2595,46 +2595,75 @@ private RexNode createOptimizedTransliteration( } } - private RelNode alignToBaseSchemaOrThrow(RelNode base, RelNode sub, CalcitePlanContext context) { - final List baseFields = base.getRowType().getFieldList(); - final Map subFieldByName = sub.getRowType().getFieldList().stream() - .collect(Collectors.toMap(RelDataTypeField::getName, f -> f, (a, b) -> a, LinkedHashMap::new)); + private RelNode unionAllPreserveAllColumns(RelNode base, RelNode sub, CalcitePlanContext baseContext, CalcitePlanContext subContext) { + LinkedHashMap targetColumnNameToType = collectTargetColumnNameToType(base, sub, baseContext); + RelNode castedBase = projectToSchema(base, targetColumnNameToType, baseContext); + RelNode castedSub = projectToSchema(base, targetColumnNameToType, subContext); - final RexBuilder rex = context.rexBuilder; - final 
RelBuilder b = context.relBuilder; + baseContext.relBuilder.push(castedBase); + baseContext.relBuilder.push(castedSub); + baseContext.relBuilder.union(true, 2); + return baseContext.relBuilder.peek(); + } - // 把 builder 栈顶替换成 sub,便于用 field(name) 构造投影 - // (此时栈顶本来就是 sub;如果你在其它地方用过 build() 清栈,则先 push(sub)) - // 这里稳妥起见,确保一下: - if (b.peek() != sub) { - b.push(sub); + private LinkedHashMap collectTargetColumnNameToType(RelNode left, RelNode right, CalcitePlanContext context) { + LinkedHashMap cols = new LinkedHashMap<>(); // name to type + for (RelDataTypeField f : left.getRowType().getFieldList()) { + cols.put(f.getName(), f.getType()); } + for (RelDataTypeField f : right.getRowType().getFieldList()) { + if (!cols.containsKey(f.getName())) { + cols.put(f.getName(), f.getType()); + } else { + RelDataType merged = + context.relBuilder.getTypeFactory().leastRestrictive(List.of(cols.get(f.getName()), f.getType())); + if (merged != null) cols.put(f.getName(), merged); + } + } + return cols; + } - List projects = new ArrayList<>(baseFields.size()); - List names = new ArrayList<>(baseFields.size()); - for (RelDataTypeField bf : baseFields) { - final String name = bf.getName(); - final RelDataType targetType = bf.getType(); + + private RelNode projectToSchema(RelNode input, + LinkedHashMap targetCols, + CalcitePlanContext context) { + RexBuilder rex = context.rexBuilder; + RelBuilder rel = context.relBuilder; + Map idx = + new LinkedHashMap<>(); // name to index + List inFields = input.getRowType().getFieldList(); + for (int i = 0; i < inFields.size(); i++) { + String n = inFields.get(i).getName(); + idx.put(n, i); + } + + List exprs = new ArrayList<>(targetCols.size()); + List names = new ArrayList<>(targetCols.size()); + + for (Map.Entry e : targetCols.entrySet()) { + String normName = e.getKey(); + RelDataType toType = e.getValue(); + RexNode expr; - RelDataTypeField sf = subFieldByName.get(name); - if (sf != null) { - // 有同名列:取之并必要时 CAST 到 base 的类型 - expr = b.field(name); 
- if (!expr.getType().equals(targetType)) { - expr = rex.makeCast(targetType, expr, /* matchNullability */ true); + Integer pos = idx.get(normName); + if (pos != null) { + RexNode ref = RexInputRef.of(pos, input.getRowType()); + if (!ref.getType().equals(toType)) { + ref = rex.makeCast(toType, ref, true); } + expr = ref; + names.add(inFields.get(pos).getName()); } else { - // 缺列:补 NULL 并 CAST 到目标类型 - RexNode nullLit = rex.makeNullLiteral(targetType); - expr = nullLit; + expr = rex.makeNullLiteral(toType); + names.add(normName); } - projects.add(expr); - names.add(name); + exprs.add(expr); } - b.project(projects, names); - return b.peek(); + rel.project(exprs, names); + + return rel.peek(); } } From ddf26a493403427c5c25a2f6dd1fd6f34ede1ef9 Mon Sep 17 00:00:00 2001 From: xinyual Date: Fri, 10 Oct 2025 11:23:27 +0800 Subject: [PATCH 03/22] add appendpipe poc Signed-off-by: xinyual --- .../opensearch/sql/ast/tree/AppendPipe.java | 39 +++++++------- .../sql/calcite/CalcitePlanContext.java | 51 +++++++++++++++++++ .../sql/calcite/CalciteRelNodeVisitor.java | 36 ++++++------- .../sql/calcite/CalciteRexNodeVisitor.java | 4 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 7 +-- 5 files changed, 91 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java index adf78108748..0ea1cb9b453 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java @@ -7,13 +7,11 @@ import com.google.common.collect.ImmutableList; import java.util.List; -import javax.annotation.Nullable; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.opensearch.sql.ast.AbstractNodeVisitor; -import org.opensearch.sql.ast.expression.UnresolvedExpression; @Getter @Setter @@ -21,28 +19,27 @@ @EqualsAndHashCode(callSuper = false) public class AppendPipe 
extends UnresolvedPlan { - private UnresolvedPlan subQuery; + private UnresolvedPlan subQuery; - private UnresolvedPlan child; + private UnresolvedPlan child; - public AppendPipe(UnresolvedPlan subQuery) { - this.subQuery = subQuery; - } + public AppendPipe(UnresolvedPlan subQuery) { + this.subQuery = subQuery; + } - @Override - public AppendPipe attach(UnresolvedPlan child) { - this.child = child; - return this; - } + @Override + public AppendPipe attach(UnresolvedPlan child) { + this.child = child; + return this; + } + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } - @Override - public List getChild() { - return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); - } - - @Override - public T accept(AbstractNodeVisitor nodeVisitor, C context) { - return nodeVisitor.visitAppendPipe(this, context); - } + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitAppendPipe(this, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 47735bc4281..4913a0823bd 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -16,11 +16,14 @@ import java.util.function.BiFunction; import lombok.Getter; import lombok.Setter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexLambdaRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import 
org.opensearch.sql.executor.QueryType; @@ -67,6 +70,25 @@ private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, Query this.rexLambdaRefMap = new HashMap<>(); } + private CalcitePlanContext( + FrameworkConfig config, + Integer querySizeLimit, + QueryType queryType, + Connection connection, + RelBuilder relBuilder, + ExtendedRexBuilder rexBuilder, + FunctionProperties functionProperties, + Map rexLambdaRefMap) { + this.config = config; + this.querySizeLimit = querySizeLimit; + this.queryType = queryType; + this.connection = connection; + this.relBuilder = relBuilder; + this.rexBuilder = rexBuilder; + this.functionProperties = functionProperties; + this.rexLambdaRefMap = rexLambdaRefMap; + } + public RexNode resolveJoinCondition( UnresolvedExpression expr, BiFunction transformFunction) { @@ -100,6 +122,35 @@ public CalcitePlanContext clone() { return new CalcitePlanContext(config, querySizeLimit, queryType); } + /** + * A deep copy to create a totally same one calciteContext + * + * @return a deep clone calcite context and current context + */ + public CalcitePlanContext deepClone() { + RelOptCluster cluster = this.relBuilder.getCluster(); + RelBuilderFactory factory = RelBuilder.proto(config.getContext()); + RelOptSchema schema = + this.relBuilder.getCluster().getPlanner().getContext().unwrap(RelOptSchema.class); + RelBuilder siblingRelBuilder = factory.create(cluster, schema); + siblingRelBuilder.push(this.relBuilder.peek()); // Add current logical plan as base + CalcitePlanContext clone = + new CalcitePlanContext( + config, + querySizeLimit, + queryType, + connection, + siblingRelBuilder, + new ExtendedRexBuilder(siblingRelBuilder.getRexBuilder()), + functionProperties, + rexLambdaRefMap); + clone.inCoalesceFunction = this.inCoalesceFunction; + clone.isProjectVisited = this.isProjectVisited; + clone.isResolvingJoinCondition = this.isResolvingJoinCondition; + clone.isResolvingSubquery = this.isResolvingSubquery; + return clone; + } + public 
static CalcitePlanContext create( FrameworkConfig config, Integer querySizeLimit, QueryType queryType) { return new CalcitePlanContext(config, querySizeLimit, queryType); diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f06364996ff..c210e1715e5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -39,7 +39,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.ViewExpanders; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -215,11 +214,11 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { } @Override - public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context){ + public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); final RelNode base = context.relBuilder.peek(); - CalcitePlanContext subqueryContext = context.clone(); + CalcitePlanContext subqueryContext = context.deepClone(); node.getSubQuery().accept(this, subqueryContext); RelNode sub = subqueryContext.relBuilder.peek(); @@ -2595,10 +2594,12 @@ private RexNode createOptimizedTransliteration( } } - private RelNode unionAllPreserveAllColumns(RelNode base, RelNode sub, CalcitePlanContext baseContext, CalcitePlanContext subContext) { - LinkedHashMap targetColumnNameToType = collectTargetColumnNameToType(base, sub, baseContext); + private RelNode unionAllPreserveAllColumns( + RelNode base, RelNode sub, CalcitePlanContext baseContext, CalcitePlanContext subContext) { + LinkedHashMap targetColumnNameToType = + collectTargetColumnNameToType(base, sub, baseContext); RelNode castedBase = projectToSchema(base, 
targetColumnNameToType, baseContext); - RelNode castedSub = projectToSchema(base, targetColumnNameToType, subContext); + RelNode castedSub = projectToSchema(sub, targetColumnNameToType, subContext); baseContext.relBuilder.push(castedBase); baseContext.relBuilder.push(castedSub); @@ -2606,7 +2607,8 @@ private RelNode unionAllPreserveAllColumns(RelNode base, RelNode sub, CalcitePla return baseContext.relBuilder.peek(); } - private LinkedHashMap collectTargetColumnNameToType(RelNode left, RelNode right, CalcitePlanContext context) { + private LinkedHashMap collectTargetColumnNameToType( + RelNode left, RelNode right, CalcitePlanContext context) { LinkedHashMap cols = new LinkedHashMap<>(); // name to type for (RelDataTypeField f : left.getRowType().getFieldList()) { cols.put(f.getName(), f.getType()); @@ -2616,22 +2618,21 @@ private LinkedHashMap collectTargetColumnNameToType(RelNode cols.put(f.getName(), f.getType()); } else { RelDataType merged = - context.relBuilder.getTypeFactory().leastRestrictive(List.of(cols.get(f.getName()), f.getType())); + context + .relBuilder + .getTypeFactory() + .leastRestrictive(List.of(cols.get(f.getName()), f.getType())); if (merged != null) cols.put(f.getName(), merged); } } return cols; } - - - private RelNode projectToSchema(RelNode input, - LinkedHashMap targetCols, - CalcitePlanContext context) { + private RelNode projectToSchema( + RelNode input, LinkedHashMap targetCols, CalcitePlanContext context) { RexBuilder rex = context.rexBuilder; RelBuilder rel = context.relBuilder; - Map idx = - new LinkedHashMap<>(); // name to index + Map idx = new LinkedHashMap<>(); // name to index List inFields = input.getRowType().getFieldList(); for (int i = 0; i < inFields.size(); i++) { String n = inFields.get(i).getName(); @@ -2639,7 +2640,7 @@ private RelNode projectToSchema(RelNode input, } List exprs = new ArrayList<>(targetCols.size()); - List names = new ArrayList<>(targetCols.size()); + List names = new ArrayList<>(targetCols.size()); 
for (Map.Entry e : targetCols.entrySet()) { String normName = e.getKey(); @@ -2650,7 +2651,7 @@ private RelNode projectToSchema(RelNode input, if (pos != null) { RexNode ref = RexInputRef.of(pos, input.getRowType()); if (!ref.getType().equals(toType)) { - ref = rex.makeCast(toType, ref, true); + ref = rex.makeCast(toType, ref, true); } expr = ref; names.add(inFields.get(pos).getName()); @@ -2665,5 +2666,4 @@ private RelNode projectToSchema(RelNode input, return rel.peek(); } - } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index f82e63302ab..6a066cbc86f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -10,6 +10,7 @@ import static org.apache.commons.lang3.StringUtils.substringAfterLast; import static org.opensearch.sql.ast.expression.SpanUnit.NONE; import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN; +import static org.opensearch.sql.calcite.CalcitePlanContext.create; import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY; import java.math.BigDecimal; @@ -410,7 +411,8 @@ public CalcitePlanContext prepareLambdaContext( String functionName, @Nullable RelDataType defaultTypeForReduceAcc) { try { - CalcitePlanContext lambdaContext = context.clone(); + CalcitePlanContext lambdaContext = + create(context.config, context.querySizeLimit, context.queryType); List candidateType = new ArrayList<>(); candidateType.add( ((ArraySqlType) previousArgument.get(0).getType()) diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7f519f46b11..429917f0e5b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -164,10 +164,7 @@ 
public UnresolvedPlan visitSubPipeline(OpenSearchPPLParser.SubPipelineContext ct throw new IllegalArgumentException("appendpipe [] is empty"); } UnresolvedPlan seed = visit(cmds.getFirst()); - return cmds.stream() - .skip(1) - .map(this::visit) - .reduce(seed, (left, op) -> op.attach(left)); + return cmds.stream().skip(1).map(this::visit).reduce(seed, (left, op) -> op.attach(left)); } @Override @@ -178,8 +175,6 @@ public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) { ctx.commands().stream().map(this::visit).reduce(searchCommand, (r, e) -> e.attach(r))); } - - /** Search command. */ @Override public UnresolvedPlan visitSearchFrom(SearchFromContext ctx) { From a8dd2ca5da8bb536541aba70243c596175596d1e Mon Sep 17 00:00:00 2001 From: xinyual Date: Fri, 10 Oct 2025 13:32:49 +0800 Subject: [PATCH 04/22] slighty change syntax Signed-off-by: xinyual --- ppl/src/main/antlr/OpenSearchPPLParser.g4 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index c40b7fa9e1d..31a201a07e7 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -20,7 +20,7 @@ pplStatement ; subPipeline - : commands (PIPE commands)* + : PIPE? 
commands (PIPE commands)* ; queryStatement From c3549f578555955013c05c6444911e86fc90ce89 Mon Sep 17 00:00:00 2001 From: xinyual Date: Sat, 11 Oct 2025 12:30:21 +0800 Subject: [PATCH 05/22] add unresolved plan Signed-off-by: xinyual --- .../sql/calcite/CalciteRelNodeVisitor.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 8c839c309ea..192bcc4d7a5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -238,15 +238,20 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { @Override public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); - final RelNode base = context.relBuilder.peek(); + UnresolvedPlan subqueryPlan = + node.getSubQuery().accept(new EmptySourcePropagateVisitor(), null); + UnresolvedPlan childNode = subqueryPlan; + while (childNode.getChild() != null && !childNode.getChild().isEmpty()) { + childNode = (UnresolvedPlan) childNode.getChild().getFirst(); + } + childNode.attach(node.getChild().getFirst()); - CalcitePlanContext subqueryContext = context.deepClone(); - node.getSubQuery().accept(this, subqueryContext); + subqueryPlan.accept(this, context); - RelNode sub = subqueryContext.relBuilder.peek(); + RelNode subPipelineNode = context.relBuilder.build(); + RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); - RelNode out = unionAllPreserveAllColumns(base, sub, context, subqueryContext); - return out; } @Override @@ -1710,11 +1715,14 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { // 3. 
Merge two query schemas using shared logic RelNode subsearchNode = context.relBuilder.build(); RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context); + } + private RelNode mergeTableAndResolveColumnConflict(RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { // Use shared schema merging logic that handles type conflicts via field renaming - List nodesToMerge = Arrays.asList(mainNode, subsearchNode); + List nodesToMerge = Arrays.asList(mainNode, subqueryNode); List projectedNodes = - SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); + SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); // 4. Union the projected plans for (RelNode projectedNode : projectedNodes) { From 9dfc6263c65e6de6795c42731865e1fe50201cd9 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 13 Oct 2025 13:48:47 +0800 Subject: [PATCH 06/22] add IT Signed-off-by: xinyual --- .../sql/calcite/CalciteRelNodeVisitor.java | 3 +- .../remote/CalcitePPLAppendPipeCommandIT.java | 240 ++++++++++++++++++ .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 22 +- .../ppl/calcite/CalcitePPLAppendPipeTest.java | 120 +++++++++ .../ppl/utils/PPLQueryDataAnonymizerTest.java | 8 + 5 files changed, 389 insertions(+), 4 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java create mode 100644 ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 192bcc4d7a5..bbe70d1cb47 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -238,8 +238,7 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { 
@Override public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); - UnresolvedPlan subqueryPlan = - node.getSubQuery().accept(new EmptySourcePropagateVisitor(), null); + UnresolvedPlan subqueryPlan = node.getSubQuery(); UnresolvedPlan childNode = subqueryPlan; while (childNode.getChild() != null && !childNode.getChild().isEmpty()) { childNode = (UnresolvedPlan) childNode.getChild().getFirst(); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java new file mode 100644 index 00000000000..aa49f8a5cf6 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -0,0 +1,240 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; + +public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testAppendPipe() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + 
"source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " + + " stats sum(age) as sum_age_by_state by state | sort sum_age_by_state ] |" + + " head 5", + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder( + actual, + schema("sum_age_by_gender", "bigint"), + schema("gender", "string"), + schema("sum_age_by_state", "bigint"), + schema("state", "string")); + verifyDataRows( + actual, + rows(14947, "F", null, null), + rows(15224, "M", null, null), + rows(null, null, 369, "NV"), + rows(null, null, 412, "NM"), + rows(null, null, 414, "AZ")); + } + + @Test + public void testAppendEmptySearchCommand() throws IOException { + List emptySourcePPLs = + Arrays.asList( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ |" + + " stats sum(age) as sum_age_by_state by state ]", + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ ]", + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >" + + " 10 | append [ ] ]", + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >" + + " 10 | lookup %s gender as igender ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + + for (String ppl : emptySourcePPLs) { + JSONObject actual = executeQuery(ppl); + verifySchemaInOrder( + actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M")); + } + } + + @Test + public void testAppendEmptySearchWithJoin() throws IOException { + withSettings( + Settings.Key.CALCITE_SUPPORT_ALL_JOIN_TYPES, + "true", + () -> { + List emptySourceWithJoinPPLs = + Arrays.asList( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " + + " join left=L right=R on L.gender = R.gender %s ]", + 
TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " + + " cross join left=L right=R on L.gender = R.gender %s ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " + + " left join left=L right=R on L.gender = R.gender %s ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " + + " semi join left=L right=R on L.gender = R.gender %s ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + + for (String ppl : emptySourceWithJoinPPLs) { + JSONObject actual = null; + try { + actual = executeQuery(ppl); + } catch (IOException e) { + throw new RuntimeException(e); + } + verifySchemaInOrder( + actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M")); + } + + List emptySourceWithRightOrFullJoinPPLs = + Arrays.asList( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where" + + " gender = 'F' | right join on gender = gender [source=%s | stats" + + " count() as cnt by gender ] ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT), + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where" + + " gender = 'F' | full join on gender = gender [source=%s | stats" + + " count() as cnt by gender ] ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + + for (String ppl : emptySourceWithRightOrFullJoinPPLs) { + JSONObject actual = null; + try { + actual = executeQuery(ppl); + } catch (IOException e) { + throw new RuntimeException(e); + } + verifySchemaInOrder( + actual, + schema("sum_age_by_gender", "bigint"), + schema("gender", "string"), + schema("cnt", "bigint")); + verifyDataRows( + actual, + rows(14947, "F", 
null), + rows(15224, "M", null), + rows(null, "F", 493), + rows(null, "M", 507)); + } + }); + } + + @Test + public void testAppendDifferentIndex() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" + + " sum(age) as bank_sum_age ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_BANK)); + verifySchemaInOrder( + actual, + schema("sum", "bigint"), + schema("gender", "string"), + schema("bank_sum_age", "bigint")); + verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); + } + + @Test + public void testAppendWithMergedColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender |" + + " append [ source=%s | stats sum(age) as sum by state | sort sum ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder( + actual, schema("sum", "bigint"), schema("gender", "string"), schema("state", "string")); + verifyDataRows( + actual, + rows(14947, "F", null), + rows(15224, "M", null), + rows(369, null, "NV"), + rows(412, null, "NM"), + rows(414, null, "AZ")); + } + + @Test + public void testAppendWithConflictTypeColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats sum(age)" + + " as sum by state | sort sum | eval sum = cast(sum as double) ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder( + actual, + schema("sum", "bigint"), + schema("gender", "string"), + schema("state", "string"), + schema("sum0", "double")); + verifyDataRows( + actual, + rows(14947, "F", null, null), + rows(15224, "M", null, null), + rows(null, null, "NV", 369d), + rows(null, null, "NM", 412d), + rows(null, null, "AZ", 414d)); + } +} + diff --git 
a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index bfa263a913a..811f25da06b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -55,6 +55,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -255,8 +256,12 @@ public String visitSearch(Search node, String context) { @Override public String visitFilter(Filter node, String context) { - String child = node.getChild().get(0).accept(this, context); String condition = visitExpression(node.getCondition()); + if (node.getChild().isEmpty()) { + return StringUtils.format("where %s", condition); + } + String child = node.getChild().get(0).accept(this, context); + return StringUtils.format("%s | where %s", child, condition); } @@ -280,7 +285,6 @@ public String visitRename(Rename node, String context) { /** Build {@link LogicalAggregation}. 
*/ @Override public String visitAggregation(Aggregation node, String context) { - String child = node.getChild().get(0).accept(this, context); UnresolvedExpression span = node.getSpan(); List groupByExprList = new ArrayList<>(); if (!Objects.isNull(span)) { @@ -288,6 +292,11 @@ public String visitAggregation(Aggregation node, String context) { } groupByExprList.addAll(node.getGroupExprList()); final String group = visitExpressionList(groupByExprList); + if (node.getChild().isEmpty()) { + return StringUtils.format( + "stats %s", String.join(" ", visitExpressionList(node.getAggExprList()), groupBy(group)).trim()); + } + String child = node.getChild().get(0).accept(this, context); return StringUtils.format( "%s | stats %s", child, String.join(" ", visitExpressionList(node.getAggExprList()), groupBy(group)).trim()); @@ -637,6 +646,15 @@ private String visitExpression(UnresolvedExpression expression) { return expressionAnalyzer.analyze(expression, null); } + @Override + public String visitAppendPipe(AppendPipe node, String context) { + String child = node.getChild().get(0).accept(this, context); + String subPipeline = node.getSubQuery().accept(this, context); + return StringUtils.format( + "%s | appendpipe [%s]", child, subPipeline + ); + } + @Override public String visitFillNull(FillNull node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java new file mode 100644 index 00000000000..48e272e49e8 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +import 
java.util.Arrays; +import java.util.List; + +public class CalcitePPLAppendPipeTest extends CalcitePPLAbstractTest{ + public CalcitePPLAppendPipeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testAppendPipe() { + String ppl = "source=EMP | appendpipe [ where DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testAppendPipeEmptySearchCommand() { + List emptySourcePPLs = + Arrays.asList( + "source=EMP | append [ | where DEPTNO = 20 ]", + "source=EMP | append [ ]", + "source=EMP | append [ | where DEPTNO = 20 | append [ ] ]", + "source=EMP | append [ | where DEPTNO = 10 | lookup DEPT DEPTNO append LOC as JOB ]"); + + for (String ppl : emptySourcePPLs) { + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalValues(tuples=[[]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," + + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" + + "WHERE 1 = 0"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + } + + + + @Test + public void testAppendPipeWithMergedColumns() { + String ppl = + "source=EMP | fields DEPTNO | appendpipe [ fields DEPTNO | eval DEPTNO_PLUS =" + + " DEPTNO + 10 ]"; + 
RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[+($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT `DEPTNO`, `DEPTNO` + 10 `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testAppendPipeWithConflictTypeColumn() { + String ppl = + "source=EMP | fields DEPTNO | appendpipe [ | fields DEPTNO | eval DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index dec05cdb2e8..aed575f1f06 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -652,6 +652,14 @@ public void testRegex() { anonymize("source=t | regex email='.*@domain.com' | fields email")); } + @Test + public void 
testAppendPipe() { + assertEquals( + "source=table | appendpipe [stats count()]", anonymize("source=t | appendpipe [stats count()]")); + assertEquals( + "source=table | appendpipe [where identifier = ***]", anonymize("source=t | appendpipe [where fieldname=='pattern']")); + } + @Test public void testRexCommand() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10); From f88730fd598db8fc7f174a4300f24889a32d4bd2 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 13 Oct 2025 16:36:19 +0800 Subject: [PATCH 07/22] add tests Signed-off-by: xinyual --- .../sql/calcite/CalciteRelNodeVisitor.java | 10 +- .../remote/CalcitePPLAppendPipeCommandIT.java | 302 +++++------------- .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 42 +-- .../ppl/calcite/CalcitePPLAppendPipeTest.java | 209 ++++++------ .../ppl/utils/PPLQueryDataAnonymizerTest.java | 9 +- 5 files changed, 203 insertions(+), 369 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index bbe70d1cb47..bfcb5e73ce1 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -240,7 +240,9 @@ public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); UnresolvedPlan subqueryPlan = node.getSubQuery(); UnresolvedPlan childNode = subqueryPlan; - while (childNode.getChild() != null && !childNode.getChild().isEmpty()) { + while (childNode.getChild() != null + && !childNode.getChild().isEmpty() + && !(childNode.getChild().getFirst() instanceof Values)) { childNode = (UnresolvedPlan) childNode.getChild().getFirst(); } childNode.attach(node.getChild().getFirst()); @@ -250,7 +252,6 @@ public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { RelNode subPipelineNode = context.relBuilder.build(); RelNode mainNode = 
context.relBuilder.build(); return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); - } @Override @@ -1717,11 +1718,12 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context); } - private RelNode mergeTableAndResolveColumnConflict(RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { + private RelNode mergeTableAndResolveColumnConflict( + RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { // Use shared schema merging logic that handles type conflicts via field renaming List nodesToMerge = Arrays.asList(mainNode, subqueryNode); List projectedNodes = - SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); + SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); // 4. Union the projected plans for (RelNode projectedNode : projectedNodes) { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java index aa49f8a5cf6..5366cb83145 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -5,16 +5,6 @@ package org.opensearch.sql.calcite.remote; -import org.json.JSONObject; -import org.junit.Test; -import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.ppl.PPLIntegTestCase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; - import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import static org.opensearch.sql.util.MatcherUtils.rows; @@ -22,219 +12,83 @@ import static 
org.opensearch.sql.util.MatcherUtils.verifyDataRows; import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; -public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { - @Override - public void init() throws Exception { - super.init(); - enableCalcite(); - loadIndex(Index.ACCOUNT); - loadIndex(Index.BANK); - } - - @Test - public void testAppendPipe() throws IOException { - JSONObject actual = - executeQuery( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " - + " stats sum(age) as sum_age_by_state by state | sort sum_age_by_state ] |" - + " head 5", - TEST_INDEX_ACCOUNT)); - verifySchemaInOrder( - actual, - schema("sum_age_by_gender", "bigint"), - schema("gender", "string"), - schema("sum_age_by_state", "bigint"), - schema("state", "string")); - verifyDataRows( - actual, - rows(14947, "F", null, null), - rows(15224, "M", null, null), - rows(null, null, 369, "NV"), - rows(null, null, 412, "NM"), - rows(null, null, 414, "AZ")); - } - - @Test - public void testAppendEmptySearchCommand() throws IOException { - List emptySourcePPLs = - Arrays.asList( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ |" - + " stats sum(age) as sum_age_by_state by state ]", - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ ]", - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >" - + " 10 | append [ ] ]", - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >" - + " 10 | lookup %s gender as igender ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - - for (String ppl : emptySourcePPLs) { - JSONObject actual = executeQuery(ppl); - verifySchemaInOrder( - actual, schema("sum_age_by_gender", "bigint"), 
schema("gender", "string")); - verifyDataRows(actual, rows(14947, "F"), rows(15224, "M")); - } - } - - @Test - public void testAppendEmptySearchWithJoin() throws IOException { - withSettings( - Settings.Key.CALCITE_SUPPORT_ALL_JOIN_TYPES, - "true", - () -> { - List emptySourceWithJoinPPLs = - Arrays.asList( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " - + " join left=L right=R on L.gender = R.gender %s ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " - + " cross join left=L right=R on L.gender = R.gender %s ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " - + " left join left=L right=R on L.gender = R.gender %s ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | " - + " semi join left=L right=R on L.gender = R.gender %s ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - - for (String ppl : emptySourceWithJoinPPLs) { - JSONObject actual = null; - try { - actual = executeQuery(ppl); - } catch (IOException e) { - throw new RuntimeException(e); - } - verifySchemaInOrder( - actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); - verifyDataRows(actual, rows(14947, "F"), rows(15224, "M")); - } - - List emptySourceWithRightOrFullJoinPPLs = - Arrays.asList( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where" - + " gender = 'F' | right join on gender = gender [source=%s | stats" - + " count() as cnt by gender ] ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT), - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where" - + " gender = 'F' | full join on gender = 
gender [source=%s | stats" - + " count() as cnt by gender ] ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - - for (String ppl : emptySourceWithRightOrFullJoinPPLs) { - JSONObject actual = null; - try { - actual = executeQuery(ppl); - } catch (IOException e) { - throw new RuntimeException(e); - } - verifySchemaInOrder( - actual, - schema("sum_age_by_gender", "bigint"), - schema("gender", "string"), - schema("cnt", "bigint")); - verifyDataRows( - actual, - rows(14947, "F", null), - rows(15224, "M", null), - rows(null, "F", 493), - rows(null, "M", 507)); - } - }); - } - - @Test - public void testAppendDifferentIndex() throws IOException { - JSONObject actual = - executeQuery( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" - + " sum(age) as bank_sum_age ]", - TEST_INDEX_ACCOUNT, - TEST_INDEX_BANK)); - verifySchemaInOrder( - actual, - schema("sum", "bigint"), - schema("gender", "string"), - schema("bank_sum_age", "bigint")); - verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); - } - - @Test - public void testAppendWithMergedColumn() throws IOException { - JSONObject actual = - executeQuery( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum by gender |" - + " append [ source=%s | stats sum(age) as sum by state | sort sum ] | head 5", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - verifySchemaInOrder( - actual, schema("sum", "bigint"), schema("gender", "string"), schema("state", "string")); - verifyDataRows( - actual, - rows(14947, "F", null), - rows(15224, "M", null), - rows(369, null, "NV"), - rows(412, null, "NM"), - rows(414, null, "AZ")); - } +import java.io.IOException; +import java.util.Locale; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; - @Test - public void testAppendWithConflictTypeColumn() throws IOException { - JSONObject actual = - executeQuery( - String.format( - 
Locale.ROOT, - "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats sum(age)" - + " as sum by state | sort sum | eval sum = cast(sum as double) ] | head 5", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - verifySchemaInOrder( - actual, - schema("sum", "bigint"), - schema("gender", "string"), - schema("state", "string"), - schema("sum0", "double")); - verifyDataRows( - actual, - rows(14947, "F", null, null), - rows(15224, "M", null, null), - rows(null, null, "NV", 369d), - rows(null, null, "NM", 412d), - rows(null, null, "AZ", 414d)); - } +public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testAppendPipe() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " + + " sort -sum_age_by_gender ] |" + + " head 5", + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F")); + } + + @Test + public void testAppendDifferentIndex() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" + + " sum(age) as bank_sum_age ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_BANK)); + verifySchemaInOrder( + actual, + schema("sum", "bigint"), + schema("gender", "string"), + schema("bank_sum_age", "bigint")); + verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); + } + + @Test + public void testAppendpipeWithMergedColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender |" + + " 
appendpipe [ stats sum(sum) as sum ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null)); + } + + @Test + public void testAppendpipeWithConflictTypeColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum = cast(sum as" + + " double) ] | head 5", + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder( + actual, schema("sum", "bigint"), schema("gender", "string"), schema("sum0", "double")); + verifyDataRows( + actual, + rows(14947, "F", null), + rows(15224, "M", null), + rows(null, "F", 14947d), + rows(null, "M", 15224d)); + } } - diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 250d3f1c07e..9c6e6684df2 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -81,7 +81,6 @@ import org.opensearch.sql.ast.tree.Rename; import org.opensearch.sql.ast.tree.Reverse; import org.opensearch.sql.ast.tree.Rex; -import org.opensearch.sql.ast.tree.SPath; import org.opensearch.sql.ast.tree.Search; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.ast.tree.SpanBin; @@ -257,12 +256,8 @@ public String visitSearch(Search node, String context) { @Override public String visitFilter(Filter node, String context) { - String condition = visitExpression(node.getCondition()); - if (node.getChild().isEmpty()) { - return StringUtils.format("where %s", condition); - } String child = node.getChild().get(0).accept(this, context); - + String condition = visitExpression(node.getCondition()); return StringUtils.format("%s | where %s", child, condition); } @@ -286,6 +281,7 @@ public 
String visitRename(Rename node, String context) { /** Build {@link LogicalAggregation}. */ @Override public String visitAggregation(Aggregation node, String context) { + String child = node.getChild().get(0).accept(this, context); UnresolvedExpression span = node.getSpan(); List groupByExprList = new ArrayList<>(); if (!Objects.isNull(span)) { @@ -293,11 +289,6 @@ public String visitAggregation(Aggregation node, String context) { } groupByExprList.addAll(node.getGroupExprList()); final String group = visitExpressionList(groupByExprList); - if (node.getChild().isEmpty()) { - return StringUtils.format( - "stats %s", String.join(" ", visitExpressionList(node.getAggExprList()), groupBy(group)).trim()); - } - String child = node.getChild().get(0).accept(this, context); return StringUtils.format( "%s | stats %s", child, String.join(" ", visitExpressionList(node.getAggExprList()), groupBy(group)).trim()); @@ -649,11 +640,15 @@ private String visitExpression(UnresolvedExpression expression) { @Override public String visitAppendPipe(AppendPipe node, String context) { + Values emptyValue = new Values(null); + UnresolvedPlan childNode = node.getSubQuery(); + while (childNode != null && !childNode.getChild().isEmpty()) { + childNode = (UnresolvedPlan) childNode.getChild().get(0); + } + childNode.attach(emptyValue); String child = node.getChild().get(0).accept(this, context); - String subPipeline = node.getSubQuery().accept(this, context); - return StringUtils.format( - "%s | appendpipe [%s]", child, subPipeline - ); + String subPipeline = anonymizeData(node.getSubQuery()); + return StringUtils.format("%s | appendpipe [%s]", child, subPipeline); } @Override @@ -700,23 +695,6 @@ public String visitFillNull(FillNull node, String context) { } } - @Override - public String visitSpath(SPath node, String context) { - String child = node.getChild().get(0).accept(this, context); - StringBuilder builder = new StringBuilder(); - builder.append(child).append(" | spath"); - if 
(node.getInField() != null) { - builder.append(" input=").append(MASK_COLUMN); - } - if (node.getOutField() != null) { - builder.append(" output=").append(MASK_COLUMN); - } - if (node.getPath() != null) { - builder.append(" path=").append(MASK_COLUMN); - } - return builder.toString(); - } - @Override public String visitPatterns(Patterns node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java index 48e272e49e8..b6eb0693036 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -5,116 +5,111 @@ package org.opensearch.sql.ppl.calcite; +import java.util.Arrays; +import java.util.List; import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; -import java.util.Arrays; -import java.util.List; - -public class CalcitePPLAppendPipeTest extends CalcitePPLAbstractTest{ - public CalcitePPLAppendPipeTest() { - super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); +public class CalcitePPLAppendPipeTest extends CalcitePPLAbstractTest { + public CalcitePPLAppendPipeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testAppendPipe() { + String ppl = "source=EMP | appendpipe [ where DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM 
`scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testAppendPipeEmptySearchCommand() { + List emptySourcePPLs = + Arrays.asList( + "source=EMP | append [ | where DEPTNO = 20 ]", + "source=EMP | append [ ]", + "source=EMP | append [ | where DEPTNO = 20 | append [ ] ]", + "source=EMP | append [ | where DEPTNO = 10 | lookup DEPT DEPTNO append LOC as JOB ]"); + + for (String ppl : emptySourcePPLs) { + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalValues(tuples=[[]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," + + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" + + "WHERE 1 = 0"; + verifyPPLToSparkSQL(root, expectedSparkSql); } - - @Test - public void testAppendPipe() { - String ppl = "source=EMP | appendpipe [ where DEPTNO = 20 ]"; - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalFilter(condition=[=($7, 20)])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; - verifyLogical(root, expectedLogical); - verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows - - String expectedSparkSql = - "SELECT *\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT *\n" - + "FROM `scott`.`EMP`\n" - + "WHERE `DEPTNO` = 20"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } - - @Test - public void testAppendPipeEmptySearchCommand() { - List emptySourcePPLs = - Arrays.asList( - "source=EMP | append [ | where DEPTNO = 20 ]", - "source=EMP | append [ ]", - "source=EMP | append [ | where DEPTNO = 20 | append [ ] ]", - "source=EMP | append [ | where DEPTNO = 10 | lookup DEPT DEPTNO 
append LOC as JOB ]"); - - for (String ppl : emptySourcePPLs) { - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalValues(tuples=[[]])\n"; - verifyLogical(root, expectedLogical); - - String expectedSparkSql = - "SELECT *\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT *\n" - + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," - + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" - + "WHERE 1 = 0"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } - } - - - - @Test - public void testAppendPipeWithMergedColumns() { - String ppl = - "source=EMP | fields DEPTNO | appendpipe [ fields DEPTNO | eval DEPTNO_PLUS =" - + " DEPTNO + 10 ]"; - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[null:INTEGER])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[+($7, 10)])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; - verifyLogical(root, expectedLogical); - verifyResultCount(root, 28); - - String expectedSparkSql = - "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO_PLUS`\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT `DEPTNO`, `DEPTNO` + 10 `DEPTNO_PLUS`\n" - + "FROM `scott`.`EMP`"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } - - @Test - public void testAppendPipeWithConflictTypeColumn() { - String ppl = - "source=EMP | fields DEPTNO | appendpipe [ | fields DEPTNO | eval DEPTNO = 20 ]"; - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; - verifyLogical(root, expectedLogical); - verifyResultCount(root, 28); - - 
String expectedSparkSql = - "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n" - + "FROM `scott`.`EMP`"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } - + } + + @Test + public void testAppendPipeWithMergedColumns() { + String ppl = + "source=EMP | fields DEPTNO | appendpipe [ fields DEPTNO | eval DEPTNO_PLUS =" + + " DEPTNO + 10 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[+($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT `DEPTNO`, `DEPTNO` + 10 `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testAppendPipeWithConflictTypeColumn() { + String ppl = "source=EMP | fields DEPTNO | appendpipe [ | fields DEPTNO | eval DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } diff --git 
a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 4c1af24bd8c..07bd1c136e4 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -655,9 +655,14 @@ public void testRegex() { @Test public void testAppendPipe() { assertEquals( - "source=table | appendpipe [stats count()]", anonymize("source=t | appendpipe [stats count()]")); + "source=table | appendpipe [ | stats count()]", + anonymize("source=t | appendpipe [stats count()]")); assertEquals( - "source=table | appendpipe [where identifier = ***]", anonymize("source=t | appendpipe [where fieldname=='pattern']")); + "source=table | appendpipe [ | where identifier = ***]", + anonymize("source=t | appendpipe [where fieldname=='pattern']")); + assertEquals( + "source=table | appendpipe [ | sort identifier]", + anonymize("source=t | appendpipe [sort fieldname]")); } @Test From 4c281cc67296f34b5467ebccd38232bf9951b644 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 15:50:13 +0800 Subject: [PATCH 08/22] remove useless ut Signed-off-by: xinyual --- .../ppl/calcite/CalcitePPLAppendPipeTest.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java index b6eb0693036..8887ae803a9 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -5,8 +5,6 @@ package org.opensearch.sql.ppl.calcite; -import java.util.Arrays; -import java.util.List; import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; @@ -38,35 +36,6 @@ public void 
testAppendPipe() { verifyPPLToSparkSQL(root, expectedSparkSql); } - @Test - public void testAppendPipeEmptySearchCommand() { - List emptySourcePPLs = - Arrays.asList( - "source=EMP | append [ | where DEPTNO = 20 ]", - "source=EMP | append [ ]", - "source=EMP | append [ | where DEPTNO = 20 | append [ ] ]", - "source=EMP | append [ | where DEPTNO = 10 | lookup DEPT DEPTNO append LOC as JOB ]"); - - for (String ppl : emptySourcePPLs) { - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalValues(tuples=[[]])\n"; - verifyLogical(root, expectedLogical); - - String expectedSparkSql = - "SELECT *\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT *\n" - + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," - + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" - + "WHERE 1 = 0"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } - } - @Test public void testAppendPipeWithMergedColumns() { String ppl = From 93eb5b3fdbffaa00a49b66a98e5aa5afb42ac3d4 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 15:59:11 +0800 Subject: [PATCH 09/22] fix conflict Signed-off-by: xinyual --- ppl/src/main/antlr/OpenSearchPPLParser.g4 | 3 --- 1 file changed, 3 deletions(-) diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 59050146bb4..cdf2c22e233 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -83,11 +83,8 @@ commands | regexCommand | timechartCommand | rexCommand -<<<<<<< HEAD | appendPipeCommand -======= | replaceCommand ->>>>>>> origin/main ; commandName From 9148512997df04fd4c2ff8f5c8304eb4f3a3625e Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:03:10 +0800 Subject: [PATCH 10/22] remove useless code Signed-off-by: xinyual --- .../sql/calcite/CalcitePlanContext.java | 51 ------------------- 1 file changed, 
51 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index a7a794826a2..669d8452dc0 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -16,14 +16,11 @@ import java.util.function.BiFunction; import lombok.Getter; import lombok.Setter; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexLambdaRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.tools.RelBuilderFactory; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.calcite.utils.CalciteToolsHelper; import org.opensearch.sql.common.setting.Settings; @@ -75,25 +72,6 @@ private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType this.rexLambdaRefMap = new HashMap<>(); } - private CalcitePlanContext( - FrameworkConfig config, - Integer querySizeLimit, - QueryType queryType, - Connection connection, - RelBuilder relBuilder, - ExtendedRexBuilder rexBuilder, - FunctionProperties functionProperties, - Map rexLambdaRefMap) { - this.config = config; - this.querySizeLimit = querySizeLimit; - this.queryType = queryType; - this.connection = connection; - this.relBuilder = relBuilder; - this.rexBuilder = rexBuilder; - this.functionProperties = functionProperties; - this.rexLambdaRefMap = rexLambdaRefMap; - } - public RexNode resolveJoinCondition( UnresolvedExpression expr, BiFunction transformFunction) { @@ -127,35 +105,6 @@ public CalcitePlanContext clone() { return new CalcitePlanContext(config, sysLimit, queryType); } - /** - * A deep copy to create a totally same one calciteContext - * - * @return a deep clone 
calcite context and current context - */ - public CalcitePlanContext deepClone() { - RelOptCluster cluster = this.relBuilder.getCluster(); - RelBuilderFactory factory = RelBuilder.proto(config.getContext()); - RelOptSchema schema = - this.relBuilder.getCluster().getPlanner().getContext().unwrap(RelOptSchema.class); - RelBuilder siblingRelBuilder = factory.create(cluster, schema); - siblingRelBuilder.push(this.relBuilder.peek()); // Add current logical plan as base - CalcitePlanContext clone = - new CalcitePlanContext( - config, - querySizeLimit, - queryType, - connection, - siblingRelBuilder, - new ExtendedRexBuilder(siblingRelBuilder.getRexBuilder()), - functionProperties, - rexLambdaRefMap); - clone.inCoalesceFunction = this.inCoalesceFunction; - clone.isProjectVisited = this.isProjectVisited; - clone.isResolvingJoinCondition = this.isResolvingJoinCondition; - clone.isResolvingSubquery = this.isResolvingSubquery; - return clone; - } - public static CalcitePlanContext create( FrameworkConfig config, SysLimit sysLimit, QueryType queryType) { return new CalcitePlanContext(config, sysLimit, queryType); From 345cdeba1a10f876d2ff1b03aea07a8f383ca78d Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:08:02 +0800 Subject: [PATCH 11/22] remove useless code Signed-off-by: xinyual --- .../org/opensearch/sql/calcite/CalciteRexNodeVisitor.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index 45300932c31..bb6f16b2e8a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -9,7 +9,6 @@ import static org.apache.calcite.sql.SqlKind.AS; import static org.opensearch.sql.ast.expression.SpanUnit.NONE; import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN; -import static 
org.opensearch.sql.calcite.CalcitePlanContext.create; import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY; import java.math.BigDecimal; @@ -323,8 +322,7 @@ public CalcitePlanContext prepareLambdaContext( String functionName, @Nullable RelDataType defaultTypeForReduceAcc) { try { - CalcitePlanContext lambdaContext = - create(context.config, context.querySizeLimit, context.queryType); + CalcitePlanContext lambdaContext = context.clone(); List candidateType = new ArrayList<>(); candidateType.add( ((ArraySqlType) previousArgument.get(0).getType()) From 3d4d8657e006203c0d02d729d4134d9355bd998b Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:09:19 +0800 Subject: [PATCH 12/22] remove useless code Signed-off-by: xinyual --- .../sql/calcite/CalciteRelNodeVisitor.java | 73 ------------------- 1 file changed, 73 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 464f8e8113c..e59a8bcfbbc 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -2785,77 +2785,4 @@ private RexNode createOptimizedTransliteration( throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e); } } - - private RelNode unionAllPreserveAllColumns( - RelNode base, RelNode sub, CalcitePlanContext baseContext, CalcitePlanContext subContext) { - LinkedHashMap targetColumnNameToType = - collectTargetColumnNameToType(base, sub, baseContext); - RelNode castedBase = projectToSchema(base, targetColumnNameToType, baseContext); - RelNode castedSub = projectToSchema(sub, targetColumnNameToType, subContext); - - baseContext.relBuilder.push(castedBase); - baseContext.relBuilder.push(castedSub); - baseContext.relBuilder.union(true, 2); - return baseContext.relBuilder.peek(); - } - - private LinkedHashMap 
collectTargetColumnNameToType( - RelNode left, RelNode right, CalcitePlanContext context) { - LinkedHashMap cols = new LinkedHashMap<>(); // name to type - for (RelDataTypeField f : left.getRowType().getFieldList()) { - cols.put(f.getName(), f.getType()); - } - for (RelDataTypeField f : right.getRowType().getFieldList()) { - if (!cols.containsKey(f.getName())) { - cols.put(f.getName(), f.getType()); - } else { - RelDataType merged = - context - .relBuilder - .getTypeFactory() - .leastRestrictive(List.of(cols.get(f.getName()), f.getType())); - if (merged != null) cols.put(f.getName(), merged); - } - } - return cols; - } - - private RelNode projectToSchema( - RelNode input, LinkedHashMap targetCols, CalcitePlanContext context) { - RexBuilder rex = context.rexBuilder; - RelBuilder rel = context.relBuilder; - Map idx = new LinkedHashMap<>(); // name to index - List inFields = input.getRowType().getFieldList(); - for (int i = 0; i < inFields.size(); i++) { - String n = inFields.get(i).getName(); - idx.put(n, i); - } - - List exprs = new ArrayList<>(targetCols.size()); - List names = new ArrayList<>(targetCols.size()); - - for (Map.Entry e : targetCols.entrySet()) { - String normName = e.getKey(); - RelDataType toType = e.getValue(); - - RexNode expr; - Integer pos = idx.get(normName); - if (pos != null) { - RexNode ref = RexInputRef.of(pos, input.getRowType()); - if (!ref.getType().equals(toType)) { - ref = rex.makeCast(toType, ref, true); - } - expr = ref; - names.add(inFields.get(pos).getName()); - } else { - expr = rex.makeNullLiteral(toType); - names.add(normName); - } - exprs.add(expr); - } - - rel.project(exprs, names); - - return rel.peek(); - } } From 00aba11c23483d51007359af8e99f25d9788c8c3 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:09:33 +0800 Subject: [PATCH 13/22] apply spotless Signed-off-by: xinyual --- .../java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index e59a8bcfbbc..52079558759 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,7 +51,6 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFamily; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexInputRef; From 4c2b311acb4be9b30dea98ea0286389883b24210 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:14:44 +0800 Subject: [PATCH 14/22] remove useless chaneg Signed-off-by: xinyual --- .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 97a9e2c9661..79d24632220 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -82,6 +82,7 @@ import org.opensearch.sql.ast.tree.Replace; import org.opensearch.sql.ast.tree.Reverse; import org.opensearch.sql.ast.tree.Rex; +import org.opensearch.sql.ast.tree.SPath; import org.opensearch.sql.ast.tree.Search; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.ast.tree.SpanBin; @@ -720,6 +721,23 @@ public String visitFillNull(FillNull node, String context) { } } + @Override + public String 
visitSpath(SPath node, String context) { + String child = node.getChild().get(0).accept(this, context); + StringBuilder builder = new StringBuilder(); + builder.append(child).append(" | spath"); + if (node.getInField() != null) { + builder.append(" input=").append(MASK_COLUMN); + } + if (node.getOutField() != null) { + builder.append(" output=").append(MASK_COLUMN); + } + if (node.getPath() != null) { + builder.append(" path=").append(MASK_COLUMN); + } + return builder.toString(); + } + @Override public String visitPatterns(Patterns node, String context) { String child = node.getChild().get(0).accept(this, context); From 8744cb28473846d609736622049bd56378ad4972 Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 20 Oct 2025 16:29:24 +0800 Subject: [PATCH 15/22] add explain IT Signed-off-by: xinyual --- .../sql/calcite/remote/CalciteExplainIT.java | 12 ++++++++++++ .../calcite/explain_appendpipe_command.json | 6 ++++++ 2 files changed, 18 insertions(+) create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index b6124126d27..02a3a1a7c43 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -797,6 +797,18 @@ public void testExplainAppendCommand() throws IOException { TEST_INDEX_BANK))); } + @Test + public void testExplainAppendPipeCommand() throws IOException { + String expected = loadExpectedPlan("explain_appendpipe_command.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + Locale.ROOT, + "source=%s | appendpipe [ stats count(balance) as cnt by gender ]", + TEST_INDEX_BANK))); + } + @Test public void testMvjoinExplain() throws IOException { String query = diff --git 
a/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json new file mode 100644 index 00000000000..6ec42972a10 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, 
firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file From c48fa346705ebfd77679fd6e56fb63991f885a9e Mon Sep 17 00:00:00 2001 From: xinyual Date: Tue, 21 Oct 2025 11:20:25 +0800 Subject: [PATCH 16/22] fix IT Signed-off-by: xinyual --- .../calcite_no_pushdown/explain_appendpipe_command.json | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json 
b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json new file mode 100644 index 00000000000..2b111e119db --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], 
expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" + } +} \ No newline at end of file From 3a23979785404223f616d1616140389d832b8bbc Mon Sep 17 00:00:00 2001 From: xinyual Date: Tue, 21 Oct 2025 13:36:20 +0800 Subject: [PATCH 17/22] apply spotless Signed-off-by: xinyual --- .../sql/calcite/remote/CalciteExplainIT.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 02a3a1a7c43..7abf6065e27 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -801,12 +801,12 @@ public void testExplainAppendCommand() throws IOException { public void testExplainAppendPipeCommand() throws IOException { String expected = loadExpectedPlan("explain_appendpipe_command.json"); assertJsonEqualsIgnoreId( - expected, - explainQueryToString( - String.format( - Locale.ROOT, - "source=%s | appendpipe [ stats count(balance) as cnt by gender ]", - TEST_INDEX_BANK))); + expected, + explainQueryToString( + String.format( + Locale.ROOT, + "source=%s | appendpipe [ stats count(balance) as cnt by gender ]", + TEST_INDEX_BANK))); } @Test From f934efa90ba4c485ca8c9ca9e88f463a070d1be1 Mon Sep 17 00:00:00 2001 From: xinyual Date: Wed, 22 Oct 2025 15:12:29 +0800 Subject: [PATCH 18/22] add doc Signed-off-by: xinyual --- docs/user/ppl/cmd/appendpipe.rst | 89 ++++++++++++++++++++++++++++++++ 1 file changed, 89 
insertions(+) create mode 100644 docs/user/ppl/cmd/appendpipe.rst diff --git a/docs/user/ppl/cmd/appendpipe.rst b/docs/user/ppl/cmd/appendpipe.rst new file mode 100644 index 00000000000..27ab635c9e3 --- /dev/null +++ b/docs/user/ppl/cmd/appendpipe.rst @@ -0,0 +1,89 @@ +========= +appendpipe +========= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| Using ``appendpipe`` command to appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first.The subpipeline is run when the search reaches the appendpipe command. +The appendpipe command is used to append the output of transforming commands, such as chart, timechart, stats, and top. +The command aligns columns with the same field names and types. For different column fields between the main search and sub-search, NULL values are filled in the respective rows. + +Version +======= +3.3.0 + +Syntax +============ +appendpipe [] + +* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command. + +Example 1: Append rows from a total count to existing search result +==================================================================================== + +This example appends rows from "total by gender" to "sum by gender, state" with merged column of same field name and type. 
+ +PPL query:: + + os> source=accounts | stats sum(age) as part by gender, state | sort -sum | head 5 | appendpipe [ stats sum(part) as total by gender ]; + fetched rows / total rows = 6/6 + +------+--------+-------+-------+ + | part | gender | state | total | + |------+--------+-------+-------| + | 36 | M | TN | null | + | 33 | M | MD | null | + | 32 | M | IL | null | + | 28 | F | VA | null | + | null | F | null | 28 | + | null | M | null | 101 | + +------+--------+-------+-------+ + + + +Example 2: Append rows with merged column names +=============================================================== + +This example appends rows from "count by gender" to "sum by gender, state". + +PPL query:: + + os> source=accounts | stats sum(age) as total by gender, state | sort -`sum(age)` | head 5 | appendpipe [ stats sum(total) as total by gender ]; + fetched rows / total rows = 6/6 + +----------+--------+-------+ + | total | gender | state | + |----------+--------+-------| + | 36 | M | TN | + | 33 | M | MD | + | 32 | M | IL | + | 28 | F | VA | + | 28 | F | null | + | 101 | M | null | + +----------+--------+-------+ + +Example 3: Append rows with column type conflict +============================================= + +This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result. 
+ +PPL query:: + + os> source=accounts | stats sum(age) as total by gender, state | sort -`sum(age)` | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ]; + fetched rows / total rows = 6/6 + +------+--------+-------+--------+ + | sum | gender | state | state0 | + |------+--------+-------+--------| + | 36 | M | TN | null | + | 33 | M | MD | null | + | 32 | M | IL | null | + | 28 | F | VA | null | + | 28 | F | null | 123 | + | 101 | M | null | 123 | + +------+--------+-------+--------+ + From d71e145f0aa2f51fb829fef10f83b532c05e95fb Mon Sep 17 00:00:00 2001 From: xinyual Date: Thu, 23 Oct 2025 10:38:37 +0800 Subject: [PATCH 19/22] optimize doc Signed-off-by: xinyual --- docs/user/ppl/cmd/appendpipe.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/user/ppl/cmd/appendpipe.rst b/docs/user/ppl/cmd/appendpipe.rst index 27ab635c9e3..843a2da28da 100644 --- a/docs/user/ppl/cmd/appendpipe.rst +++ b/docs/user/ppl/cmd/appendpipe.rst @@ -12,7 +12,6 @@ appendpipe Description ============ | Using ``appendpipe`` command to appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first.The subpipeline is run when the search reaches the appendpipe command. -The appendpipe command is used to append the output of transforming commands, such as chart, timechart, stats, and top. The command aligns columns with the same field names and types. For different column fields between the main search and sub-search, NULL values are filled in the respective rows. 
Version @@ -32,7 +31,7 @@ This example appends rows from "total by gender" to "sum by gender, state" with PPL query:: - os> source=accounts | stats sum(age) as part by gender, state | sort -sum | head 5 | appendpipe [ stats sum(part) as total by gender ]; + os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ]; fetched rows / total rows = 6/6 +------+--------+-------+-------+ | part | gender | state | total | @@ -54,7 +53,7 @@ This example appends rows from "count by gender" to "sum by gender, state". PPL query:: - os> source=accounts | stats sum(age) as total by gender, state | sort -`sum(age)` | head 5 | appendpipe [ stats sum(total) as total by gender ]; + os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ]; fetched rows / total rows = 6/6 +----------+--------+-------+ | total | gender | state | @@ -74,7 +73,7 @@ This example shows how column type conflicts are handled when appending results. 
PPL query:: - os> source=accounts | stats sum(age) as total by gender, state | sort -`sum(age)` | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ]; + os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ]; fetched rows / total rows = 6/6 +------+--------+-------+--------+ | sum | gender | state | state0 | From 7664cd56a0ba3e55ab0526d08ece76ed30b1f57e Mon Sep 17 00:00:00 2001 From: xinyual Date: Mon, 3 Nov 2025 10:19:36 +0800 Subject: [PATCH 20/22] add UT Signed-off-by: xinyual --- .../java/org/opensearch/sql/ast/dsl/AstDSL.java | 6 ++++++ .../opensearch/sql/ppl/parser/AstBuilderTest.java | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index d987cf44cab..6e425cc714d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -550,6 +551,11 @@ public static Trendline trendline( return new Trendline(sortField, Arrays.asList(computations)).attach(input); } + public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) { + + return new AppendPipe(subquery).attach(input); + } + public static Trendline.TrendlineComputation computation( Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) { return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type); diff --git 
a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index b9948e6abe2..bbc3d2c83eb 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.ast.dsl.AstDSL.agg; import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; import static org.opensearch.sql.ast.dsl.AstDSL.alias; +import static org.opensearch.sql.ast.dsl.AstDSL.appendPipe; import static org.opensearch.sql.ast.dsl.AstDSL.argument; import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.compare; @@ -870,6 +871,20 @@ public void testFillNullValueWithFields() { fillNull(relation("t"), intLiteral(0), true, field("a"), field("b"), field("c"))); } + @Test + public void testAppendPipe() { + assertEqual( + "source=t | appendpipe [ stats COUNT() ]", + appendPipe( + relation("t"), + agg( + null, + exprList(alias("COUNT()", aggregate("count", AstDSL.allFields()))), + emptyList(), + emptyList(), + defaultStatsArgs()))); + } + public void testTrendline() { assertEqual( "source=t | trendline sma(5, test_field) as test_field_alias sma(1, test_field_2) as" From fa2ec9e3951fe50a9103508ca041c12b0dfb2e0a Mon Sep 17 00:00:00 2001 From: xinyual Date: Tue, 4 Nov 2025 10:55:56 +0800 Subject: [PATCH 21/22] fix IT due to performance change Signed-off-by: xinyual --- docs/user/ppl/cmd/appendpipe.rst | 22 +++------------- .../remote/CalcitePPLAppendPipeCommandIT.java | 26 ++++++++----------- .../ppl/calcite/CalcitePPLAppendPipeTest.java | 22 ---------------- 3 files changed, 14 insertions(+), 56 deletions(-) diff --git a/docs/user/ppl/cmd/appendpipe.rst b/docs/user/ppl/cmd/appendpipe.rst index 843a2da28da..43c4dd1e84d 100644 --- a/docs/user/ppl/cmd/appendpipe.rst +++ b/docs/user/ppl/cmd/appendpipe.rst @@ -66,23 +66,7 @@ PPL query:: | 101 | M | 
null | +----------+--------+-------+ -Example 3: Append rows with column type conflict -============================================= - -This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result. - -PPL query:: - - os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ]; - fetched rows / total rows = 6/6 - +------+--------+-------+--------+ - | sum | gender | state | state0 | - |------+--------+-------+--------| - | 36 | M | TN | null | - | 33 | M | MD | null | - | 32 | M | IL | null | - | 28 | F | VA | null | - | 28 | F | null | 123 | - | 101 | M | null | 123 | - +------+--------+-------+--------+ +Limitations +=========== +* **Schema Compatibility**: As with the ``append`` command, when fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). 
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java index 5366cb83145..d25d3ca80db 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -75,20 +75,16 @@ public void testAppendpipeWithMergedColumn() throws IOException { @Test public void testAppendpipeWithConflictTypeColumn() throws IOException { - JSONObject actual = - executeQuery( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum = cast(sum as" - + " double) ] | head 5", - TEST_INDEX_ACCOUNT)); - verifySchemaInOrder( - actual, schema("sum", "bigint"), schema("gender", "string"), schema("sum0", "double")); - verifyDataRows( - actual, - rows(14947, "F", null), - rows(15224, "M", null), - rows(null, "F", 14947d), - rows(null, "M", 15224d)); + Exception exception = + assertThrows( + Exception.class, + () -> + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum =" + + " cast(sum as double) ] | head 5", + TEST_INDEX_ACCOUNT))); + assertTrue(exception.getMessage().contains("due to incompatible types")); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java index 8887ae803a9..faf944da4a0 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -59,26 +59,4 @@ public void testAppendPipeWithMergedColumns() { + "FROM `scott`.`EMP`"; verifyPPLToSparkSQL(root, expectedSparkSql); } - - @Test - public void testAppendPipeWithConflictTypeColumn() { - String ppl = 
"source=EMP | fields DEPTNO | appendpipe [ | fields DEPTNO | eval DEPTNO = 20 ]"; - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; - verifyLogical(root, expectedLogical); - verifyResultCount(root, 28); - - String expectedSparkSql = - "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n" - + "FROM `scott`.`EMP`"; - verifyPPLToSparkSQL(root, expectedSparkSql); - } } From fe1b23f69a2ff286093831a25a62ea2d4be81c32 Mon Sep 17 00:00:00 2001 From: xinyual Date: Wed, 12 Nov 2025 10:19:20 +0800 Subject: [PATCH 22/22] add multiple children check Signed-off-by: xinyual --- .../java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 6bf8ae45a75..859ad4a714f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -248,6 +248,9 @@ public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { while (childNode.getChild() != null && !childNode.getChild().isEmpty() && !(childNode.getChild().getFirst() instanceof Values)) { + if (childNode.getChild().size() > 1) { + throw new RuntimeException("AppendPipe doesn't support subqueries with multiple children."); + } childNode = (UnresolvedPlan) childNode.getChild().getFirst(); } childNode.attach(node.getChild().getFirst());