From 16f26160ecf0f87458a077577b5c4937b4e4b695 Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Sun, 20 Apr 2025 00:37:16 +0530 Subject: [PATCH 1/6] Switch to LinkedBlockingDeque for LIFO thread reuse to address WSTP thread leaks --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 8 ++++---- .../src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index a7139724e9..32ae538d60 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -39,7 +39,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration} import java.time.Instant import java.time.temporal.ChronoField -import java.util.concurrent.{LinkedTransferQueue, ThreadLocalRandom} +import java.util.concurrent.{LinkedBlockingDeque, ThreadLocalRandom} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, @@ -131,8 +131,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( */ private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift) - private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] = - new LinkedTransferQueue + private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] = + new LinkedBlockingDeque /** * The shutdown latch of the work stealing thread pool. @@ -751,7 +751,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( var t: WorkerThread[P] = null while ({ - t = cachedThreads.poll() + t = cachedThreads.pollLast() t ne null }) { t.interrupt() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 615fe8804e..553ff87870 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -732,7 +732,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( // by another thread in the future. val len = runtimeBlockingExpiration.length val unit = runtimeBlockingExpiration.unit - if (pool.cachedThreads.tryTransfer(this, len, unit)) { + if (pool.cachedThreads.offerFirst(this, len, unit)) { // Someone accepted the transfer of this thread and will transfer the state soon. val newState = stateTransfer.take() init(newState) @@ -928,7 +928,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( // Set the name of this thread to a blocker prefixed name. setName(s"$prefix-$nameIndex") - val cached = pool.cachedThreads.poll() + val cached = pool.cachedThreads.pollLast() if (cached ne null) { // There is a cached worker thread that can be reused. val idx = index From c5f36ffb5703f66b245e8d4df9920e9d56970b3b Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Sun, 20 Apr 2025 01:08:45 +0530 Subject: [PATCH 2/6] changed pollLast to pollFirst --- .../main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 32ae538d60..376b91aa25 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -751,7 +751,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( var t: WorkerThread[P] = null while ({ - t = cachedThreads.pollLast() + t = cachedThreads.pollFirst() t ne null }) { t.interrupt() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 553ff87870..f19683081a 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -928,7 +928,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( // Set the name of this thread to a blocker prefixed name. setName(s"$prefix-$nameIndex") - val cached = pool.cachedThreads.pollLast() + val cached = pool.cachedThreads.pollFirst() if (cached ne null) { // There is a cached worker thread that can be reused. val idx = index From 869c9544875b22f25c0f62121c7511caffcf806d Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Sun, 27 Apr 2025 00:22:06 +0530 Subject: [PATCH 3/6] added capacity to the LinkedBlockingDeque to simulate a synchronous deque --- .../main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 376b91aa25..d883688583 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -132,7 +132,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift) private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] = - new LinkedBlockingDeque + new LinkedBlockingDeque(1) /** * The shutdown latch of the work stealing thread pool. From cafe75ae2014e1b9a403bf188c967871c9c3bea9 Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Sat, 3 May 2025 13:35:25 +0530 Subject: [PATCH 4/6] implemented new approach of pool level SynchronousQueue[TransferState] and ConcurrentHashMap to keep track of blocker threads. --- .../unsafe/WorkStealingThreadPool.scala | 20 +++++----- .../cats/effect/unsafe/WorkerThread.scala | 40 +++++++++++-------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index d883688583..6bec0dca54 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -39,7 +39,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration} import java.time.Instant import java.time.temporal.ChronoField -import java.util.concurrent.{LinkedBlockingDeque, ThreadLocalRandom} +import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue, ThreadLocalRandom} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, @@ -131,8 +131,11 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( */ private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift) - private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] = - new LinkedBlockingDeque(1) + private[unsafe] val stateTransferQueue: SynchronousQueue[WorkerThread.TransferState] = + new SynchronousQueue[WorkerThread.TransferState](false) + + private[unsafe] val blockerThreads: ConcurrentHashMap[WorkerThread[P], java.lang.Boolean] = + new ConcurrentHashMap() /** * The shutdown latch of the work stealing thread pool. @@ -749,14 +752,11 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( system.close() } - var t: WorkerThread[P] = null + var ts: WorkerThread.TransferState = new WorkerThread.TransferState() while ({ - t = cachedThreads.pollFirst() - t ne null - }) { - t.interrupt() - // don't bother joining, cached threads are not doing anything interesting - } + ts = stateTransferQueue.poll() + ts ne null + }) {} // Drain the external queue. externalQueue.clear() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index f19683081a..8b05aa4de0 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait} import scala.concurrent.duration.{Duration, FiniteDuration} import java.lang.Long.MIN_VALUE -import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom} +import java.util.concurrent.{ThreadLocalRandom, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import WorkerThread.{Metrics, TransferState} @@ -110,7 +110,6 @@ private[effect] final class WorkerThread[P <: AnyRef]( */ private[this] var _active: Runnable = _ - private val stateTransfer: ArrayBlockingQueue[TransferState] = new ArrayBlockingQueue(1) private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration private[effect] var currentIOFiber: IOFiber[?] = _ @@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( if (blocking) { // The worker thread was blocked before. It is no longer part of the // core pool and needs to be cached. - + val stateToTransfer = transferState // First of all, remove the references to data structures of the core // pool because they have already been transferred to another thread // which took the place of this one. @@ -728,24 +727,32 @@ private[effect] final class WorkerThread[P <: AnyRef]( transferState = null try { - // Try to transfer this thread via the cached threads data structure, to be picked up + // Try to transfer this thread via the stateTransferQueue to be picked up // by another thread in the future. + val st = stateToTransfer val len = runtimeBlockingExpiration.length val unit = runtimeBlockingExpiration.unit - if (pool.cachedThreads.offerFirst(this, len, unit)) { + val timeoutNanos = unit.toNanos(len) + + if (pool.stateTransferQueue.offer(st)) { // Someone accepted the transfer of this thread and will transfer the state soon. - val newState = stateTransfer.take() - init(newState) + val newState = pool.stateTransferQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS) + if (newState ne null) { + init(newState) + } else { + // The timeout elapsed and no one woke up this thread. It's time to exit. + pool.blockedWorkerThreadCounter.decrementAndGet() + return + } } else { - // The timeout elapsed and no one woke up this thread. It's time to exit. + // Nobody polling, spawn new replacement was done in prepareForBlocking pool.blockedWorkerThreadCounter.decrementAndGet() return } } catch { case _: InterruptedException => // This thread was interrupted while cached. This should only happen - // during the shutdown of the pool. Nothing else to be done, just - // exit. + // during the shutdown of the pool. Nothing else to be done, just exit. return } } @@ -928,15 +935,16 @@ private[effect] final class WorkerThread[P <: AnyRef]( // Set the name of this thread to a blocker prefixed name. setName(s"$prefix-$nameIndex") - val cached = pool.cachedThreads.pollFirst() - if (cached ne null) { + val ts = transferState + val available = pool.stateTransferQueue.poll(0, TimeUnit.MILLISECONDS) + if (available ne null) { // There is a cached worker thread that can be reused. val idx = index - pool.replaceWorker(idx, cached) + pool.replaceWorker(idx, this) // Transfer the data structures to the cached thread and wake it up. - transferState.index = idx - transferState.tick = tick + 1 - val _ = cached.stateTransfer.offer(transferState) + ts.index = idx + ts.tick = tick + 1 + val _ = pool.stateTransferQueue.offer(ts) } else { // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to // transfer ownership of the local queue and the parked signal to the new From cbdad86caae4dada2e157e8e7f9b1b5c9d7bedc7 Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Sun, 18 May 2025 02:39:42 +0530 Subject: [PATCH 5/6] Fixed several inconsistencies and reimplemented the SynchronousQueue and ConcurrentHashMap idea --- .../unsafe/WorkStealingThreadPool.scala | 13 ++-- .../cats/effect/unsafe/WorkerThread.scala | 59 ++++++++++--------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 6bec0dca54..07da2bfb5c 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -131,7 +131,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( */ private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift) - private[unsafe] val stateTransferQueue: SynchronousQueue[WorkerThread.TransferState] = + private[unsafe] val transferStateQueue: SynchronousQueue[WorkerThread.TransferState] = new SynchronousQueue[WorkerThread.TransferState](false) private[unsafe] val blockerThreads: ConcurrentHashMap[WorkerThread[P], java.lang.Boolean] = @@ -752,11 +752,12 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( system.close() } - var ts: WorkerThread.TransferState = new WorkerThread.TransferState() - while ({ - ts = stateTransferQueue.poll() - ts ne null - }) {} + val it = blockerThreads.keySet().iterator() + while (it.hasNext()) { + val t = it.next() + t.interrupt() + // don't bother joining, cached threads are not doing anything interesting + } // Drain the external queue. externalQueue.clear() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 8b05aa4de0..c97ad4cee9 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait} import scala.concurrent.duration.{Duration, FiniteDuration} import java.lang.Long.MIN_VALUE -import java.util.concurrent.{ThreadLocalRandom, TimeUnit} +import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicBoolean import WorkerThread.{Metrics, TransferState} @@ -713,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( if (blocking) { // The worker thread was blocked before. It is no longer part of the // core pool and needs to be cached. - val stateToTransfer = transferState + // First of all, remove the references to data structures of the core // pool because they have already been transferred to another thread // which took the place of this one. @@ -727,32 +727,31 @@ private[effect] final class WorkerThread[P <: AnyRef]( transferState = null try { - // Try to transfer this thread via the stateTransferQueue to be picked up + // Try to transfer this thread via the cached threads data structure, to be picked up // by another thread in the future. - val st = stateToTransfer val len = runtimeBlockingExpiration.length val unit = runtimeBlockingExpiration.unit - val timeoutNanos = unit.toNanos(len) - if (pool.stateTransferQueue.offer(st)) { - // Someone accepted the transfer of this thread and will transfer the state soon. - val newState = pool.stateTransferQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS) - if (newState ne null) { - init(newState) - } else { - // The timeout elapsed and no one woke up this thread. It's time to exit. - pool.blockedWorkerThreadCounter.decrementAndGet() - return - } + // Try to poll for a new state from the transfer queue + val newState = pool.transferStateQueue.poll(len, unit) + + if (newState ne null) { + // Got a state to take over + init(newState) + } else { - // Nobody polling, spawn new replacement was done in prepareForBlocking + // No state to take over after timeout, exit pool.blockedWorkerThreadCounter.decrementAndGet() + // Remove from blocker threads map if present + pool.blockerThreads.remove(this) return } } catch { case _: InterruptedException => // This thread was interrupted while cached. This should only happen - // during the shutdown of the pool. Nothing else to be done, just exit. + // during the shutdown of the pool. Nothing else to be done, just + // exit. + pool.blockerThreads.remove(this) return } } @@ -935,16 +934,18 @@ private[effect] final class WorkerThread[P <: AnyRef]( // Set the name of this thread to a blocker prefixed name. setName(s"$prefix-$nameIndex") - val ts = transferState - val available = pool.stateTransferQueue.poll(0, TimeUnit.MILLISECONDS) - if (available ne null) { - // There is a cached worker thread that can be reused. - val idx = index - pool.replaceWorker(idx, this) - // Transfer the data structures to the cached thread and wake it up. - ts.index = idx - ts.tick = tick + 1 - val _ = pool.stateTransferQueue.offer(ts) + val idx = index + + // Prepare the transfer state + transferState.index = idx + transferState.tick = tick + 1 + + val _ = pool.blockerThreads.put(this, java.lang.Boolean.TRUE) + + if (pool.transferStateQueue.offer(transferState)) { + // If successful, a waiting thread will pick it up + // Register this thread in the blockerThreads map + } else { // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to // transfer ownership of the local queue and the parked signal to the new @@ -969,7 +970,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( system, _poller, metrics, - transferState, + new WorkerThread.TransferState, pool) // Make sure the clone gets our old name: val clonePrefix = pool.threadPrefix @@ -1010,6 +1011,8 @@ private[effect] final class WorkerThread[P <: AnyRef]( setName(s"$prefix-${_index}") blocking = false + + pool.replaceWorker(newIdx, this) } /** From 40a00ca9d6a2115df5e3b4e25450416737144399 Mon Sep 17 00:00:00 2001 From: Shrey Pant Date: Thu, 22 May 2025 12:06:19 +0530 Subject: [PATCH 6/6] Reverted transferState --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index c97ad4cee9..b2c4dbb2ce 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -940,11 +940,11 @@ private[effect] final class WorkerThread[P <: AnyRef]( transferState.index = idx transferState.tick = tick + 1 + // Register this thread in the blockerThreads map val _ = pool.blockerThreads.put(this, java.lang.Boolean.TRUE) if (pool.transferStateQueue.offer(transferState)) { // If successful, a waiting thread will pick it up - // Register this thread in the blockerThreads map } else { // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to @@ -970,7 +970,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( system, _poller, metrics, - new WorkerThread.TransferState, + transferState, pool) // Make sure the clone gets our old name: val clonePrefix = pool.threadPrefix