generated from amazon-archives/__template_Custom
-
Notifications
You must be signed in to change notification settings - Fork 178
Support appendpipecommand in PPL
#4602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+408
−1
Merged
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
759aab4
add demos
xinyual 00a1563
add missing column
xinyual ddf26a4
add appendpipe poc
xinyual 7233111
merge from main
xinyual a8dd2ca
slighty change syntax
xinyual c3549f5
add unresolved plan
xinyual 9dfc626
add IT
xinyual a3e78a8
Merge remote-tracking branch 'origin/main' into addappendPipe
xinyual f88730f
add tests
xinyual 4c281cc
remove useless ut
xinyual 70e368d
merge from main
xinyual 93eb5b3
fix conflict
xinyual 9148512
remove useless code
xinyual 345cdeb
remove useless code
xinyual 3d4d865
remove useless code
xinyual 00aba11
apply spotless
xinyual 4c2b311
remove useless chaneg
xinyual 8744cb2
add explain IT
xinyual c48fa34
fix IT
xinyual 3a23979
apply spotless
xinyual f934efa
add doc
xinyual d71e145
optimize doc
xinyual 7664cd5
add UT
xinyual 1624912
Merge remote-tracking branch 'origin/main' into addAppendpipe
xinyual fa2ec9e
fix IT due to performance change
xinyual fe1b23f
add multiply children check
xinyual f1b3a81
merge from main
xinyual File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
45 changes: 45 additions & 0 deletions
45
core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.Setter; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
|
|
||
| @Getter | ||
| @Setter | ||
| @ToString | ||
| @EqualsAndHashCode(callSuper = false) | ||
| public class AppendPipe extends UnresolvedPlan { | ||
|
|
||
| private UnresolvedPlan subQuery; | ||
|
|
||
| private UnresolvedPlan child; | ||
|
|
||
| public AppendPipe(UnresolvedPlan subQuery) { | ||
| this.subQuery = subQuery; | ||
| } | ||
|
|
||
| @Override | ||
| public AppendPipe attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public List<UnresolvedPlan> getChild() { | ||
| return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitAppendPipe(this, context); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| ========= | ||
| appendpipe | ||
| ========= | ||
|
|
||
| .. rubric:: Table of contents | ||
|
|
||
| .. contents:: | ||
| :local: | ||
| :depth: 2 | ||
|
|
||
|
|
||
| Description | ||
| ============ | ||
| | Using ``appendpipe`` command to appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first.The subpipeline is run when the search reaches the appendpipe command. | ||
| The command aligns columns with the same field names and types. For different column fields between the main search and sub-search, NULL values are filled in the respective rows. | ||
|
|
||
| Version | ||
| ======= | ||
| 3.3.0 | ||
|
|
||
| Syntax | ||
| ============ | ||
| appendpipe [<subpipeline>] | ||
|
|
||
| * subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command. | ||
|
|
||
| Example 1: Append rows from a total count to existing search result | ||
| ==================================================================================== | ||
|
|
||
| This example appends rows from "total by gender" to "sum by gender, state" with merged column of same field name and type. | ||
|
|
||
| PPL query:: | ||
|
|
||
| os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ]; | ||
| fetched rows / total rows = 6/6 | ||
| +------+--------+-------+-------+ | ||
| | part | gender | state | total | | ||
| |------+--------+-------+-------| | ||
| | 36 | M | TN | null | | ||
| | 33 | M | MD | null | | ||
| | 32 | M | IL | null | | ||
| | 28 | F | VA | null | | ||
| | null | F | null | 28 | | ||
| | null | M | null | 101 | | ||
| +------+--------+-------+-------+ | ||
|
|
||
|
|
||
|
|
||
| Example 2: Append rows with merged column names | ||
| =============================================================== | ||
|
|
||
| This example appends rows from "count by gender" to "sum by gender, state". | ||
|
|
||
| PPL query:: | ||
|
|
||
| os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ]; | ||
| fetched rows / total rows = 6/6 | ||
| +----------+--------+-------+ | ||
| | total | gender | state | | ||
| |----------+--------+-------| | ||
| | 36 | M | TN | | ||
| | 33 | M | MD | | ||
| | 32 | M | IL | | ||
| | 28 | F | VA | | ||
| | 28 | F | null | | ||
| | 101 | M | null | | ||
| +----------+--------+-------+ | ||
|
|
||
| Example 3: Append rows with column type conflict | ||
| ============================================= | ||
|
|
||
| This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result. | ||
|
|
||
| PPL query:: | ||
|
|
||
| os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender | eval state = 123 ]; | ||
| fetched rows / total rows = 6/6 | ||
| +------+--------+-------+--------+ | ||
| | sum | gender | state | state0 | | ||
| |------+--------+-------+--------| | ||
| | 36 | M | TN | null | | ||
| | 33 | M | MD | null | | ||
| | 32 | M | IL | null | | ||
| | 28 | F | VA | null | | ||
| | 28 | F | null | 123 | | ||
| | 101 | M | null | 123 | | ||
| +------+--------+-------+--------+ | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
94 changes: 94 additions & 0 deletions
94
...g-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.calcite.remote; | ||
|
|
||
| import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; | ||
| import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; | ||
| import static org.opensearch.sql.util.MatcherUtils.rows; | ||
| import static org.opensearch.sql.util.MatcherUtils.schema; | ||
| import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; | ||
| import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Locale; | ||
| import org.json.JSONObject; | ||
| import org.junit.Test; | ||
| import org.opensearch.sql.ppl.PPLIntegTestCase; | ||
|
|
||
| public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { | ||
| @Override | ||
| public void init() throws Exception { | ||
| super.init(); | ||
| enableCalcite(); | ||
| loadIndex(Index.ACCOUNT); | ||
| loadIndex(Index.BANK); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAppendPipe() throws IOException { | ||
| JSONObject actual = | ||
| executeQuery( | ||
| String.format( | ||
| Locale.ROOT, | ||
| "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " | ||
| + " sort -sum_age_by_gender ] |" | ||
| + " head 5", | ||
| TEST_INDEX_ACCOUNT)); | ||
| verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); | ||
| verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F")); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAppendDifferentIndex() throws IOException { | ||
| JSONObject actual = | ||
| executeQuery( | ||
| String.format( | ||
| Locale.ROOT, | ||
| "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" | ||
| + " sum(age) as bank_sum_age ]", | ||
| TEST_INDEX_ACCOUNT, | ||
| TEST_INDEX_BANK)); | ||
| verifySchemaInOrder( | ||
| actual, | ||
| schema("sum", "bigint"), | ||
| schema("gender", "string"), | ||
| schema("bank_sum_age", "bigint")); | ||
| verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAppendpipeWithMergedColumn() throws IOException { | ||
| JSONObject actual = | ||
| executeQuery( | ||
| String.format( | ||
| Locale.ROOT, | ||
| "source=%s | stats sum(age) as sum by gender |" | ||
| + " appendpipe [ stats sum(sum) as sum ] | head 5", | ||
| TEST_INDEX_ACCOUNT, | ||
| TEST_INDEX_ACCOUNT)); | ||
| verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string")); | ||
| verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null)); | ||
yuancu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| @Test | ||
| public void testAppendpipeWithConflictTypeColumn() throws IOException { | ||
| JSONObject actual = | ||
| executeQuery( | ||
| String.format( | ||
| Locale.ROOT, | ||
| "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum = cast(sum as" | ||
| + " double) ] | head 5", | ||
| TEST_INDEX_ACCOUNT)); | ||
| verifySchemaInOrder( | ||
| actual, schema("sum", "bigint"), schema("gender", "string"), schema("sum0", "double")); | ||
| verifyDataRows( | ||
| actual, | ||
| rows(14947, "F", null), | ||
| rows(15224, "M", null), | ||
| rows(null, "F", 14947d), | ||
| rows(null, "M", 15224d)); | ||
| } | ||
| } | ||
6 changes: 6 additions & 0 deletions
6
integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", | ||
| "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" | ||
| } | ||
| } |
6 changes: 6 additions & 0 deletions
6
...est/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", | ||
| "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will there be more than 1 children in the subquery? Maybe adding assertion and throw exception to avoid this case if we don't support.