[SPARK-51596][SS] Fix concurrent StateStoreProvider maintenance and closing #50595


Open

ericm-db wants to merge 37 commits into master from the maintenance-changes branch

Conversation

ericm-db (Contributor):

What changes were proposed in this pull request?

Moves the unload operation from the task thread to the maintenance thread. To ensure unloading still occurs ASAP (rather than potentially waiting for the maintenance interval), as was introduced by https://issues.apache.org/jira/browse/SPARK-33827, we immediately trigger a maintenance thread to do the unload.

This has the added benefit that unloading other providers doesn't block the task thread. To capitalize on this, unload() should not hold the loadedProviders lock the entire time (which would block other task threads); instead, it should release the lock once it has removed the unloading providers from the map, and then close the providers without the lock held.
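The following is a minimal, self-contained sketch of that locking pattern; the types below are illustrative stand-ins, not the actual Spark classes.

import scala.collection.mutable

// Hypothetical stand-ins for the real Spark types, for illustration only.
trait Provider { def close(): Unit }
case class ProviderId(operatorId: Long, partitionId: Int)

object UnloadSketch {
  private val loadedProviders = mutable.HashMap.empty[ProviderId, Provider]

  // Remove providers from the shared map while holding the lock, then close
  // them after the lock is released, so other task threads can keep loading
  // and looking up providers while the (potentially slow) close runs.
  def unloadProviders(idsToUnload: Seq[ProviderId]): Unit = {
    val toClose = loadedProviders.synchronized {
      idsToUnload.flatMap(id => loadedProviders.remove(id))
    }
    toClose.foreach(_.close())
  }
}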

Why are the changes needed?

Currently, both the task thread and the maintenance thread can call unload() on a provider. This leads to a race condition where the maintenance thread could be performing maintenance on a provider while the task thread is closing it, leading to unexpected behavior.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit test

Was this patch authored or co-authored using generative AI tooling?

No

@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
}
}

private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: StateStoreProvider,

nit: let's move the args to new lines?

@@ -1009,14 +1013,45 @@ object StateStore extends Logging {

val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
providerIdsToUnload.foreach(unload(_))
providerIdsToUnload.foreach(id => {
loadedProviders.remove(id).foreach( provider => {

Don't we need to synchronize on loadedProviders here?


Already in the synchronized block on line 922.

// Trigger maintenance thread to immediately do maintenance on and close the provider.
// Doing maintenance first allows us to do maintenance for a constantly-moving state
// store.
logInfo(log"Task thread trigger maintenance on " +

Can you add some more info to this log line, like the task ID? Also mention that the provider was removed from loadedProviders.


+1, that would help with investigation


Include that we trigger maintenance to close the provider.
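A hedged sketch of what the enriched log line could look like; the LogKeys.TASK_ATTEMPT_ID key and the exact wording are assumptions, while the MDC/log-interpolator usage follows the pattern already used in this diff.

// Sketch only: include the task attempt ID (when on a task thread) and note
// that the provider was already removed from loadedProviders.
// LogKeys.TASK_ATTEMPT_ID is an assumed key name; pick whichever key fits.
val taskIdLog = Option(TaskContext.get())
  .map(tc => log", taskId=${MDC(LogKeys.TASK_ATTEMPT_ID, tc.taskAttemptId())}")
  .getOrElse(log"")
logInfo(log"Removed provider=${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)} from " +
  log"loadedProviders and triggered maintenance to close it" + taskIdLog)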

@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
}
}

private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: StateStoreProvider,

Can you add a comment saying that if alreadyRemovedFromLoadedProviders is false, we must already have the lock to process this partition?


*/
def unload(storeProviderId: StateStoreProviderId,
alreadyRemovedStoreFromLoadedProviders: Option[StateStoreProvider] = None): Unit = {
var toCloseProviders: List[StateStoreProvider] = Nil

Why is this a list? We will only be closing one provider, right?

if (alreadyRemovedFromLoadedProviders) {
// If provider is already removed from loadedProviders, we MUST process
// this partition to close it, so we block until we can.
awaitProcessThisPartition(id)

Let's make it clear that we are waiting for any possible ongoing maintenance on this partition.


Also add a comment about when this would already have been removed.

}
} catch {
case NonFatal(e) =>
logWarning(log"Error managing ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)}, " +

nit: replace the word "managing"?

// store.
logInfo(log"Task thread trigger maintenance on " +
log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}")
doMaintenanceOnProvider(id, provider, alreadyRemovedFromLoadedProviders = true)

Let's rename this to submitMaintenanceWorkForProvider, to make it clear that this isn't blocking.
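A small sketch of the non-blocking hand-off this rename is meant to convey; the thread-pool plumbing and type names below are illustrative, not the PR's actual code.

import java.util.concurrent.{ExecutorService, Executors}

// Illustrative stand-in; not the real StateStoreProvider.
trait Provider { def doMaintenance(): Unit; def close(): Unit }

object MaintenanceSubmissionSketch {
  private val maintenancePool: ExecutorService = Executors.newFixedThreadPool(2)

  // Non-blocking from the caller's point of view: the task thread only
  // enqueues the work and returns; a maintenance pool thread later runs
  // maintenance and then closes the provider.
  def submitMaintenanceWorkForProvider(id: String, provider: Provider): Unit = {
    maintenancePool.execute(() => {
      provider.doMaintenance()
      provider.close()
    })
  }
}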

@liviazhu-db left a comment:

While you're modifying this code, could you also change all of the places where we have ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)} (which logs the Java object) to ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}? The first one doesn't give us much information; I think we really just want to see the ID in the logs.
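A before/after sketch of the suggested log-key change; both key names are the ones quoted in the comment above.

// Before: logs the provider object's toString, which carries little information.
logWarning(log"Error managing ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)}")
// After: logs the provider ID, which is what you actually want to search for.
logWarning(log"Error managing ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}")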

val awaitingPartitionDuration = System.currentTimeMillis() - startTime
try {
provider.doMaintenance()
// If shouldRemoveFromLoadedProviders is false, we don't need to verify

Oops this comment needs to be updated

@liviazhu-db left a comment:

Also, not sure how possible it is, but could you try to write a test where the task submits maintenance work for the provider while it's already being processed by a maintenance thread (i.e. when we need to block in awaitProcessThisPartition)? If it's hard to write a reliable unit test for this, a manual test would do if you hack around with the timing; I just want to check that this functionality works as expected.

@ericm-db requested a review from liviazhu-db on April 24, 2025 at 21:27
@liviazhu-db left a comment:

LGTM pending a nit

@@ -1009,14 +1013,46 @@ object StateStore extends Logging {

val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
providerIdsToUnload.foreach(unload(_))
val taskContextIdLogLine = Option(TaskContext.get()).map { tc =>
log"taskId=${MDC(LogKeys.STATE_STORE_PROVIDER_ID, tc.taskAttemptId())}"

nit: I think LogKey is wrong here

// Trigger maintenance thread to immediately do maintenance on and close the provider.
// Doing maintenance first allows us to do maintenance for a constantly-moving state
// store.
logInfo(log"Task thread trigger maintenance to close " +

nit: Submitted maintenance from task thread

// with the coordinator as we know it definitely should be unloaded.
if (submittedFromTaskThread) {
unload(id, Some(provider))
} else if (!verifyIfStoreInstanceActive(id)) {

nit: indent seems off here?

// Block until we can process this partition
private def awaitProcessThisPartition(
id: StateStoreProviderId,
storeConf: StateStoreConf): Boolean = {

Let's just pass the timeout value here?

@liviazhu-db left a comment:

LGTM, thanks for making these changes!

// (we've already removed it from loadedProviders)
true

case FromLoadedProviders =>

nit: add a comment here?


+1, let's comment this case too.

private def awaitProcessThisPartition(
id: StateStoreProviderId,
timeoutMs: Long
): Boolean = maintenanceThreadPoolLock synchronized {

nit: can we move this to the line above?

canProcessThisPartition = processThisPartition(id)
maintenanceThreadPoolLock.wait(timeoutMs)
}


Can we add a log to indicate how much time we waited here?
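One possible shape for that log, assuming a start timestamp captured before the wait loop; the LogKeys.DURATION key is an assumption.

// Sketch: report how long this maintenance thread blocked on the partition.
val startTimeMs = System.currentTimeMillis()
val acquired = awaitProcessThisPartition(id, timeoutMs)
val waitedMs = System.currentTimeMillis() - startTimeMs
logInfo(log"Waited ${MDC(LogKeys.DURATION, waitedMs)} ms to claim partition " +
  log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)} for maintenance")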

// Providers that couldn't be processed now and need to be added back to the queue
val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]()

while (!unloadedProvidersToClose.isEmpty) {

Access to unloadedProvidersToClose is not truly thread-safe here. Can we add a separate lock to guard access?


unloadedProvidersToClose is a concurrent queue, so we don't need a lock here. Since this is a while loop, if a new element is added, we will see it. Even if we don't, it's not an issue; we will see it on the next maintenance pass.
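A self-contained sketch of the drain-and-requeue pattern being discussed, using a ConcurrentLinkedQueue as described; the types are illustrative stand-ins.

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-ins for the real Spark types.
case class ProviderId(partitionId: Int)
trait Provider { def close(): Unit }

object DrainSketch {
  // Providers that a task thread unloaded and handed off to be closed here.
  val unloadedProvidersToClose = new ConcurrentLinkedQueue[(ProviderId, Provider)]()

  def drain(canProcess: ProviderId => Boolean): Unit = {
    // Entries that cannot be processed right now go back on the queue and
    // will be picked up by a later maintenance pass.
    val providersToRequeue = new ArrayBuffer[(ProviderId, Provider)]()
    var entry = unloadedProvidersToClose.poll()
    while (entry != null) {
      val (id, provider) = entry
      if (canProcess(id)) provider.close() else providersToRequeue += entry
      entry = unloadedProvidersToClose.poll()
    }
    providersToRequeue.foreach(unloadedProvidersToClose.add)
  }
}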

@ericm-db ericm-db force-pushed the maintenance-changes branch from 267c963 to 46b300a Compare April 29, 2025 21:05
@ericm-db (Author):

cc @HeartSaVioR

@@ -4927,6 +4927,13 @@
],
"sqlState" : "XXKST"
},
"STATE_STORE_MAINTENANCE_TASK_TIMEOUT" : {

It seems we are currently not using this anywhere in the code; I couldn't find the usage.

.internal()
.doc("Timeout in seconds to wait for maintenance to process this partition.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300L)

I think 5 minutes is too large, because it means a maintenance thread would wait 5 minutes on one partition instead of doing other work. We could end up with many threads waiting and doing nothing for 5 minutes. Let's set this lower. cc @anishshri-db
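For reference, the same conf-builder chain with a smaller, purely illustrative default; the key name below is hypothetical, and the concrete value is exactly what this comment leaves open for discussion.

// Hypothetical key name and example default; not the PR's final values.
val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
  buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
    .internal()
    .doc("Timeout in seconds to wait for maintenance to process this partition.")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefault(30L)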


// Providers that couldn't be processed now and need to be added back to the queue
val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]()

while (!unloadedProvidersToClose.isEmpty) {

nit: add a comment about which providers will be in unloadedProvidersToClose, for future readers.

// Determine if we can process this partition based on the source
val canProcessThisPartition = source match {
case FromTaskThread =>
// Provider from task thread needs to wait for lock

nit: add an additional comment that it needs to wait because there could already be ongoing maintenance for that provider.
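A compact sketch of the kind of commentary being requested; the case names come from the diff, while the helper signatures and the FromLoadedProviders body are simplified assumptions.

object MaintenanceSourceSketch {
  // Illustrative stand-ins; the real names come from the PR diff.
  sealed trait MaintenanceSource
  case object FromTaskThread extends MaintenanceSource
  case object FromLoadedProviders extends MaintenanceSource

  def canProcess(
      source: MaintenanceSource,
      tryClaimPartition: () => Boolean,   // stands in for processThisPartition(id)
      awaitClaimPartition: () => Boolean  // stands in for awaitProcessThisPartition(id, timeoutMs)
    ): Boolean = source match {
    case FromTaskThread =>
      // The task thread already removed the provider from loadedProviders, but a
      // scheduled maintenance pass may still be running on this partition, so we
      // block (up to the timeout) until it finishes; the provider MUST be closed.
      awaitClaimPartition()
    case FromLoadedProviders =>
      // Regular scheduled maintenance: only proceed if no other thread currently
      // owns this partition (simplified assumption of the actual behavior).
      tryClaimPartition()
  }
}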

}

if (source != FromLoadedProviders) {
logInfo(log"Unloaded ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}")

We need this log too if we unloaded in the FromLoadedProviders case, right?

}
} catch {
case NonFatal(e) =>
logWarning(log"Error ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}, " +

nit: should we make this "Error doing maintenance on ..." or something similar?

@@ -1718,6 +1845,240 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(encoderSpec == deserializedEncoderSpec)
}

test("SPARK-51596: task thread waits for ongoing maintenance to complete before" +

Rename this; the task thread isn't the one waiting. The task thread just submits to the maintenance thread pool.

}

// Now unblock the first maintenance operation
// This should allow the task's submitted maintenance to proceed

How do you know that the submitted work actually waited for the ongoing maintenance and didn't happen concurrently? I don't see that verification here.


// Mark that task submitted maintenance
SignalingStateStoreProvider.taskSubmittedMaintenance = true


This test is actually not checking that the provider was queued. How are we sure it was queued?
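One way the ordering could be verified, sketched with plain latches; every name here is hypothetical test scaffolding, not the PR's actual test helpers.

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical scaffolding: maintenance signals when it starts and blocks until
// the test releases it; close() records whether maintenance had finished first.
class LatchingProvider {
  val maintenanceStarted = new CountDownLatch(1)
  val releaseMaintenance = new CountDownLatch(1)
  @volatile var maintenanceDone = false
  @volatile var closedAfterMaintenance = false

  def doMaintenance(): Unit = {
    maintenanceStarted.countDown()
    releaseMaintenance.await(30, TimeUnit.SECONDS)
    maintenanceDone = true
  }

  def close(): Unit = {
    closedAfterMaintenance = maintenanceDone
  }
}
// Test outline: wait on maintenanceStarted, submit the close work from the
// "task thread", then count down releaseMaintenance and assert that
// closedAfterMaintenance eventually becomes true.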

var canProcessThisPartition = processThisPartition(id)
while (!canProcessThisPartition && System.currentTimeMillis() < endTime) {
canProcessThisPartition = processThisPartition(id)
maintenanceThreadPoolLock.wait(timeoutMs)

Shouldn't the canProcessThisPartition check come after the wait? With the current code, if canProcessThisPartition becomes true, you will still end up waiting.
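A sketch of the re-ordered loop this comment is suggesting, where the claim is re-checked after each wait and the deadline is honored; the names are simplified stand-ins.

object AwaitClaimSketch {
  // tryClaim stands in for processThisPartition(id); lock stands in for
  // maintenanceThreadPoolLock.
  def awaitClaim(tryClaim: () => Boolean, lock: AnyRef, timeoutMs: Long): Boolean =
    lock.synchronized {
      val endTime = System.currentTimeMillis() + timeoutMs
      var claimed = tryClaim()
      while (!claimed && System.currentTimeMillis() < endTime) {
        val remainingMs = endTime - System.currentTimeMillis()
        if (remainingMs > 0) lock.wait(remainingMs)
        claimed = tryClaim() // re-check only after waking up, so we stop
                             // waiting as soon as the claim succeeds
      }
      claimed
    }
}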

@ericm-db ericm-db force-pushed the maintenance-changes branch from 34220b6 to 9faf0a4 Compare May 2, 2025 20:57