Skip to content

Commit 267c963

Browse files
committed
adding lock
1 parent 7384a84 commit 267c963

File tree

1 file changed

+34
-11
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state

1 file changed

+34
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

+34-11
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,9 @@ object StateStore extends Logging {
856856

857857
private val maintenanceThreadPoolLock = new Object
858858

859+
private val providersQueueLock = new Object
860+
861+
@GuardedBy("providersQueueLock")
859862
private val unloadedProvidersToClose =
860863
new ConcurrentLinkedQueue[(StateStoreProviderId, StateStoreProvider)]
861864

@@ -1162,17 +1165,19 @@ object StateStore extends Logging {
11621165
// Wait until this partition can be processed
11631166
private def awaitProcessThisPartition(
11641167
id: StateStoreProviderId,
1165-
timeoutMs: Long
1166-
): Boolean = maintenanceThreadPoolLock synchronized {
1167-
val endTime = System.currentTimeMillis() + timeoutMs
1168+
timeoutMs: Long): Boolean = maintenanceThreadPoolLock synchronized {
1169+
val startTime = System.currentTimeMillis()
1170+
val endTime = startTime + timeoutMs
11681171

11691172
// If immediate processing fails, wait with timeout
11701173
var canProcessThisPartition = processThisPartition(id)
11711174
while (!canProcessThisPartition && System.currentTimeMillis() < endTime) {
11721175
canProcessThisPartition = processThisPartition(id)
11731176
maintenanceThreadPoolLock.wait(timeoutMs)
11741177
}
1175-
1178+
val elapsedTime = System.currentTimeMillis() - startTime
1179+
logInfo(log"Waited for ${MDC(LogKeys.TOTAL_TIME, elapsedTime)} ms to be able to process " +
1180+
log"maintenance for partition ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}")
11761181
canProcessThisPartition
11771182
}
11781183

@@ -1189,20 +1194,34 @@ object StateStore extends Logging {
11891194
}
11901195

11911196
// Providers that couldn't be processed now and need to be added back to the queue
1192-
val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]()
1193-
1194-
while (!unloadedProvidersToClose.isEmpty) {
1195-
val (providerId, provider) = unloadedProvidersToClose.poll()
1197+
val providersToRequeue = ArrayBuffer.empty[(StateStoreProviderId, StateStoreProvider)]
1198+
1199+
// Create a temporary list and drain the concurrent queue into it under a lock
1200+
val tempList = providersQueueLock synchronized {
1201+
val items = ArrayBuffer.empty[(StateStoreProviderId, StateStoreProvider)]
1202+
while (!unloadedProvidersToClose.isEmpty) {
1203+
val item = unloadedProvidersToClose.poll()
1204+
items += item
1205+
}
1206+
items
1207+
}
11961208

1209+
// Process all items in the temporary list
1210+
tempList.foreach { case (providerId, provider) =>
11971211
if (processThisPartition(providerId)) {
11981212
submitMaintenanceWorkForProvider(
11991213
providerId, provider, storeConf, MaintenanceTaskType.FromMaintenanceQueue)
12001214
} else {
1201-
providersToRequeue += (providerId, provider)
1215+
providersToRequeue += ((providerId, provider))
12021216
}
12031217
}
12041218

1205-
providersToRequeue.foreach(unloadedProvidersToClose.offer)
1219+
providersQueueLock synchronized {
1220+
// Add providers that couldn't be processed back to the original queue
1221+
providersToRequeue.foreach { providerPair =>
1222+
unloadedProvidersToClose.offer(providerPair)
1223+
}
1224+
}
12061225

12071226
loadedProviders.synchronized {
12081227
loadedProviders.toSeq
@@ -1239,7 +1258,9 @@ object StateStore extends Logging {
12391258
val ableToProcessNow = awaitProcessThisPartition(id, timeoutMs)
12401259
if (!ableToProcessNow) {
12411260
// Add to queue for later processing if we can't process now
1242-
unloadedProvidersToClose.add((id, provider))
1261+
providersQueueLock synchronized {
1262+
unloadedProvidersToClose.offer((id, provider))
1263+
}
12431264
}
12441265
ableToProcessNow
12451266

@@ -1249,6 +1270,8 @@ object StateStore extends Logging {
12491270
true
12501271

12511272
case FromLoadedProviders =>
1273+
// Provider from loadedProviders can be processed immediately
1274+
// as it's in maintenancePartitions
12521275
true
12531276
}
12541277

0 commit comments

Comments
 (0)