Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -822,6 +823,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) {
throw getOnlyForCalciteException("Appendcol");
}

  /**
   * Rejects the {@code appendpipe} command in this analyzer.
   *
   * <p>{@code appendpipe} is implemented only on the Calcite engine; like {@link #visitAppendCol}
   * and {@link #visitAppend} below, this visitor unconditionally throws.
   *
   * @param node the AppendPipe AST node
   * @param context the analysis context (unused)
   * @return never returns normally
   */
  @Override
  public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) {
    throw getOnlyForCalciteException("AppendPipe");
  }

@Override
public LogicalPlan visitAppend(Append node, AnalysisContext context) {
throw getOnlyForCalciteException("Append");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -138,6 +139,10 @@ public T visitSearch(Search node, C context) {
return visitChildren(node, context);
}

  /**
   * Visits an {@code AppendPipe} node. The default implementation simply visits the node's
   * children, matching the pattern of the surrounding visit methods.
   *
   * @param node the AppendPipe AST node
   * @param context the visitor context
   * @return the result of visiting the children
   */
  public T visitAppendPipe(AppendPipe node, C context) {
    return visitChildren(node, context);
  }

public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CountBin;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -550,6 +551,11 @@ public static Trendline trendline(
return new Trendline(sortField, Arrays.asList(computations)).attach(input);
}

public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) {

return new AppendPipe(subquery).attach(input);
}

public static Trendline.TrendlineComputation computation(
Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) {
return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type);
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/**
 * AST node for the PPL {@code appendpipe} command: holds a sub-pipeline ({@code subQuery}) whose
 * results are appended to the results of the preceding search ({@code child}).
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
public class AppendPipe extends UnresolvedPlan {

  // Sub-pipeline applied to the main search results; its output is appended to them.
  private UnresolvedPlan subQuery;

  // The preceding (main) plan; null until attach() is called.
  private UnresolvedPlan child;

  public AppendPipe(UnresolvedPlan subQuery) {
    this.subQuery = subQuery;
  }

  /** Attaches the preceding plan and returns {@code this} for chaining. */
  @Override
  public AppendPipe attach(UnresolvedPlan child) {
    this.child = child;
    return this;
  }

  /** Returns the attached main plan as a singleton list, or an empty list before attachment. */
  @Override
  public List<UnresolvedPlan> getChild() {
    return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
  }

  @Override
  public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
    return nodeVisitor.visitAppendPipe(this, context);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -236,6 +237,25 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
visitChildren(node, context);
UnresolvedPlan subqueryPlan = node.getSubQuery();
UnresolvedPlan childNode = subqueryPlan;
while (childNode.getChild() != null
&& !childNode.getChild().isEmpty()
&& !(childNode.getChild().getFirst() instanceof Values)) {
childNode = (UnresolvedPlan) childNode.getChild().getFirst();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be more than 1 children in the subquery? Maybe adding assertion and throw exception to avoid this case if we don't support.

}
childNode.attach(node.getChild().getFirst());

subqueryPlan.accept(this, context);

RelNode subPipelineNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context);
}

@Override
public RelNode visitRegex(Regex node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -1706,9 +1726,13 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
// 3. Merge two query schemas using shared logic
RelNode subsearchNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context);
}

private RelNode mergeTableAndResolveColumnConflict(
RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subqueryNode);
List<RelNode> projectedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);

Expand Down
88 changes: 88 additions & 0 deletions docs/user/ppl/cmd/appendpipe.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
==========
appendpipe
==========

.. rubric:: Table of contents

.. contents::
:local:
:depth: 2


Description
============
| The ``appendpipe`` command appends the results of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first. The subpipeline runs when the search reaches the ``appendpipe`` command.
The command aligns columns with the same field names and types. For column fields that differ between the main search and the sub-search, NULL values are filled in the respective rows.

Version
=======
3.3.0

Syntax
============
appendpipe [<subpipeline>]

* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command.

Example 1: Append rows from a total count to existing search result
====================================================================================

This example appends rows from "total by gender" to "sum by gender, state" with merged column of same field name and type.

PPL query::

os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ];
fetched rows / total rows = 6/6
+------+--------+-------+-------+
| part | gender | state | total |
|------+--------+-------+-------|
| 36 | M | TN | null |
| 33 | M | MD | null |
| 32 | M | IL | null |
| 28 | F | VA | null |
| null | F | null | 28 |
| null | M | null | 101 |
+------+--------+-------+-------+



Example 2: Append rows with merged column names
===============================================================

This example appends rows from "count by gender" to "sum by gender, state".

