Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with LinkedBlockingDeque #4388
base: series/3.6.x
Conversation
@@ -732,7 +732,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
         // by another thread in the future.
         val len = runtimeBlockingExpiration.length
         val unit = runtimeBlockingExpiration.unit
-        if (pool.cachedThreads.tryTransfer(this, len, unit)) {
+        if (pool.cachedThreads.offerFirst(this, len, unit)) {
I think that this will always succeed immediately:

> Inserts the specified element at the front of this deque, waiting up to the specified wait time if necessary for space to become available.

Because the capacity, if unspecified, is equal to `Integer.MAX_VALUE`:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingDeque.html
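To make the concern concrete, here is a minimal standalone sketch of the semantic difference (names are illustrative only, not the pool code; it assumes nothing beyond the `java.util.concurrent` classes under discussion):

```scala
import java.util.concurrent.{LinkedBlockingDeque, LinkedTransferQueue, TimeUnit}

object OfferFirstDemo {
  def main(args: Array[String]): Unit = {
    // Unbounded deque: capacity defaults to Integer.MAX_VALUE, so the timed
    // offerFirst inserts and returns true immediately -- it never waits.
    val deque = new LinkedBlockingDeque[String]()
    val offered = deque.offerFirst("worker", 60, TimeUnit.SECONDS)
    println(s"offerFirst returned $offered without blocking")

    // tryTransfer only succeeds if a consumer receives the element within
    // the timeout; with no consumer it waits, then returns false.
    val queue = new LinkedTransferQueue[String]()
    val transferred = queue.tryTransfer("worker", 100, TimeUnit.MILLISECONDS)
    println(s"tryTransfer returned $transferred after waiting")
  }
}
```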
-  private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] =
-    new LinkedTransferQueue
+  private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] =
+    new LinkedBlockingDeque
To replicate the old behavior, I think we need to specify a capacity of `0` in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.
> To replicate the old behavior, I think we need to specify a capacity of `0` in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

A capacity of `0` is not allowed, because:

> IllegalArgumentException - if capacity is less than 1

The minimum we can go is 1. Would that work?
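For reference, a quick sketch of the constructor contract, and of why a capacity of 1 still isn't a synchronous handoff (toy code, not the pool):

```scala
import java.util.concurrent.LinkedBlockingDeque

object CapacityCheck {
  def main(args: Array[String]): Unit = {
    // Capacity 0 is rejected: the constructor requires capacity >= 1.
    try new LinkedBlockingDeque[String](0)
    catch { case e: IllegalArgumentException => println(s"capacity 0: $e") }

    // Capacity 1 is accepted, but it is still a one-slot buffer rather than
    // a synchronous handoff: a producer can insert with no consumer waiting.
    val d = new LinkedBlockingDeque[String](1)
    println(d.offerFirst("x")) // true, even though nobody is taking it
  }
}
```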
This is not quite correct. Under high load (i.e., lots of blocking tasks), we want the threads to persist so we can reuse them as much as possible. It's under lower load, when there are more cached threads than blocking tasks, that we would like the older threads to time out and exit.
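A toy illustration of that load-dependent behavior (names and counts are made up): with LIFO insert-and-take at the same end, light load keeps reusing only the newest thread, leaving older ones idle long enough for their expiration to fire.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable

object LifoReuse {
  def main(args: Array[String]): Unit = {
    val cache = new LinkedBlockingDeque[String]()
    List("t1", "t2", "t3").foreach(t => cache.offerFirst(t)) // t3 newest

    // LIFO: insert and take from the same end. Under light load only the
    // newest thread is ever reused; t1 and t2 sit untouched at the tail,
    // where their expiration timeout can fire so they exit.
    val hits = mutable.Map.empty[String, Int].withDefaultValue(0)
    for (_ <- 1 to 5) {
      val t = cache.pollFirst() // always t3
      hits(t) += 1
      cache.offerFirst(t)
    }
    println(hits) // Map(t3 -> 5); with FIFO, reuse would rotate all three
  }
}
```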
I have a new idea for how to fix this:
Although officially unspecified, it turns out that the non-fair implementation of …
…] and ConcurrentHashMap to keep track of blocker threads.
@@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
       if (blocking) {
         // The worker thread was blocked before. It is no longer part of the
         // core pool and needs to be cached.

+        val stateToTransfer = transferState
Had to do this to avoid an NPE in `pool.stateTransferQueue.offer(st)`, but I think this is causing the error:

[error] x blocking work does not starve poll
[error] None is not Some (IOPlatformSpecification.scala:702)
-          pool.replaceWorker(idx, cached)
+          pool.replaceWorker(idx, this)
I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the `idx`-th worker thread, to replace `this` thread which is about to block. The new code tries to replace this thread with itself?
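For clarity, a rough toy reconstruction of the old promotion flow as described (the `Pool`/`Worker` types here are simplified stand-ins, not the real WSTP API):

```scala
import java.util.concurrent.LinkedBlockingDeque

// Toy stand-ins for the real WSTP types, just to show the shape of the flow.
final class Worker(val name: String)

final class Pool(size: Int) {
  private val workers = new Array[Worker](size)
  val cachedThreads = new LinkedBlockingDeque[Worker]()
  def replaceWorker(idx: Int, w: Worker): Unit = workers(idx) = w
  def workerAt(idx: Int): Worker = workers(idx)
}

object PromotionDemo {
  def main(args: Array[String]): Unit = {
    val pool = new Pool(1)
    val blocked = new Worker("about-to-block")
    pool.replaceWorker(0, blocked)
    pool.cachedThreads.offerFirst(new Worker("cached"))

    // Old behavior: pull a parked thread from the cache and promote *it*
    // into slot 0, replacing the thread that is about to block.
    val cached = pool.cachedThreads.pollFirst()
    if (cached ne null) pool.replaceWorker(0, cached)

    println(pool.workerAt(0).name) // "cached", not "about-to-block"
  }
}
```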
Oh ok, I see it now, will try to fix this.
I am thinking of adding a thread reference field in `TransferState` to pass on the cached `WorkerThread`.
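Something like the following shape, perhaps (purely hypothetical; the field name and placeholder comment are made up for illustration and are not the actual cats-effect definitions):

```scala
// Hypothetical sketch of the idea: carry the cached thread alongside the
// state being handed over.
final class TransferState {
  // ... existing fields carrying the interpreter/run state ...
  var thread: Thread = null // proposed: the cached WorkerThread taking over
}
```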
…and ConcurrentHashMap idea
thanks, this is looking good!
-          transferState,
+          new WorkerThread.TransferState,
Why this change?
+        if (pool.transferStateQueue.offer(transferState)) {
+          // If successful, a waiting thread will pick it up
+          // Register this thread in the blockerThreads map
I'm confused by this comment. Doesn't the registration happen above?
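For context, here is a toy model of the offer/poll handoff the diff above implements, under the assumption that a parked thread polls `transferStateQueue` on the other side (all names illustrative, not the actual pool internals):

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Toy model: a thread about to block offers its state to a shared queue;
// a parked thread polls it and takes over.
final case class TransferState(idx: Int)

object HandoffDemo {
  def main(args: Array[String]): Unit = {
    val transferStateQueue = new LinkedBlockingQueue[TransferState]()

    val parked = new Thread(() => {
      // A cached thread waits (with a timeout) for a state to take over;
      // timing out is the path where an old thread can expire and exit.
      val st = transferStateQueue.poll(1, TimeUnit.SECONDS)
      if (st ne null) println(s"taking over worker slot ${st.idx}")
      else println("timed out, exiting")
    })
    parked.start()

    // The blocking thread publishes its state for a waiter to pick up.
    transferStateQueue.offer(TransferState(idx = 3))
    parked.join()
  }
}
```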
-    var t: WorkerThread[P] = null
-    while ({
-      t = cachedThreads.poll()
-      t ne null
-    }) {
+    val it = blockerThreads.keySet().iterator()
+    while (it.hasNext()) {
+      val t = it.next()
If I remember correctly, I think one of the goals here is to avoid any allocations, in case the runtime was shutting down in a fatal condition (e.g. out-of-memory). Unfortunately, creating the iterator is an allocation. But I don't know how to iterate the elements of a ConcurrentHashMap without an iterator 🤔
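One partial idea, sketched tentatively: a pre-allocated `BiConsumer` passed to `forEach` avoids the call-site iterator and lambda, though `ConcurrentHashMap.forEach` itself still allocates an internal traverser, so it may not fully meet the no-allocation goal.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer

object ShutdownIteration {
  def main(args: Array[String]): Unit = {
    val blockerThreads = new ConcurrentHashMap[Thread, java.lang.Boolean]()
    blockerThreads.put(Thread.currentThread(), java.lang.Boolean.TRUE)

    // Pre-allocated action: no iterator or lambda is created at the call
    // site, but CHM.forEach still allocates a traverser internally, so this
    // is only a partial answer to "no allocations during fatal shutdown".
    val action: BiConsumer[Thread, java.lang.Boolean] =
      new BiConsumer[Thread, java.lang.Boolean] {
        def accept(t: Thread, v: java.lang.Boolean): Unit =
          println(s"would interrupt ${t.getName}") // real code would interrupt
      }

    blockerThreads.forEach(action)
  }
}
```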
Description:

This PR fixes issue #4382 by addressing a thread leak in the Work Stealing Thread Pool (WSTP) caused by the FIFO behavior of `LinkedTransferQueue` (introduced in #4295). The change switches to `LinkedBlockingDeque` with LIFO ordering, using `offerFirst` and `pollFirst` to prioritize newer threads for reuse, allowing older threads to time out and exit under high load.

Changes:
- Replaced `LinkedTransferQueue` with `LinkedBlockingDeque` in `WorkStealingThreadPool.scala` for LIFO behavior.
- Updated `WorkerThread.run` to use `offerFirst` instead of `tryTransfer`.
- Updated `WorkerThread` initialization to use `pollFirst` for retrieving cached threads, ensuring LIFO reuse.
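A minimal sketch of the resulting LIFO discipline (toy code under the assumptions above, not the actual WSTP):

```scala
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

// Workers park at the front and are reused from the front, so the most
// recently parked thread is always taken first.
object LifoCacheSketch {
  final class CachedWorker(val id: Int)

  def main(args: Array[String]): Unit = {
    val cachedThreads = new LinkedBlockingDeque[CachedWorker]()

    // A worker leaving a blocking region parks itself at the front. Note:
    // on an unbounded deque the timed offerFirst returns immediately (see
    // the review discussion above), so the timeout here is inert.
    cachedThreads.offerFirst(new CachedWorker(1), 60, TimeUnit.SECONDS)
    cachedThreads.offerFirst(new CachedWorker(2), 60, TimeUnit.SECONDS)

    // The next blocking region reuses the most recently parked worker.
    val reused = cachedThreads.pollFirst()
    println(s"reused worker ${reused.id}") // prints 2
  }
}
```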