Skip to content

Commit 1a81198

Browse files
authored
Add default subquery aliases in join condition for lookup command (#1087)
* Add default subquery aliases in join condition for lookup command Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix style Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 304e021 commit 1a81198

File tree

2 files changed

+67
-15
lines changed

2 files changed

+67
-15
lines changed

integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLLookupITSuite.scala

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ class FlintSparkPPLLookupITSuite
2323
/** Test table and index name */
2424
private val sourceTable = "spark_catalog.default.flint_ppl_test1"
2525
private val lookupTable = "spark_catalog.default.flint_ppl_test2"
26+
private val sourceTable2 = "spark_catalog.default.flint_ppl_test3"
2627

2728
override def beforeAll(): Unit = {
2829
super.beforeAll()
2930
createPeopleTable(sourceTable)
3031
createWorkInformationTable(lookupTable)
32+
createPeopleTable(sourceTable2)
33+
3134
}
3235

3336
protected override def afterEach(): Unit = {
@@ -62,7 +65,9 @@ class FlintSparkPPLLookupITSuite
6265

6366
val lookupProject =
6467
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
65-
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
68+
val joinCondition = EqualTo(
69+
UnresolvedAttribute("__auto_generated_subquery_name_l.uid"),
70+
UnresolvedAttribute("__auto_generated_subquery_name_s.id"))
6671
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
6772
val projectAfterJoin = Project(
6873
Seq(
@@ -95,7 +100,9 @@ class FlintSparkPPLLookupITSuite
95100

96101
val lookupProject =
97102
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
98-
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
103+
val joinCondition = EqualTo(
104+
UnresolvedAttribute("__auto_generated_subquery_name_l.uid"),
105+
UnresolvedAttribute("__auto_generated_subquery_name_s.id"))
99106
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
100107
val coalesceExpr =
101108
Coalesce(
@@ -132,7 +139,9 @@ class FlintSparkPPLLookupITSuite
132139

133140
val lookupProject =
134141
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
135-
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
142+
val joinCondition = EqualTo(
143+
UnresolvedAttribute("__auto_generated_subquery_name_l.uid"),
144+
UnresolvedAttribute("__auto_generated_subquery_name_s.id"))
136145
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
137146
val projectAfterJoin = Project(
138147
Seq(
@@ -164,7 +173,9 @@ class FlintSparkPPLLookupITSuite
164173

165174
val lookupProject =
166175
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
167-
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
176+
val joinCondition = EqualTo(
177+
UnresolvedAttribute("__auto_generated_subquery_name_l.uid"),
178+
UnresolvedAttribute("__auto_generated_subquery_name_s.id"))
168179
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
169180
val coalesceExpr =
170181
Coalesce(
@@ -208,7 +219,9 @@ class FlintSparkPPLLookupITSuite
208219
lookupAlias)
209220
val joinCondition =
210221
And(
211-
EqualTo(UnresolvedAttribute("uID"), UnresolvedAttribute("id")),
222+
EqualTo(
223+
UnresolvedAttribute("__auto_generated_subquery_name_l.uID"),
224+
UnresolvedAttribute("__auto_generated_subquery_name_s.id")),
212225
EqualTo(
213226
UnresolvedAttribute("__auto_generated_subquery_name_l.name"),
214227
UnresolvedAttribute("__auto_generated_subquery_name_s.name")))
@@ -253,7 +266,9 @@ class FlintSparkPPLLookupITSuite
253266
lookupAlias)
254267
val joinCondition =
255268
And(
256-
EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("ID")),
269+
EqualTo(
270+
UnresolvedAttribute("__auto_generated_subquery_name_l.uid"),
271+
UnresolvedAttribute("__auto_generated_subquery_name_s.ID")),
257272
EqualTo(
258273
UnresolvedAttribute("__auto_generated_subquery_name_l.name"),
259274
UnresolvedAttribute("__auto_generated_subquery_name_s.name")))
@@ -519,4 +534,46 @@ class FlintSparkPPLLookupITSuite
519534
ex.getMessage.contains(
520535
"A column or function parameter with name `new_col` cannot be resolved"))
521536
}
537+
538+
test("test LOOKUP lookupTable name as id shouldn't throw exception") {
539+
sql(s"""
540+
| DROP TABLE IF EXISTS s
541+
| """.stripMargin)
542+
sql(s"""
543+
| DROP TABLE IF EXISTS l
544+
| """.stripMargin)
545+
sql(s"""
546+
| CREATE TABLE s
547+
| (
548+
| id INT,
549+
| uid INT,
550+
| name STRING
551+
| )
552+
| USING $tableType $tableOptions
553+
|""".stripMargin)
554+
sql(s"""
555+
| INSERT INTO s
556+
| VALUES (1, 4, 'b'),
557+
| (2, 5, 'bb'),
558+
| (3, 6, 'ccc')
559+
| """.stripMargin)
560+
561+
sql(s"""
562+
| CREATE TABLE l
563+
| (
564+
| id INT,
565+
| name STRING
566+
| )
567+
| USING $tableType $tableOptions
568+
|""".stripMargin)
569+
sql(s"""
570+
| INSERT INTO l
571+
| VALUES (4, 'x'),
572+
| (5, 'xx')
573+
| """.stripMargin)
574+
val frame =
575+
sql("source = s | LOOKUP l id as uid REPLACE name")
576+
val expectedResults: Array[Row] = Array(Row(4, "x"), Row(5, "xx"), Row(6, null))
577+
assertSameRows(expectedResults, frame)
578+
}
522579
}

ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/LookupTransformer.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,10 @@ static Expression buildLookupMappingCondition(
6262
for (Map.Entry<Field, Field> entry : node.getLookupMappingMap().entrySet()) {
6363
Expression lookupNamedExpression;
6464
Expression sourceNamedExpression;
65-
if (entry.getKey().getField() == entry.getValue().getField()) {
66-
Field lookupWithAlias = buildFieldWithLookupSubqueryAlias(node, entry.getKey());
67-
Field sourceWithAlias = buildFieldWithSourceSubqueryAlias(node, entry.getValue());
68-
lookupNamedExpression = expressionAnalyzer.visitField(lookupWithAlias, context);
69-
sourceNamedExpression = expressionAnalyzer.visitField(sourceWithAlias, context);
70-
} else {
71-
lookupNamedExpression = expressionAnalyzer.visitField(entry.getKey(), context);
72-
sourceNamedExpression = expressionAnalyzer.visitField(entry.getValue(), context);
73-
}
65+
Field lookupWithAlias = buildFieldWithLookupSubqueryAlias(node, entry.getKey());
66+
Field sourceWithAlias = buildFieldWithSourceSubqueryAlias(node, entry.getValue());
67+
lookupNamedExpression = expressionAnalyzer.visitField(lookupWithAlias, context);
68+
sourceNamedExpression = expressionAnalyzer.visitField(sourceWithAlias, context);
7469

7570
Expression equalTo = EqualTo$.MODULE$.apply(lookupNamedExpression, sourceNamedExpression);
7671
equiConditions.add(equalTo);

0 commit comments

Comments
 (0)