[SPARK-51596][SS] Fix concurrent StateStoreProvider maintenance and closing #50595
Conversation
@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
}
}

private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: StateStoreProvider,
nit: let's move the args to new lines?
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {

val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
providerIdsToUnload.foreach(unload(_))
providerIdsToUnload.foreach(id => {
loadedProviders.remove(id).foreach( provider => {
Don't we need to synchronize on loadedProviders here?
Already in the synchronized block on line 922.
// Trigger maintenance thread to immediately do maintenance on and close the provider.
// Doing maintenance first allows us to do maintenance for a constantly-moving state
// store.
logInfo(log"Task thread trigger maintenance on " +
Can you add some more info to this log line, like the task ID? Also note that the provider was removed from loadedProviders.
+1, that would help with investigation
Also mention that maintenance is triggered to close the provider.
private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: StateStoreProvider,
Can you add a comment saying that if alreadyRemovedFromLoadedProviders is false, we must already have the lock to process this partition?
*/
def unload(storeProviderId: StateStoreProviderId,
alreadyRemovedStoreFromLoadedProviders: Option[StateStoreProvider] = None): Unit = {
var toCloseProviders: List[StateStoreProvider] = Nil
Why is this a list? We will only be closing one provider, right?
if (alreadyRemovedFromLoadedProviders) {
// If provider is already removed from loadedProviders, we MUST process
// this partition to close it, so we block until we can.
awaitProcessThisPartition(id)
Let's make it clear that we are waiting for any possible ongoing maintenance on this partition.
Also add a comment about when this would have already been removed.
}
} catch {
case NonFatal(e) =>
logWarning(log"Error managing ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)}, " +
nit: replace the word "managing"?
// store.
logInfo(log"Task thread trigger maintenance on " +
log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}")
doMaintenanceOnProvider(id, provider, alreadyRemovedFromLoadedProviders = true)
Let's rename this to submitMaintenanceWorkForProvider, to make it clear that this isn't blocking.
Could you also, while you're modifying this code, change all of the places where we have ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)} (which logs the Java object) to ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}? The first one doesn't give us much information; I think we really just want to see the ID in the logs.
val awaitingPartitionDuration = System.currentTimeMillis() - startTime
try {
provider.doMaintenance()
// If shouldRemoveFromLoadedProviders is false, we don't need to verify
Oops, this comment needs to be updated.
Also, not sure how possible it is, but could you try to write a test where the task submits maintenance work for the provider while it's already being processed by a maintenance thread (i.e. when we need to block in awaitProcessThisPartition)? If it's hard to write a reliable unit test for this, a manual test would do if you hack around with the timing; I just want to check that this functionality works as expected.
LGTM pending a nit
@@ -1009,14 +1013,46 @@ object StateStore extends Logging {

val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
providerIdsToUnload.foreach(unload(_))
val taskContextIdLogLine = Option(TaskContext.get()).map { tc =>
log"taskId=${MDC(LogKeys.STATE_STORE_PROVIDER_ID, tc.taskAttemptId())}"
nit: I think the LogKey is wrong here
// Trigger maintenance thread to immediately do maintenance on and close the provider.
// Doing maintenance first allows us to do maintenance for a constantly-moving state
// store.
logInfo(log"Task thread trigger maintenance to close " +
nit: "Submitted maintenance from task thread"
// with the coordinator as we know it definitely should be unloaded.
if (submittedFromTaskThread) {
unload(id, Some(provider))
} else if (!verifyIfStoreInstanceActive(id)) {
nit: the indent seems off here?
// Block until we can process this partition
private def awaitProcessThisPartition(
id: StateStoreProviderId,
storeConf: StateStoreConf): Boolean = {
Let's just pass the timeout value here?
LGTM, thanks for making these changes!
// (we've already removed it from loadedProviders)
true

case FromLoadedProviders =>
nit: comment?
+1, let's comment this case too
private def awaitProcessThisPartition(
id: StateStoreProviderId,
timeoutMs: Long
): Boolean = maintenanceThreadPoolLock synchronized {
nit: can we move this to the line above?
canProcessThisPartition = processThisPartition(id)
maintenanceThreadPoolLock.wait(timeoutMs)
}

Can we add a log to indicate how much time we waited here?
// Providers that couldn't be processed now and need to be added back to the queue
val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]()

while (!unloadedProvidersToClose.isEmpty) {
Access to unloadedProvidersToClose is not truly thread safe here. Can we add a separate lock to guard access?
unloadedProvidersToClose is a concurrent queue, so we don't need a lock here. Since this is a while loop, if a new element is added we will see it. Even if we don't, it's not an issue; we will see it on the next maintenance.
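To illustrate the point being discussed, here is a minimal sketch of the drain-and-requeue pattern over a concurrent queue. This is not the actual StateStore code: `ProviderId`, `Provider`, and `processThisPartition` are stand-ins, and the real implementation drains inside its maintenance loop. The key property is that `poll()` on `ConcurrentLinkedQueue` is atomic, so no extra lock is needed around the queue itself.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer

object DrainSketch {
  // Stand-ins for the real StateStoreProviderId / StateStoreProvider types.
  type ProviderId = String
  type Provider = String

  val unloadedProvidersToClose = new ConcurrentLinkedQueue[(ProviderId, Provider)]()

  // Hypothetical predicate: whether maintenance may touch this partition now.
  def processThisPartition(id: ProviderId): Boolean = id != "busy"

  def drain(): Seq[(ProviderId, Provider)] = {
    val processed = new ArrayBuffer[(ProviderId, Provider)]()
    // Providers that couldn't be processed now and need to go back on the queue.
    val providersToRequeue = new ArrayBuffer[(ProviderId, Provider)]()
    // poll() is atomic; elements enqueued concurrently are seen either on
    // this pass or on the next maintenance round, as the reviewer notes.
    var entry = unloadedProvidersToClose.poll()
    while (entry != null) {
      if (processThisPartition(entry._1)) processed += entry
      else providersToRequeue += entry
      entry = unloadedProvidersToClose.poll()
    }
    providersToRequeue.foreach(unloadedProvidersToClose.add)
    processed.toSeq
  }
}
```

Draining by `poll()` until `null` (rather than checking `isEmpty` and then removing) also avoids a window between the emptiness check and the removal.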
cc @HeartSaVioR
@@ -4927,6 +4927,13 @@

],
"sqlState" : "XXKST"
},
"STATE_STORE_MAINTENANCE_TASK_TIMEOUT" : {
It seems we are currently not using this in the code; I couldn't find the usage.
.internal()
.doc("Timeout in seconds to wait for maintenance to process this partition.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300L)
I think 5 minutes is too large, because it means a maintenance thread could wait 5 minutes on one partition instead of doing other work; we can end up with many threads waiting, doing nothing, for 5 minutes. Let's set this lower. cc @anishshri-db
while (!unloadedProvidersToClose.isEmpty) {
nit: add a comment about which providers will be in unloadedProvidersToClose, for future readers
// Determine if we can process this partition based on the source
val canProcessThisPartition = source match {
case FromTaskThread =>
// Provider from task thread needs to wait for lock
nit: add an additional comment that it needs to wait because there could already be ongoing maintenance for that provider
}

if (source != FromLoadedProviders) {
logInfo(log"Unloaded ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}")
We need this log too if we unloaded in the FromLoadedProviders case, right?
}
} catch {
case NonFatal(e) =>
logWarning(log"Error ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}, " +
nit: should we make this "Error doing maintenance on" or something?
@@ -1718,6 +1845,240 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(encoderSpec == deserializedEncoderSpec)
}

test("SPARK-51596: task thread waits for ongoing maintenance to complete before" +
Rename this; the task thread isn't the one waiting. The task thread just submits to the maintenance threadpool.
}

// Now unblock the first maintenance operation
// This should allow the task's submitted maintenance to proceed
How do you know that the submitted work actually waited for the ongoing maintenance and didn't happen concurrently? I don't see that verification here.
// Mark that task submitted maintenance
SignalingStateStoreProvider.taskSubmittedMaintenance = true
This test is actually not checking that the provider was queued. How are we sure it was queued?
var canProcessThisPartition = processThisPartition(id)
while (!canProcessThisPartition && System.currentTimeMillis() < endTime) {
canProcessThisPartition = processThisPartition(id)
maintenanceThreadPoolLock.wait(timeoutMs)
Shouldn't the canProcessThisPartition check be after the wait? With the current code, if canProcessThisPartition becomes true during the wait, you will still end up waiting.
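The reviewer's point about check ordering can be sketched as a standard guarded-wait loop: re-check the condition after each `wait`, so the method returns as soon as the condition holds instead of sleeping through another timeout slice. This is a simplified model, not the actual StateStore code; the lock, the `partitionFree` flag, and the deadline handling are stand-ins.

```scala
object AwaitSketch {
  private val lock = new Object
  // Hypothetical shared flag standing in for processThisPartition(id).
  @volatile var partitionFree: Boolean = false

  // Guarded-wait loop: check the condition BEFORE waiting (maybe we never
  // need to wait) and again AFTER each wait (so a notify wakes us promptly).
  def awaitProcessThisPartition(timeoutMs: Long, deadlineMs: Long): Boolean =
    lock.synchronized {
      var canProcess = partitionFree
      while (!canProcess && System.currentTimeMillis() < deadlineMs) {
        lock.wait(timeoutMs)
        canProcess = partitionFree // re-check after the wait, per the review comment
      }
      canProcess
    }

  // Called by the thread that finishes maintenance on the partition.
  def release(): Unit = lock.synchronized {
    partitionFree = true
    lock.notifyAll()
  }
}
```

The loop also guards against spurious wakeups, which `Object.wait` permits, so the condition must be re-evaluated on every iteration regardless.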
What changes were proposed in this pull request?
Moves the unload operation away from the task thread into the maintenance thread. To ensure unloading still occurs ASAP (rather than potentially waiting for the maintenance interval), as introduced by https://issues.apache.org/jira/browse/SPARK-33827, we immediately trigger a maintenance thread to do the unload.
This gives us an extra benefit: unloading other providers doesn't block the task thread. To capitalize on this, unload() should not hold the loadedProviders lock the entire time (which would block other task threads); instead, it should release the lock once it has deleted the unloading providers from the map, and then close the providers without the lock held.
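The remove-under-lock, close-outside-lock pattern described above can be sketched as follows. This is a simplified model, not the actual StateStore code: the `Provider` trait, the map, and the method names are stand-ins for `loadedProviders` and `StateStoreProvider`.

```scala
import scala.collection.mutable

object UnloadSketch {
  trait Provider { def close(): Unit }

  private val loadedProviders = mutable.HashMap[String, Provider]()

  def load(id: String, p: Provider): Unit =
    loadedProviders.synchronized { loadedProviders(id) = p }

  // Remove entries while holding the lock, but call close() outside it,
  // so a slow close() doesn't block task threads that need loadedProviders.
  def unload(ids: Seq[String]): Unit = {
    val toClose = loadedProviders.synchronized {
      ids.flatMap(loadedProviders.remove)
    }
    toClose.foreach(_.close())
  }

  def isLoaded(id: String): Boolean =
    loadedProviders.synchronized { loadedProviders.contains(id) }
}
```

The essential invariant is that once a provider has been removed from the map under the lock, no other thread can obtain it, so closing it afterwards without the lock is safe.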
Why are the changes needed?
Currently, both the task thread and the maintenance thread can call unload() on a provider. This leads to a race condition where the maintenance thread could be conducting maintenance while the task thread is closing the provider, leading to unexpected behavior.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test
Was this patch authored or co-authored using generative AI tooling?
No