Commit d2a864f

asl3 authored and gengliangwang committed
[SPARK-51747][SQL] Data source cached plan should respect options
### What changes were proposed in this pull request?

Make the data source cached plan respect options, such as the CSV delimiter. Previously, DataSourceStrategy cached the first resolved plan and reused it for all later reads, ignoring updated options. With this change, a **new plan** is returned when the options change.

### Why are the changes needed?

For example:

```
spark.sql("CREATE TABLE t(a string, b string) USING CSV".stripMargin)
spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
spark.sql("SELECT * FROM t").show()
spark.sql("SELECT * FROM t WITH ('delimiter' = ';')")
```

Expected output:

```
+----+----+
|col1|col2|
+----+----+
| a;b|   c|
+----+----+

+----+----+
|col1|col2|
+----+----+
|   a| b,c|
+----+----+
```

Output before this PR:

```
+----+----+
|col1|col2|
+----+----+
| a;b|   c|
+----+----+

+----+----+
|col1|col2|
+----+----+
| a;b|   c|
+----+----+
```

The PR is needed to get the expected result.

### Does this PR introduce _any_ user-facing change?

Yes, it corrects the caching behavior of DataSourceStrategy.

### How was this patch tested?

Added a test in DDLSuite.scala.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50538 from asl3/asl3/datasourcestrategycacheoptions.

Lead-authored-by: Amanda Liu <amanda.liu@databricks.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent a9987a3 · commit d2a864f
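For context, here is a minimal, self-contained sketch of the cache-then-rebuild pattern the fix adopts. `Relation` and `PlanCache` are hypothetical stand-ins for illustration only, not Spark's `SessionCatalog` API; the real change is in the diff below.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a resolved relation carrying its read options.
final case class Relation(options: Map[String, String])

object PlanCache {
  private val cache = mutable.Map.empty[String, Relation]

  // Before the fix: the first resolved plan was cached and always reused,
  // so later queries silently kept the original options.
  // After the fix: a cache hit whose options differ is rebuilt with the
  // new options instead of being returned as-is.
  def getOrRebuild(table: String, dsOptions: Map[String, String]): Relation =
    cache.get(table) match {
      case None =>
        val plan = Relation(dsOptions)
        cache.update(table, plan)
        plan
      case Some(r) if r.options != dsOptions =>
        // Return a fresh relation; as in the actual diff, the cached entry
        // keeps the original options, so queries without options still hit it.
        r.copy(options = dsOptions)
      case Some(r) => r
    }
}
```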

File tree

2 files changed: +59 −14 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 30 additions & 14 deletions
```diff
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import scala.collection.immutable.ListMap
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
 
@@ -256,20 +257,35 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       QualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
-    catalog.getCachedPlan(qualifiedTableName, () => {
-      val dataSource =
-        DataSource(
-          sparkSession,
-          // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
-          // inferred at runtime. We should still support it.
-          userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
-          partitionColumns = table.partitionColumnNames,
-          bucketSpec = table.bucketSpec,
-          className = table.provider.get,
-          options = dsOptions,
-          catalogTable = Some(table))
-      LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
-    })
+    catalog.getCachedTable(qualifiedTableName) match {
+      case null =>
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
+            // inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = dsOptions,
+            catalogTable = Some(table))
+        val plan = LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
+        catalog.cacheTable(qualifiedTableName, plan)
+        plan
+
+      // If the cached table relation's options differ from the new options:
+      // 1. Create a new HadoopFsRelation with updated options
+      // 2. Return a new LogicalRelation with the updated HadoopFsRelation
+      // This ensures the relation reflects any changes in data source options
+      case r @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _, _)
+          if new CaseInsensitiveStringMap(fsRelation.options.asJava) !=
+            new CaseInsensitiveStringMap(dsOptions.asJava) =>
+        val newFsRelation = fsRelation.copy(options = dsOptions)(sparkSession)
+        r.copy(relation = newFsRelation)
+
+      case other => other
+    }
   }
 
   private def getStreamingRelation(
```
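Note that the guard compares the two option maps only after wrapping them in `CaseInsensitiveStringMap`, so a change in key casing alone does not trigger a rebuild. A minimal sketch of that comparison, assuming `CaseInsensitiveStringMap` implements value equality over its lower-cased keys (which the guard above relies on):

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object OptionsComparison {
  def main(args: Array[String]): Unit = {
    val cached   = Map("Delimiter" -> ";")   // options on the cached relation
    val incoming = Map("delimiter" -> ";")   // options from the new query

    // Keys differ only in case, values match: the wrapped maps compare
    // equal, so the cached relation would be reused rather than rebuilt.
    println(new CaseInsensitiveStringMap(cached.asJava) ==
      new CaseInsensitiveStringMap(incoming.asJava)) // true

    // A genuinely different value would take the rebuild branch instead.
    val changed = Map("delimiter" -> ",")
    println(new CaseInsensitiveStringMap(cached.asJava) ==
      new CaseInsensitiveStringMap(changed.asJava)) // false
  }
}
```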

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 29 additions & 0 deletions
```diff
@@ -1376,6 +1376,35 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
     }
   }
 
+  test("SPARK-51747: Data source cached plan should respect options") {
+    withTable("t") {
+      spark.sql("CREATE TABLE t(a string, b string) USING CSV".stripMargin)
+      spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
+      spark.sql("INSERT INTO TABLE t VALUES ('hello; world', 'test')")
+
+      // check initial contents of table
+      checkAnswer(spark.table("t"), Row("a;b", "c") :: Row("hello; world", "test") :: Nil)
+
+      // no option
+      checkAnswer(
+        spark.sql("SELECT * FROM t"),
+        Row("a;b", "c") :: Row("hello; world", "test") :: Nil
+      )
+
+      // respect delimiter option
+      checkAnswer(
+        spark.sql("SELECT * FROM t WITH ('delimiter' = ';')"),
+        Row("a", "b,c") :: Row("hello", " world,test") :: Nil
+      )
+
+      // respect lineSep option
+      checkAnswer(
+        spark.sql("SELECT * FROM t WITH ('lineSep' = ';')"),
+        Row("a", null) :: Row("b", "c\n") :: Row("hello", null) :: Row(" world", "test\n") :: Nil
+      )
+    }
+  }
+
   test("SPARK-18009 calling toLocalIterator on commands") {
     import scala.jdk.CollectionConverters._
     val df = sql("show databases")
```
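The `lineSep` expectations follow from how the stored bytes are re-split: with `;` as the record separator, each stored CSV line yields two records, and each record is then parsed with the default `,` delimiter. A hedged, self-contained illustration using plain string operations (not Spark's CSV parser), assuming the file contents are exactly what the INSERTs wrote:

```scala
// First stored line, written with the default ',' delimiter and '\n' line end.
val stored = "a;b,c\n"
// lineSep = ";" splits records on ';' instead of '\n'.
val records = stored.split(";")           // Array("a", "b,c\n")
// Each record is parsed with the default ',' delimiter; limit -1 keeps
// trailing empty fields, as a CSV parser would.
val rows = records.map(_.split(",", -1))  // Array(Array("a"), Array("b", "c\n"))
// -> Row("a", null) and Row("b", "c\n"), matching the test above; the '\n'
// is now ordinary column data. "hello; world,test\n" splits the same way
// into Row("hello", null) and Row(" world", "test\n").
```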
