
Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with LinkedBlockingDeque #4388


Draft · wants to merge 5 commits into base: series/3.6.x

Conversation

pantShrey

Description:
This PR fixes Issue #4382 by addressing a thread leak in the Work Stealing Thread Pool (WSTP) caused by the FIFO behavior of LinkedTransferQueue (introduced in #4295). The change switches to LinkedBlockingDeque with LIFO ordering, using offerFirst and pollFirst to prioritize newer threads for reuse, allowing older threads to time out and exit under high load.

Changes:

  • Replace LinkedTransferQueue with LinkedBlockingDeque in WorkStealingThreadPool.scala for LIFO behavior.
  • Update WorkerThread.run to use offerFirst instead of tryTransfer.
  • Adjust WorkerThread initialization to use pollFirst for retrieving cached threads, ensuring LIFO reuse (see the sketch after this list).
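
A minimal sketch of the caching pattern this change moves to (names like Worker, park, and unpark are simplified for illustration, not the actual WorkerThread API):

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

object LifoCacheSketch {
  final class Worker(val id: Int)

  // Unbounded deque used as a LIFO stack of parked worker threads.
  val cachedThreads = new LinkedBlockingDeque[Worker]

  // A worker finishing a blocking region parks itself at the front,
  // so the most recently parked worker is always reused first.
  def park(w: Worker, timeout: Long, unit: TimeUnit): Boolean =
    cachedThreads.offerFirst(w, timeout, unit)

  // Retrieving from the front gives LIFO reuse; workers stuck at the
  // back of the deque eventually hit their keep-alive and exit.
  def unpark(): Option[Worker] =
    Option(cachedThreads.pollFirst())
}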

@@ -732,7 +732,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
         // by another thread in the future.
         val len = runtimeBlockingExpiration.length
         val unit = runtimeBlockingExpiration.unit
-        if (pool.cachedThreads.tryTransfer(this, len, unit)) {
+        if (pool.cachedThreads.offerFirst(this, len, unit)) {
Member

I think that this will always succeed immediately:

> Inserts the specified element at the front of this deque, waiting up to the specified wait time if necessary for space to become available.

Because:

> The capacity, if unspecified, is equal to Integer.MAX_VALUE

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingDeque.html
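
A quick standalone check of this point (not part of the PR): with the default capacity of Integer.MAX_VALUE, the timed offerFirst never has to wait for space, so it returns true immediately regardless of the timeout.

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

object OfferFirstDemo extends App {
  val deque = new LinkedBlockingDeque[String] // capacity defaults to Integer.MAX_VALUE
  val start = System.nanoTime()
  // Even with a 60 second timeout, this returns immediately.
  val offered = deque.offerFirst("worker", 60, TimeUnit.SECONDS)
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"offered=$offered after ${elapsedMs}ms") // offered=true after ~0ms
}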

-  private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] =
-    new LinkedTransferQueue
+  private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] =
+    new LinkedBlockingDeque
Member

To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

Author
@pantShrey Apr 26, 2025

> To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

A capacity of 0 is not allowed, because:

> IllegalArgumentException - if capacity is less than 1

The minimum we can go is 1. Would that work?
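
For reference, a standalone check of the capacity-1 behavior (not part of the PR): the constructor rejects 0, and with capacity 1 the first offerFirst still succeeds even when no consumer is waiting, so it is not a true synchronous handoff the way tryTransfer is.

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

object CapacityDemo extends App {
  // new LinkedBlockingDeque[String](0) // throws IllegalArgumentException
  val deque = new LinkedBlockingDeque[String](1)
  // Succeeds immediately with no consumer present, unlike tryTransfer.
  println(deque.offerFirst("a", 10, TimeUnit.MILLISECONDS)) // true
  // The deque is now full, so this times out and returns false.
  println(deque.offerFirst("b", 10, TimeUnit.MILLISECONDS)) // false
}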

@armanbilge
Member

> allowing older threads to time out and exit under high load.

This is not quite correct. Under high load (i.e., lots of blocking tasks), we want the threads to persist so we can reuse them as much as possible.

It's under lower load, when there are more cached threads than blocking tasks, that we would like the older threads to time out and exit.

@armanbilge
Member

I have a new idea for how to fix this:

  1. We should have a pool-level SynchronousQueue[TransferState].
  2. When a thread transitions to blocking, it should offer its state to the queue. If this fails, it can start a new worker thread to replace itself.
  3. When a thread transitions to cached, it can poll up to the timeout for a new state.
  4. Meanwhile, we can use a ConcurrentHashMap to keep track of blocker threads.

Although officially unspecified, it turns out that the non-fair implementation of SynchronousQueue in the JDK uses a LIFO stack. While we probably don't want to rely on this in the long term, I propose that it's good enough to fix the bug for now.
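
A rough sketch of this design (type and method names here are hypothetical, not the actual cats-effect internals):

import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue, TimeUnit}

object HandOffSketch {
  final class TransferState

  // Non-fair (default) mode: the JDK implementation happens to use a
  // LIFO stack, which is what gives newer threads priority for reuse.
  val transferStateQueue = new SynchronousQueue[TransferState]
  val blockerThreads = new ConcurrentHashMap[Thread, java.lang.Boolean]

  // (2) Transitioning to blocking: hand the state to a waiting cached
  // thread, or start a replacement worker if none is waiting.
  def onBlocking(state: TransferState): Unit = {
    blockerThreads.put(Thread.currentThread(), java.lang.Boolean.TRUE)
    if (!transferStateQueue.offer(state))
      startReplacementWorker(state) // hypothetical helper
  }

  // (3) Transitioning to cached: poll up to the keep-alive timeout for
  // a new state; on timeout the thread simply exits.
  def onCached(timeout: Long, unit: TimeUnit): Option[TransferState] =
    Option(transferStateQueue.poll(timeout, unit))

  private def startReplacementWorker(state: TransferState): Unit = () // elided
}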

…] and ConcurrentHashMap to keep track of blocker threads.
@@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
       if (blocking) {
         // The worker thread was blocked before. It is no longer part of the
         // core pool and needs to be cached.
-
+        val stateToTransfer = transferState
Author

I had to do this to avoid an NPE in pool.stateTransferQueue.offer(st), but I think it is causing this error:

[error] x blocking work does not starve poll
[error] None is not Some (IOPlatformSpecification.scala:702)
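
For what it's worth, a self-contained illustration of the capture (a hypothetical simplification, not the actual WorkerThread code): SynchronousQueue.offer(null) throws NullPointerException, so a field that another thread may null out has to be read into a local exactly once before offering.

import java.util.concurrent.SynchronousQueue

object CaptureSketch {
  final class TransferState

  @volatile var transferState: TransferState = new TransferState
  val transferStateQueue = new SynchronousQueue[TransferState]

  def handOff(): Unit = {
    val stateToTransfer = transferState // single read of the racy field
    if (stateToTransfer ne null)
      transferStateQueue.offer(stateToTransfer) // stable local, no NPE
  }
}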

Comment on lines 935 to 943
-          pool.replaceWorker(idx, cached)
+          pool.replaceWorker(idx, this)
Member

I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the idx-th worker thread, to replace this thread which is about to block. The new code tries to replace this thread with itself?

Author

Oh, OK, I see it now; will try to fix this.

Author

I am thinking of adding a thread reference field to TransferState to pass on the cached WorkerThread (rough sketch below).
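
Something like this, perhaps (a hypothetical sketch, not the actual cats-effect types):

// Placeholder for the real WorkerThread[P] type.
final class WorkerThread

final class TransferState {
  // Set by the cached thread before it polls the queue; read by the
  // blocking thread so it can promote the right worker via something
  // like pool.replaceWorker(idx, transferState.thread), not itself.
  @volatile var thread: WorkerThread = null
}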

Member
@armanbilge armanbilge left a comment

thanks, this is looking good!

Comment on lines -964 to +973
-            transferState,
+            new WorkerThread.TransferState,
Member

Why this change?


if (pool.transferStateQueue.offer(transferState)) {
// If successful, a waiting thread will pick it up
// Register this thread in the blockerThreads map
Member

I'm confused by this comment. Doesn't the registration happen above?

Comment on lines -752 to +757
-    var t: WorkerThread[P] = null
-    while ({
-      t = cachedThreads.poll()
-      t ne null
-    }) {
+    val it = blockerThreads.keySet().iterator()
+    while (it.hasNext()) {
+      val t = it.next()
Member

If I remember correctly, I think one of the goals here is to avoid any allocations, in case the runtime was shutting down in a fatal condition (e.g. out-of-memory). Unfortunately, creating the iterator is an allocation. But, I don't know how to iterate the elements of a ConcurrentHashMap without an iterator 🤔
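
One possible direction (a sketch under the assumption that pre-allocating the Consumer helps; ConcurrentHashMap's internal traversal may still allocate a traverser, so this only avoids the iterator and closure allocations):

import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer

final class ShutdownSketch(blockerThreads: ConcurrentHashMap[Thread, java.lang.Boolean]) {
  // Allocated once at construction, so shutdown itself allocates no closure.
  private[this] val interruptAll: Consumer[Thread] =
    new Consumer[Thread] {
      def accept(t: Thread): Unit = t.interrupt()
    }

  // Long.MaxValue as the parallelism threshold forces sequential traversal.
  def shutdown(): Unit =
    blockerThreads.forEachKey(Long.MaxValue, interruptAll)
}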
