Skip to content

Commit b634978

Browse files
liviazhu authored and
HeartSaVioR committed
[SPARK-51922][SS] Fix UTFDataFormatException thrown from StateStoreChangelogReaderFactory for v1
### What changes were proposed in this pull request?

Catch the UTFDataFormatException thrown for v1 in the StateStoreChangelogReaderFactory and assign the version to 1.

### Why are the changes needed?

We should not throw this error.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50721 from liviazhu-db/liviazhu-db/master.

Authored-by: Livia Zhu <livia.zhu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 23785d3 commit b634978

File tree

3 files changed

+23
-2
lines changed

3 files changed

+23
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class RocksDB(
153153
)
154154
}
155155

156-
private val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"),
156+
private[spark] val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"),
157157
hadoopConf, conf.compressionCodec, loggingId = loggingId)
158158
private val byteArrayPair = new ByteArrayPair()
159159
private val commitLatencyMs = new mutable.HashMap[String, Long]()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,10 @@ class StateStoreChangelogReaderFactory(
368368
// When there is no record being written in the changelog file in V1,
369369
// the file contains a single int -1 meaning EOF, then the above readUTF()
370370
// throws with EOFException and we return version 1.
371-
case _: java.io.EOFException => 1
371+
// Or if the first record in the changelog file in V1 has a large enough
372+
// key, readUTF() will throw a UTFDataFormatException so we should return
373+
// version 1 (SPARK-51922).
374+
case _: java.io.EOFException | _: java.io.UTFDataFormatException => 1
372375
}
373376
}
374377

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3516,6 +3516,24 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
35163516
}
35173517
}
35183518

3519+
testWithChangelogCheckpointingEnabled("SPARK-51922 - Changelog writer v1 with large key" +
3520+
" does not cause UTFDataFormatException") {
3521+
val remoteDir = Utils.createTempDir()
3522+
3523+
withDB(remoteDir.toString) { db =>
3524+
db.load(0)
3525+
val key = new Array[Char](98304).mkString("") // Large key that would trigger a UTFDataFormatException
3526+
// from readUTF() during changelog version detection if handled incorrectly
3527+
db.put(key, "0")
3528+
db.commit()
3529+
3530+
val changelogReader = db.fileManager.getChangelogReader(1)
3531+
assert(changelogReader.version === 1)
3532+
val entries = changelogReader.toSeq
3533+
assert(entries.size == 1)
3534+
}
3535+
}
3536+
35193537
private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
35203538
val threadInfo = db.getAcquiredThreadInfo()
35213539
assert(threadInfo != None,

0 commit comments

Comments
 (0)