Commit 16784fa

vrozov authored and HeartSaVioR committed
[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink
### What changes were proposed in this pull request?

1. Ensure that if a relative path is used in `DataStreamWriter`, the path resolution is done on the Spark Driver and is not deferred to the Spark Executor.
2. Construct a fully qualified path in `DataSource`, similar to how it is done for `DataFrameWriter`, before it is passed to `FileStreamSink`.
3. Add a check to `FileStreamSink` that asserts that `path` is an absolute path.

https://lists.apache.org/thread/ffzwn1y2fgyjw0j09cv4np9z00wymxwv

### Why are the changes needed?

To properly support relative paths in Structured Streaming. The use case mostly applies to a single-node local Spark cluster.

### Does this PR introduce _any_ user-facing change?

The change only applies when a relative path is used in `DataStreamWriter`, resulting in data being output to the correct directory. No changes are expected for absolute paths (the most common production case).

### How was this patch tested?

Added a new test case to `FileStreamSinkSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49654 from vrozov/SPARK-50854.

Authored-by: Vlad Rozov <vrozov@amazon.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 2cd5365 commit 16784fa
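
For context, here is a minimal sketch of the scenario the fix targets: a streaming query on a single-node local cluster writing to a relative output path. Everything in it (the rate source, the `out/words` and `out/checkpoint` paths, the app name) is illustrative rather than taken from the patch; with this change the relative sink path is qualified against the driver's working directory before it reaches `FileStreamSink`.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a relative sink path on a local cluster. After SPARK-50854 the
// path "out/words" is made fully qualified on the driver (e.g. file:/<driver-cwd>/out/words)
// before FileStreamSink is constructed, instead of being resolved on each executor.
object RelativePathSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("spark-50854-sketch")
      .getOrCreate()

    val query = spark.readStream
      .format("rate") // built-in test source emitting (timestamp, value) rows
      .load()
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "out/checkpoint") // hypothetical relative path
      .start("out/words")                             // hypothetical relative path

    query.awaitTermination(5000) // let a few micro-batches run, then shut down
    query.stop()
    spark.stop()
  }
}
```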

File tree

6 files changed: +52 −17 lines

docs/streaming/ss-migration-guide.md

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-gui
 
 - Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
 - Since Spark 4.0, new configuration `spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint` (default: `0.3`) controls the amount of additional space allowed in the checkpoint directory to store stale version files for batch deletion inside maintenance task. This is to amortize the cost of listing in cloud store. Setting this to `0` defaults to the old behavior. (See [SPARK-48931](https://issues.apache.org/jira/browse/SPARK-48931) for more details.)
+- Since Spark 4.0, when relative path is used to output data in `DataStreamWriter` the resolution to absolute path is done in the Spark Driver and is not deferred to Spark Executor. This is to make Structured Streaming behavior similar to DataFrame API (`DataFrameWriter`). (See [SPARK-50854](https://issues.apache.org/jira/browse/SPARK-50854) for more details.)
 
 ## Upgrading from Structured Streaming 3.3 to 3.4
 

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 4 additions & 0 deletions
@@ -2950,4 +2950,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       messageParameters = Map("option" -> option)
     )
   }
+
+  def notAbsolutePathError(path: Path): SparkException = {
+    SparkException.internalError(s"$path is not absolute path.")
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 8 additions & 5 deletions
@@ -120,6 +120,11 @@ case class DataSource(
   private def newHadoopConfiguration(): Configuration =
     sparkSession.sessionState.newHadoopConfWithOptions(options)
 
+  private def makeQualified(path: Path): Path = {
+    val fs = path.getFileSystem(newHadoopConfiguration())
+    path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver

@@ -319,9 +324,9 @@
       s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
 
     case fileFormat: FileFormat =>
-      val path = caseInsensitiveOptions.getOrElse("path", {
+      val path = makeQualified(new Path(caseInsensitiveOptions.getOrElse("path", {
         throw QueryExecutionErrors.dataPathNotSpecifiedError()
-      })
+      }))).toString
       if (outputMode != OutputMode.Append) {
         throw QueryCompilationErrors.dataSourceOutputModeUnsupportedError(className, outputMode)
       }

@@ -456,9 +461,7 @@
       // 3. It's OK that the output path doesn't exist yet;
       val allPaths = paths ++ caseInsensitiveOptions.get("path")
       val outputPath = if (allPaths.length == 1) {
-        val path = new Path(allPaths.head)
-        val fs = path.getFileSystem(newHadoopConfiguration())
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        makeQualified(new Path(allPaths.head))
       } else {
         throw QueryExecutionErrors.multiplePathsSpecifiedError(allPaths)
       }
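
To make the effect of the new `makeQualified` helper concrete, here is a small standalone sketch (not part of the patch) that applies the same Hadoop `Path.makeQualified(uri, workingDir)` call to a relative path; on a default local filesystem it yields something like `file:/<working-dir>/stream.output`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Standalone illustration of the qualification the DataSource change performs.
// With no fs.defaultFS configured this resolves against the local filesystem and
// the JVM's working directory, so a relative path gains a scheme and becomes absolute.
object MakeQualifiedSketch {
  def main(args: Array[String]): Unit = {
    val relative = new Path("stream.output")
    val fs = relative.getFileSystem(new Configuration())
    val qualified = relative.makeQualified(fs.getUri, fs.getWorkingDirectory)

    println(s"relative  : $relative")               // stream.output
    println(s"qualified : $qualified")              // e.g. file:/home/user/job/stream.output
    println(s"absolute? : ${qualified.isAbsolute}") // true, so FileStreamSink's new check passes
  }
}
```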

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 3 additions & 0 deletions
@@ -134,6 +134,9 @@ class FileStreamSink(
 
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
+  if (!basePath.isAbsolute) {
+    throw QueryExecutionErrors.notAbsolutePathError(basePath)
+  }
   private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
     sparkSession.sessionState.conf)
   private val retention = options.get("retention").map(Utils.timeStringAsMs)

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1101,7 +1101,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
         .start("/tmp")
     }
 
-    e.getMessage should equal("Sink FileSink[/tmp] does not support async progress tracking")
+    e.getMessage should equal("Sink FileSink[file:/tmp] does not support async progress tracking")
   }
 
   test("with log purging") {

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 35 additions & 11 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer

@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.JobContext
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath

@@ -36,6 +36,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._

@@ -292,35 +293,48 @@ abstract class FileStreamSinkSuite extends StreamTest {
   test("parquet") {
     testFormat(None) // should not throw error as default format parquet when not specified
     testFormat(Some("parquet"))
+    testFormat(None, relativizeOutputPath = true)
+    testFormat(Some("parquet"), relativizeOutputPath = true)
   }
 
   test("orc") {
     testFormat(Some("orc"))
+    testFormat(Some("orc"), relativizeOutputPath = true)
   }
 
   test("text") {
     testFormat(Some("text"))
+    testFormat(Some("text"), relativizeOutputPath = true)
   }
 
   test("json") {
     testFormat(Some("json"))
+    testFormat(Some("json"), relativizeOutputPath = true)
   }
 
-  def testFormat(format: Option[String]): Unit = {
-    val inputData = MemoryStream[Int]
+  def testFormat(format: Option[String], relativizeOutputPath: Boolean = false): Unit = {
+    val inputData = MemoryStream[String] // text format only supports String
     val ds = inputData.toDS()
 
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val tempDir = Utils.createTempDir(namePrefix = "stream.output")
+    val outputPath = if (relativizeOutputPath) {
+      Paths.get("").toAbsolutePath.relativize(tempDir.toPath).toString
+    } else {
+      tempDir.getCanonicalPath
+    }
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-    var query: StreamingQuery = null
+    val writer = ds.toDF("value").writeStream
+      .option("checkpointLocation", checkpointDir)
+    if (format.nonEmpty) {
+      writer.format(format.get)
+    }
 
+    var query: StreamingQuery = null
     try {
-      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
-      if (format.nonEmpty) {
-        writer.format(format.get)
-      }
-      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+      query = writer.start(outputPath)
+      inputData.addData("data")
+      query.processAllAvailable()
     } finally {
       if (query != null) {
         query.stop()

@@ -664,6 +678,16 @@
         s" $path."))
     }
   }
+
+  test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
+    val fileFormat = new ParquetFileFormat() // any valid FileFormat
+    val partitionColumnNames = Seq.empty[String]
+    val options = Map.empty[String, String]
+    val exception = intercept[SparkException] {
+      new FileStreamSink(spark, "test.parquet", fileFormat, partitionColumnNames, options)
+    }
+    assert(exception.getMessage.contains("is not absolute path."))
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
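
The updated `testFormat` helper derives its relative output path with `Paths.get("").toAbsolutePath.relativize(tempDir.toPath)`. The following standalone sketch (paths shown in comments are examples, not from the suite) illustrates what that java.nio call produces and why resolving the result from the working directory still lands on the same temp directory.

```scala
import java.nio.file.{Files, Paths}

// Standalone illustration of the relativize trick used by the updated testFormat helper:
// express an absolute temp directory as a path relative to the current working directory.
object RelativizeSketch {
  def main(args: Array[String]): Unit = {
    val cwd = Paths.get("").toAbsolutePath                    // e.g. /home/user/spark
    val tempDir = Files.createTempDirectory("stream.output")  // e.g. /tmp/stream.output123

    val relative = cwd.relativize(tempDir)                    // e.g. ../../tmp/stream.output123

    println(s"cwd      : $cwd")
    println(s"tempDir  : $tempDir")
    println(s"relative : $relative")
    // Resolving the relative path from cwd gets back to the same directory,
    // which is what lets the suite exercise the relative-path code path end to end.
    println(s"resolved : ${cwd.resolve(relative).normalize()}")
  }
}
```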
