-
Notifications
You must be signed in to change notification settings - Fork 554
Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap #4388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/3.6.x
Are you sure you want to change the base?
Changes from 4 commits
16f2616
c5f36ff
869c954
cafe75a
cbdad86
40a00ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait} | |
import scala.concurrent.duration.{Duration, FiniteDuration} | ||
|
||
import java.lang.Long.MIN_VALUE | ||
import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom} | ||
import java.util.concurrent.{ThreadLocalRandom, TimeUnit} | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
|
||
import WorkerThread.{Metrics, TransferState} | ||
|
@@ -110,7 +110,6 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
*/ | ||
private[this] var _active: Runnable = _ | ||
|
||
private val stateTransfer: ArrayBlockingQueue[TransferState] = new ArrayBlockingQueue(1) | ||
private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration | ||
|
||
private[effect] var currentIOFiber: IOFiber[?] = _ | ||
|
@@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
if (blocking) { | ||
// The worker thread was blocked before. It is no longer part of the | ||
// core pool and needs to be cached. | ||
|
||
val stateToTransfer = transferState | ||
// First of all, remove the references to data structures of the core | ||
// pool because they have already been transferred to another thread | ||
// which took the place of this one. | ||
|
@@ -728,24 +727,32 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
transferState = null | ||
|
||
try { | ||
// Try to transfer this thread via the cached threads data structure, to be picked up | ||
// Try to transfer this thread via the stateTransferQueue to be picked up | ||
// by another thread in the future. | ||
val st = stateToTransfer | ||
val len = runtimeBlockingExpiration.length | ||
val unit = runtimeBlockingExpiration.unit | ||
if (pool.cachedThreads.tryTransfer(this, len, unit)) { | ||
val timeoutNanos = unit.toNanos(len) | ||
|
||
if (pool.stateTransferQueue.offer(st)) { | ||
// Someone accepted the transfer of this thread and will transfer the state soon. | ||
val newState = stateTransfer.take() | ||
init(newState) | ||
val newState = pool.stateTransferQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS) | ||
if (newState ne null) { | ||
init(newState) | ||
} else { | ||
// The timeout elapsed and no one woke up this thread. It's time to exit. | ||
pool.blockedWorkerThreadCounter.decrementAndGet() | ||
return | ||
} | ||
} else { | ||
// The timeout elapsed and no one woke up this thread. It's time to exit. | ||
// Nobody polling, spawn new replacement was done in prepareForBlocking | ||
pool.blockedWorkerThreadCounter.decrementAndGet() | ||
return | ||
} | ||
} catch { | ||
case _: InterruptedException => | ||
// This thread was interrupted while cached. This should only happen | ||
// during the shutdown of the pool. Nothing else to be done, just | ||
// exit. | ||
// during the shutdown of the pool. Nothing else to be done, just exit. | ||
return | ||
} | ||
} | ||
|
@@ -928,15 +935,16 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
// Set the name of this thread to a blocker prefixed name. | ||
setName(s"$prefix-$nameIndex") | ||
|
||
val cached = pool.cachedThreads.poll() | ||
if (cached ne null) { | ||
val ts = transferState | ||
val available = pool.stateTransferQueue.poll(0, TimeUnit.MILLISECONDS) | ||
if (available ne null) { | ||
// There is a cached worker thread that can be reused. | ||
val idx = index | ||
pool.replaceWorker(idx, cached) | ||
pool.replaceWorker(idx, this) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh ok I see it now, will try to fix this . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am thinking of adding a thread reference field in Transfer state to pass on the cached WorkerThread |
||
// Transfer the data structures to the cached thread and wake it up. | ||
transferState.index = idx | ||
transferState.tick = tick + 1 | ||
val _ = cached.stateTransfer.offer(transferState) | ||
ts.index = idx | ||
ts.tick = tick + 1 | ||
val _ = pool.stateTransferQueue.offer(ts) | ||
} else { | ||
// Spawn a new `WorkerThread`, a literal clone of this one. It is safe to | ||
// transfer ownership of the local queue and the parked signal to the new | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to do this to avoid NPE in
pool.stateTransferQueue.offer(st)
, but I think this is causing the error