PPL query::

os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ];
fetched rows / total rows = 6/6
+----------+--------+-------+
| total | gender | state |
|----------+--------+-------|
| 36 | M | TN |
| 33 | M | MD |
| 32 | M | IL |
| 28 | F | VA |
| 28 | F | null |
| 101 | M | null |
+----------+--------+-------+

Example 3: Append rows with column type conflict
================================================

This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result.

PPL query::

os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ];
fetched rows / total rows = 6/6
    +-------+--------+-------+--------+
    | total | gender | state | state0 |
    |-------+--------+-------+--------|
    | 36    | M      | TN    | null   |
    | 33    | M      | MD    | null   |
    | 32    | M      | IL    | null   |
    | 28    | F      | VA    | null   |
    | 28    | F      | null  | 123    |
    | 101   | M      | null  | 123    |
    +-------+--------+-------+--------+

Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,18 @@ public void testExplainAppendCommand() throws IOException {
TEST_INDEX_BANK)));
}

@Test
public void testExplainAppendPipeCommand() throws IOException {
String expected = loadExpectedPlan("explain_appendpipe_command.json");
assertJsonEqualsIgnoreId(
expected,
explainQueryToString(
String.format(
Locale.ROOT,
"source=%s | appendpipe [ stats count(balance) as cnt by gender ]",
TEST_INDEX_BANK)));
}

@Test
public void testMvjoinExplain() throws IOException {
String query =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.sql.ppl.PPLIntegTestCase;

/** Integration tests for the Calcite-backed PPL {@code appendpipe} command. */
public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase {
  @Override
  public void init() throws Exception {
    super.init();
    enableCalcite();
    loadIndex(Index.ACCOUNT);
    loadIndex(Index.BANK);
  }

  /** The sub-pipeline's rows (a sorted copy) are appended after the main search rows. */
  @Test
  public void testAppendPipe() throws IOException {
    JSONObject actual =
        executeQuery(
            String.format(
                Locale.ROOT,
                "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ "
                    + " sort -sum_age_by_gender ] |"
                    + " head 5",
                TEST_INDEX_ACCOUNT));
    verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
    verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F"));
  }

  // NOTE(review): this case exercises `append` (independent subsearch against a second index),
  // not `appendpipe` — confirm it belongs in this suite.
  @Test
  public void testAppendDifferentIndex() throws IOException {
    JSONObject actual =
        executeQuery(
            String.format(
                Locale.ROOT,
                "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
                    + " sum(age) as bank_sum_age ]",
                TEST_INDEX_ACCOUNT,
                TEST_INDEX_BANK));
    verifySchemaInOrder(
        actual,
        schema("sum", "bigint"),
        schema("gender", "string"),
        schema("bank_sum_age", "bigint"));
    verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238));
  }

  /** Columns with the same name and type are merged between main search and sub-pipeline. */
  @Test
  public void testAppendpipeWithMergedColumn() throws IOException {
    // Fixed: the format string has a single %s, so only one index argument is passed
    // (a second TEST_INDEX_ACCOUNT argument was previously supplied and silently ignored).
    JSONObject actual =
        executeQuery(
            String.format(
                Locale.ROOT,
                "source=%s | stats sum(age) as sum by gender |"
                    + " appendpipe [ stats sum(sum) as sum ] | head 5",
                TEST_INDEX_ACCOUNT));
    verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string"));
    verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null));
  }

  /** Same-named columns with conflicting types are split into two columns (e.g. sum vs sum0). */
  @Test
  public void testAppendpipeWithConflictTypeColumn() throws IOException {
    JSONObject actual =
        executeQuery(
            String.format(
                Locale.ROOT,
                "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum = cast(sum as"
                    + " double) ] | head 5",
                TEST_INDEX_ACCOUNT));
    verifySchemaInOrder(
        actual, schema("sum", "bigint"), schema("gender", "string"), schema("sum0", "double"));
    verifyDataRows(
        actual,
        rows(14947, "F", null),
        rows(15224, "M", null),
        rows(null, "F", 14947d),
        rows(null, "M", 15224d));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n"
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ BUFFER_LIMIT: 'BUFFER_LIMIT';
LABEL: 'LABEL';
SHOW_NUMBERED_TOKEN: 'SHOW_NUMBERED_TOKEN';
AGGREGATION: 'AGGREGATION';
APPENDPIPE: 'APPENDPIPE';

//Native JOIN KEYWORDS
JOIN: 'JOIN';
Expand Down
Loading
Loading