
Commit d7ce6ef

[SPARK-51866][CONNECT][TESTS] Ensure serializerAllocator/deserializerAllocator are closed if ArrowEncoderSuite#roundTripWithDifferentIOEncoders fails to create CloseableIterator
### What changes were proposed in this pull request?

This pull request ensures that `serializerAllocator` and `deserializerAllocator` are closed when `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails while creating the `CloseableIterator`.

### Why are the changes needed?

When the test option `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` is added for the `connect-client-jvm` module, `ArrowEncoderSuite` aborts with the following error:

```
[info] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite *** ABORTED *** (3 seconds, 446 milliseconds)
[info] java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding child allocators.
[info] Allocator(ROOT) 0/0/574720/9223372036854775807 (res/actual/peak/limit)
[info] child allocators: 2
[info] Allocator(serialization) 0/0/0/9223372036854775807 (res/actual/peak/limit)
[info] child allocators: 0
[info] ledgers: 0
[info] reservations: 0
[info] Allocator(deserialization) 0/0/0/9223372036854775807 (res/actual/peak/limit)
[info] child allocators: 0
[info] ledgers: 0
[info] reservations: 0
[info] ledgers: 0
[info] reservations: 0
[info] at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:462)
[info] at org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:27)
[info] at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.afterAll(ArrowEncoderSuite.scala:62)
[info] at org.scalatest.BeforeAndAfterAll.$anonfun$run$1(BeforeAndAfterAll.scala:225)
[info] at org.scalatest.Status.$anonfun$withAfterEffect$1(Status.scala:377)
[info] at org.scalatest.Status.$anonfun$withAfterEffect$1$adapted(Status.scala:373)
[info] at org.scalatest.CompositeStatus.whenCompleted(Status.scala:962)
[info] at org.scalatest.Status.withAfterEffect(Status.scala:373)
[info] at org.scalatest.Status.withAfterEffect$(Status.scala:371)
[info] at org.scalatest.CompositeStatus.withAfterEffect(Status.scala:863)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:224)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.run(ArrowEncoderSuite.scala:53)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 5 seconds, 568 milliseconds.
[info] Total number of tests run: 108
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 108, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Pass GitHub Actions
- Locally confirmed that, with the test option `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` added for the `connect-client-jvm` module, the error above is no longer thrown.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50664 from LuciferYang/Fix-ArrowEncoderSuite.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
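For reference, here is a minimal, self-contained sketch of the cleanup pattern this commit applies; it is not the suite's actual code. The `withChildAllocators` helper and the simulated failure are hypothetical stand-ins for the serializer/deserializer setup, while `RootAllocator`, `BufferAllocator`, and `newChildAllocator` are real Arrow Java APIs. The idea: child allocators that have no owner yet are closed in a `catch` block before rethrowing, so the ROOT allocator can later close without reporting outstanding children.

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

object AllocatorCleanupSketch {

  // Hypothetical helper mirroring the fix: create the two child allocators,
  // run the (possibly failing) build step, and close the children on failure
  // before rethrowing. On success the caller takes ownership of both.
  def withChildAllocators[T](root: BufferAllocator)(
      build: (BufferAllocator, BufferAllocator) => T): T = {
    val serializerAllocator = root.newChildAllocator("serialization", 0, Long.MaxValue)
    val deserializerAllocator = root.newChildAllocator("deserialization", 0, Long.MaxValue)
    try {
      build(serializerAllocator, deserializerAllocator)
    } catch {
      case e: Throwable =>
        // No CloseableIterator owns the allocators yet, so release them here;
        // otherwise root.close() reports "closed with outstanding child allocators".
        serializerAllocator.close()
        deserializerAllocator.close()
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val root: BufferAllocator = new RootAllocator()
    try {
      withChildAllocators(root) { (_, _) =>
        // Stand-in for ArrowSerializer.serialize / ArrowDeserializers.deserializeFromArrow failing.
        throw new IllegalStateException("simulated failure while building the iterator")
      }
    } catch {
      case _: IllegalStateException => () // expected in this demo
    }
    root.close() // succeeds because no child allocators are left open
  }
}
```

If construction succeeds, ownership of both allocators transfers to the returned `CloseableIterator`, whose `close()` releases them; the diff below shows exactly that split between the success and failure paths.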
1 parent cdd5296 · commit d7ce6ef

File tree: 1 file changed, +38 -29 lines


sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala

Lines changed: 38 additions & 29 deletions
```diff
@@ -99,40 +99,49 @@ class ArrowEncoderSuite extends ConnectFunSuite with BeforeAndAfterAll {
     val serializerAllocator = newAllocator("serialization")
     val deserializerAllocator = newAllocator("deserialization")
 
-    val arrowIterator = ArrowSerializer.serialize(
-      input = iterator,
-      enc = inputEncoder,
-      allocator = serializerAllocator,
-      maxRecordsPerBatch = maxRecordsPerBatch,
-      maxBatchSize = maxBatchSize,
-      batchSizeCheckInterval = batchSizeCheckInterval,
-      timeZoneId = "UTC",
-      largeVarTypes = false)
+    try {
+      val arrowIterator = ArrowSerializer.serialize(
+        input = iterator,
+        enc = inputEncoder,
+        allocator = serializerAllocator,
+        maxRecordsPerBatch = maxRecordsPerBatch,
+        maxBatchSize = maxBatchSize,
+        batchSizeCheckInterval = batchSizeCheckInterval,
+        timeZoneId = "UTC",
+        largeVarTypes = false)
 
-    val inspectedIterator = if (inspectBatch != null) {
-      arrowIterator.map { batch =>
-        inspectBatch(batch)
-        batch
+      val inspectedIterator = if (inspectBatch != null) {
+        arrowIterator.map { batch =>
+          inspectBatch(batch)
+          batch
+        }
+      } else {
+        arrowIterator
       }
-    } else {
-      arrowIterator
-    }
 
-    val resultIterator =
-      ArrowDeserializers.deserializeFromArrow(
-        inspectedIterator,
-        outputEncoder,
-        deserializerAllocator,
-        timeZoneId = "UTC")
-    new CloseableIterator[O] {
-      override def close(): Unit = {
-        arrowIterator.close()
-        resultIterator.close()
+      val resultIterator =
+        ArrowDeserializers.deserializeFromArrow(
+          inspectedIterator,
+          outputEncoder,
+          deserializerAllocator,
+          timeZoneId = "UTC")
+      new CloseableIterator[O] {
+        override def close(): Unit = {
+          arrowIterator.close()
+          resultIterator.close()
+          serializerAllocator.close()
+          deserializerAllocator.close()
+        }
+
+        override def hasNext: Boolean = resultIterator.hasNext
+
+        override def next(): O = resultIterator.next()
+      }
+    } catch {
+      case e: Throwable =>
         serializerAllocator.close()
         deserializerAllocator.close()
-      }
-      override def hasNext: Boolean = resultIterator.hasNext
-      override def next(): O = resultIterator.next()
+        throw e
     }
   }
 
```

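To surface leaks like this one locally, the commit message enables Arrow's debug allocator for the `connect-client-jvm` test JVM. Below is a hedged, build.sbt-style sketch of that setting; Spark's real build is configured in `project/SparkBuild.scala`, so the project name and wiring here are illustrative only, and only the `javaOptions` flag itself is taken from the commit message.

```scala
// Illustrative sbt project definition (hypothetical; not Spark's actual build wiring).
lazy val connectClientJvm = (project in file("sql/connect/client/jvm"))
  .settings(
    // Arrow's debug allocator makes RootAllocator.close() fail loudly when
    // child allocators (e.g. "serialization"/"deserialization") were leaked.
    (Test / javaOptions) += "-Darrow.memory.debug.allocator=true",
    // javaOptions only take effect when tests run in a forked JVM.
    Test / fork := true
  )
```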