Skip to content

Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap #4388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: series/3.6.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}

import java.time.Instant
import java.time.temporal.ChronoField
import java.util.concurrent.{LinkedTransferQueue, ThreadLocalRandom}
import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue, ThreadLocalRandom}
import java.util.concurrent.atomic.{
AtomicBoolean,
AtomicInteger,
Expand Down Expand Up @@ -131,8 +131,11 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
*/
private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift)

private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] =
new LinkedTransferQueue
private[unsafe] val stateTransferQueue: SynchronousQueue[WorkerThread.TransferState] =
new SynchronousQueue[WorkerThread.TransferState](false)

private[unsafe] val blockerThreads: ConcurrentHashMap[WorkerThread[P], java.lang.Boolean] =
new ConcurrentHashMap()

/**
* The shutdown latch of the work stealing thread pool.
Expand Down Expand Up @@ -749,14 +752,11 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
system.close()
}

var t: WorkerThread[P] = null
var ts: WorkerThread.TransferState = new WorkerThread.TransferState()
while ({
t = cachedThreads.poll()
t ne null
}) {
t.interrupt()
// don't bother joining, cached threads are not doing anything interesting
}
ts = stateTransferQueue.poll()
ts ne null
}) {}

// Drain the external queue.
externalQueue.clear()
Expand Down
40 changes: 24 additions & 16 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait}
import scala.concurrent.duration.{Duration, FiniteDuration}

import java.lang.Long.MIN_VALUE
import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom}
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import WorkerThread.{Metrics, TransferState}
Expand Down Expand Up @@ -110,7 +110,6 @@ private[effect] final class WorkerThread[P <: AnyRef](
*/
private[this] var _active: Runnable = _

private val stateTransfer: ArrayBlockingQueue[TransferState] = new ArrayBlockingQueue(1)
private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration

private[effect] var currentIOFiber: IOFiber[?] = _
Expand Down Expand Up @@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (blocking) {
// The worker thread was blocked before. It is no longer part of the
// core pool and needs to be cached.

val stateToTransfer = transferState
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to do this to avoid NPE in pool.stateTransferQueue.offer(st), but I think this is causing the error

[error] x blocking work does not starve poll
[error] None is not Some (IOPlatformSpecification.scala:702)

// First of all, remove the references to data structures of the core
// pool because they have already been transferred to another thread
// which took the place of this one.
Expand All @@ -728,24 +727,32 @@ private[effect] final class WorkerThread[P <: AnyRef](
transferState = null

try {
// Try to transfer this thread via the cached threads data structure, to be picked up
// Try to transfer this thread via the stateTransferQueue to be picked up
// by another thread in the future.
val st = stateToTransfer
val len = runtimeBlockingExpiration.length
val unit = runtimeBlockingExpiration.unit
if (pool.cachedThreads.tryTransfer(this, len, unit)) {
val timeoutNanos = unit.toNanos(len)

if (pool.stateTransferQueue.offer(st)) {
// Someone accepted the transfer of this thread and will transfer the state soon.
val newState = stateTransfer.take()
init(newState)
val newState = pool.stateTransferQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS)
if (newState ne null) {
init(newState)
} else {
// The timeout elapsed and no one woke up this thread. It's time to exit.
pool.blockedWorkerThreadCounter.decrementAndGet()
return
}
} else {
// The timeout elapsed and no one woke up this thread. It's time to exit.
// Nobody polling, spawn new replacement was done in prepareForBlocking
pool.blockedWorkerThreadCounter.decrementAndGet()
return
}
} catch {
case _: InterruptedException =>
// This thread was interrupted while cached. This should only happen
// during the shutdown of the pool. Nothing else to be done, just
// exit.
// during the shutdown of the pool. Nothing else to be done, just exit.
return
}
}
Expand Down Expand Up @@ -928,15 +935,16 @@ private[effect] final class WorkerThread[P <: AnyRef](
// Set the name of this thread to a blocker prefixed name.
setName(s"$prefix-$nameIndex")

val cached = pool.cachedThreads.poll()
if (cached ne null) {
val ts = transferState
val available = pool.stateTransferQueue.poll(0, TimeUnit.MILLISECONDS)
if (available ne null) {
// There is a cached worker thread that can be reused.
val idx = index
pool.replaceWorker(idx, cached)
pool.replaceWorker(idx, this)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the idxth worker thread, to replace this thread which is about to block.. The new code tries to replace this thread with itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ok I see it now, will try to fix this .

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of adding a thread reference field in Transfer state to pass on the cached WorkerThread

// Transfer the data structures to the cached thread and wake it up.
transferState.index = idx
transferState.tick = tick + 1
val _ = cached.stateTransfer.offer(transferState)
ts.index = idx
ts.tick = tick + 1
val _ = pool.stateTransferQueue.offer(ts)
} else {
// Spawn a new `WorkerThread`, a literal clone of this one. It is safe to
// transfer ownership of the local queue and the parked signal to the new
Expand Down
Loading