Skip to content

Commit d23e881

Browse files
committed
IJPL-149823 Parallelism compensation for CoroutineDispatchers and runBlocking
update IntelliJ-patches.md expose runBlockingWithParallelismCompensation in IntellijCoroutines introduce runBlockingWithParallelismCompensation and remove parallelism compensation from runBlocking parallelismCompensation: split and inline withCompensatedParallelism so that it is only present in the stacktraces when compensation is effective IJPL-149823 Parallelism compensation for CoroutineDispatchers and runBlocking # Conflicts: # kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt 1.10.1 patch notes: * SoftLimitedDispatcher was updated to match the new version of LimitedDispatcher * named dispatchers (see usages of namedOrThis): NamedSoftParallelismDispatcher is introduced as an alternative for NamedDispatcher that fits into the soft parallelism dispatcher hierarchy # Conflicts: # IntelliJ-patches.md # kotlinx-coroutines-core/jvm/src/internal/intellij/intellij.kt
1 parent 730ba1e commit d23e881

15 files changed

+678
-27
lines changed

IntelliJ-patches.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,34 @@ We provide a single method `kotlinx.coroutines.internal.intellij.IntellijCorouti
1919
The invariant is that the result of this method is always equal to `coroutineContext` in suspending environment,
2020
and it does not change during the non-suspending execution within the same thread.
2121

22+
## Parallelism compensation for `CoroutineDispatcher`s
23+
24+
If `runBlocking` happens to be invoked on a thread from `CoroutineDispatcher`, it may cause a thread starvation problem
25+
(Kotlin#3983). This happens because `runBlocking` does not release an associated computational permit while it parks the
26+
thread. To fix this, a parallelism compensation mechanism is introduced. Some `CoroutineDispatcher`s (such as
27+
`Dispatchers.Default`, `Dispatchers.IO` and others) support `ParallelismCompensation`, meaning that these dispatchers
28+
can be notified that they should increase parallelism and parallelism limit, or they should decrease it. It is important that these
29+
are only requests and dispatchers are in full control on how and when they need to adjust the effective parallelism.
30+
It also means that the instantaneous parallelism may exceed the current allowed parallelism limit for the given dispatcher.
31+
32+
`runBlockingWithParallelismCompensation` (further abbreviated as `rBWPC`) is introduced as a counterpart of `runBlocking`
33+
with the following behavioral change. When `rBWPC` decides to park a `CoroutineDispatcher` thread, it first increases the allowed parallelism
34+
limit of the `CoroutineDispatcher`. After the thread unparks, `rBWPC` notifies the dispatcher that the parallelism limit should be lowered back.
35+
A separate function is introduced because parallelism compensation is not always a desirable behavior.
36+
37+
It is easy to see that this behavior cannot be general for `CoroutineDispatcher`s, at least because it breaks the contract
38+
of `LimitedDispatcher` (one that can be acquired via `.limitedParallelism`). It means that parallelism compensation
39+
cannot work for `LimitedDispatcher`, so `runBlockingWithParallelismCompensation` can still cause starvation issues there, but it seems rather
40+
expected.
41+
42+
Parallelism compensation support is internal and is implemented for `Dispatchers.Default` and `Dispatchers.IO`.
43+
To acquire an analogue of `limitedParallelism` dispatcher which supports parallelism compensation, use
44+
`IntellijCoroutines.softLimitedParallelism`. Be advised that not every `.limitedParallelism` call can be substituted
45+
with `.softLimitedParallelism`, e.g., `.limitedParallelism(1)` may be used as a synchronization manager and in this case
46+
exceeding the parallelism limit would eliminate this (likely expected) side effect.
47+
48+
### API
49+
- `runBlockingWithParallelismCompensation` - an analogue of `runBlocking` which also compensates parallelism of the
50+
associated coroutine dispatcher when it decides to park the thread
51+
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
52+
parallelism compensation

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ internal class LimitedDispatcher(
130130
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
131131

132132
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
133-
if (name != null) return NamedDispatcher(this, name)
133+
if (name != null) {
134+
if (this is SoftLimitedParallelism) return NamedSoftParallelismDispatcher(this, name)
135+
return NamedDispatcher(this, name)
136+
}
134137
return this
135138
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
import kotlinx.coroutines.DefaultDelay
5+
import kotlinx.coroutines.Delay
6+
import kotlinx.coroutines.InternalCoroutinesApi
7+
import kotlinx.coroutines.Runnable
8+
import kotlin.coroutines.CoroutineContext
9+
10+
/**
11+
* Copy of [NamedDispatcher] for SoftParallelism hierarchy of dispatchers
12+
*/
13+
internal class NamedSoftParallelismDispatcher<D>(
14+
private val dispatcher: D,
15+
private val name: String
16+
) : CoroutineDispatcher(), SoftLimitedParallelism, Delay by (dispatcher as? Delay ?: DefaultDelay)
17+
where D : CoroutineDispatcher, D : SoftLimitedParallelism
18+
{
19+
20+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
21+
22+
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)
23+
24+
@InternalCoroutinesApi
25+
override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block)
26+
27+
override fun toString(): String {
28+
return name
29+
}
30+
31+
override fun softLimitedParallelism(
32+
parallelism: Int,
33+
name: String?
34+
): CoroutineDispatcher {
35+
// behaves like NamedDispatcher does with LimitedDispatcher
36+
parallelism.checkParallelism()
37+
return SoftLimitedDispatcher(this, parallelism, name)
38+
}
39+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.scheduling.ParallelismCompensation
6+
import kotlin.coroutines.*
7+
8+
/**
9+
* Introduced as part of IntelliJ patches.
10+
*
11+
* CoroutineDispatchers may optionally implement this interface to declare an ability to construct [SoftLimitedDispatcher]
12+
* on top of themselves. This is not possible in general case, because the worker of the underlying dispatcher must
13+
* implement [ParallelismCompensation] and properly propagate such requests to the task it is running.
14+
*/
15+
internal interface SoftLimitedParallelism {
16+
fun softLimitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher
17+
}
18+
19+
/**
20+
* Introduced as part of IntelliJ patches.
21+
*/
22+
internal fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
23+
if (this is SoftLimitedParallelism) {
24+
return this.softLimitedParallelism(parallelism, name)
25+
}
26+
// SoftLimitedDispatcher cannot be used on top of LimitedDispatcher, because the latter doesn't propagate compensation requests
27+
throw UnsupportedOperationException("CoroutineDispatcher.softLimitedParallelism cannot be applied to $this")
28+
}
29+
30+
/**
31+
* Introduced as part of IntelliJ patches.
32+
*
33+
* Shamelessly copy-pasted from [LimitedDispatcher], but [ParallelismCompensation] is
34+
* implemented for [Worker] to allow compensation.
35+
*
36+
* [ParallelismCompensation] breaks the contract of [LimitedDispatcher] so a separate class is made to implement a
37+
* dispatcher that mostly behaves as limited, but can temporarily increase parallelism if necessary.
38+
*/
39+
internal class SoftLimitedDispatcher(
40+
private val dispatcher: CoroutineDispatcher,
41+
parallelism: Int,
42+
private val name: String?
43+
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay), SoftLimitedParallelism {
44+
private val initialParallelism = parallelism
45+
// `parallelism limit - runningWorkers`; may be < 0 if decompensation is expected
46+
private val availablePermits = atomic(parallelism)
47+
48+
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
49+
50+
private val workerAllocationLock = SynchronizedObject()
51+
52+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
53+
return super.limitedParallelism(parallelism, name)
54+
}
55+
56+
override fun softLimitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
57+
parallelism.checkParallelism()
58+
if (parallelism >= initialParallelism) return namedOrThis(name)
59+
return SoftLimitedDispatcher(this, parallelism, name)
60+
}
61+
62+
override fun dispatch(context: CoroutineContext, block: Runnable) {
63+
dispatchInternal(block) { worker ->
64+
dispatcher.safeDispatch(this, worker)
65+
}
66+
}
67+
68+
@InternalCoroutinesApi
69+
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
70+
dispatchInternal(block) { worker ->
71+
dispatcher.dispatchYield(this, worker)
72+
}
73+
}
74+
75+
/**
76+
* Tries to dispatch the given [block].
77+
* If there are not enough workers, it starts a new one via [startWorker].
78+
*/
79+
private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
80+
queue.addLast(block)
81+
if (availablePermits.value <= 0) return
82+
if (!tryAllocateWorker()) return
83+
val task = obtainTaskOrDeallocateWorker() ?: return
84+
startWorker(Worker(task))
85+
}
86+
87+
/**
88+
* Tries to obtain the permit to start a new worker.
89+
*/
90+
private fun tryAllocateWorker(): Boolean {
91+
synchronized(workerAllocationLock) {
92+
val permits = availablePermits.value
93+
if (permits <= 0) return false
94+
return availablePermits.compareAndSet(permits, permits - 1)
95+
}
96+
}
97+
98+
/**
99+
* Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
100+
*/
101+
private fun obtainTaskOrDeallocateWorker(): Runnable? {
102+
val permits = availablePermits.value
103+
if (permits < 0) { // decompensation
104+
if (availablePermits.compareAndSet(permits, permits + 1)) {
105+
return null
106+
}
107+
}
108+
while (true) {
109+
when (val nextTask = queue.removeFirstOrNull()) {
110+
null -> synchronized(workerAllocationLock) {
111+
availablePermits.incrementAndGet()
112+
if (queue.size == 0) return null
113+
availablePermits.decrementAndGet()
114+
}
115+
else -> return nextTask
116+
}
117+
}
118+
}
119+
120+
override fun toString() = name ?: "$dispatcher.softLimitedParallelism($initialParallelism)"
121+
122+
/**
123+
* Every running Worker holds a permit
124+
*/
125+
private inner class Worker(private var currentTask: Runnable) : Runnable, ParallelismCompensation {
126+
override fun run() {
127+
var fairnessCounter = 0
128+
while (true) {
129+
try {
130+
currentTask.run()
131+
} catch (e: Throwable) {
132+
handleCoroutineException(EmptyCoroutineContext, e)
133+
}
134+
currentTask = obtainTaskOrDeallocateWorker() ?: return
135+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
136+
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@SoftLimitedDispatcher)) {
137+
// Do "yield" to let other views execute their runnable as well
138+
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
139+
dispatcher.safeDispatch(this@SoftLimitedDispatcher, this)
140+
return
141+
}
142+
}
143+
}
144+
145+
override fun increaseParallelismAndLimit() {
146+
val newTask = obtainTaskOrDeallocateWorker() // either increases the number of permits or we launch a new worker (which holds a permit)
147+
if (newTask != null) {
148+
dispatcher.safeDispatch(this@SoftLimitedDispatcher, Worker(newTask))
149+
}
150+
(currentTask as? ParallelismCompensation)?.increaseParallelismAndLimit()
151+
}
152+
153+
override fun decreaseParallelismLimit() {
154+
try {
155+
(currentTask as? ParallelismCompensation)?.decreaseParallelismLimit()
156+
} finally {
157+
availablePermits.decrementAndGet()
158+
}
159+
}
160+
}
161+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package kotlinx.coroutines.scheduling
2+
3+
/**
4+
* Introduced as part of IntelliJ patches.
5+
*
6+
* Runnables that are dispatched on [kotlinx.coroutines.CoroutineDispatcher] may optionally implement this interface
7+
* to declare an ability to compensate the associated parallelism resource.
8+
*/
9+
internal interface ParallelismCompensation {
10+
/**
11+
* Should increase both the limit and the effective parallelism.
12+
*/
13+
fun increaseParallelismAndLimit()
14+
15+
/**
16+
* Should only decrease the parallelism limit. The effective parallelism may temporarily stay higher than this limit.
17+
* Runnable should take care of checking whether effective parallelism needs to decrease to meet the desired limit.
18+
*/
19+
fun decreaseParallelismLimit()
20+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package kotlinx.coroutines
66

7-
import java.util.concurrent.locks.*
7+
import kotlinx.coroutines.scheduling.withCompensatedParallelism
88
import kotlin.contracts.*
99
import kotlin.coroutines.*
1010

@@ -46,6 +46,21 @@ import kotlin.coroutines.*
4646
*/
4747
@Throws(InterruptedException::class)
4848
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
49+
contract {
50+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
51+
}
52+
return runBlocking(context, compensateParallelism = false, block)
53+
}
54+
55+
@Throws(InterruptedException::class)
56+
internal fun <T> runBlockingWithParallelismCompensation(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
57+
contract {
58+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
59+
}
60+
return runBlocking(context, compensateParallelism = true, block)
61+
}
62+
63+
private fun <T> runBlocking(context: CoroutineContext, compensateParallelism: Boolean, block: suspend CoroutineScope.() -> T): T {
4964
contract {
5065
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
5166
}
@@ -64,15 +79,16 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
6479
?: ThreadLocalEventLoop.currentOrNull()
6580
newContext = GlobalScope.newCoroutineContext(context)
6681
}
67-
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
82+
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop, compensateParallelism)
6883
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
6984
return coroutine.joinBlocking()
7085
}
7186

7287
private class BlockingCoroutine<T>(
7388
parentContext: CoroutineContext,
7489
private val blockedThread: Thread,
75-
private val eventLoop: EventLoop?
90+
private val eventLoop: EventLoop?,
91+
private val compensateParallelism: Boolean,
7692
) : AbstractCoroutine<T>(parentContext, true, true) {
7793

7894
override val isScopedCoroutine: Boolean get() = true
@@ -95,7 +111,15 @@ private class BlockingCoroutine<T>(
95111
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
96112
// note: process next even may loose unpark flag, so check if completed before parking
97113
if (isCompleted) break
98-
parkNanos(this, parkNanos)
114+
if (parkNanos > 0) {
115+
if (compensateParallelism) {
116+
withCompensatedParallelism {
117+
parkNanos(this, parkNanos)
118+
}
119+
} else {
120+
parkNanos(this, parkNanos)
121+
}
122+
}
99123
}
100124
} finally { // paranoia
101125
eventLoop?.decrementUseCount()

kotlinx-coroutines-core/jvm/src/internal/intellij/intellij.kt

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,16 @@
33
*/
44
package kotlinx.coroutines.internal.intellij
55

6+
import kotlinx.coroutines.CoroutineDispatcher
7+
import kotlinx.coroutines.CoroutineScope
68
import kotlinx.coroutines.InternalCoroutinesApi
79
import kotlin.coroutines.*
10+
import kotlinx.coroutines.internal.softLimitedParallelism as softLimitedParallelismImpl
11+
import kotlinx.coroutines.internal.SoftLimitedDispatcher
12+
import kotlinx.coroutines.runBlockingWithParallelismCompensation as runBlockingWithParallelismCompensationImpl
13+
import kotlinx.coroutines.Dispatchers
14+
import kotlin.coroutines.CoroutineContext
15+
import kotlin.jvm.Throws
816

917
internal val currentContextThreadLocal : ThreadLocal<CoroutineContext?> = ThreadLocal.withInitial { null }
1018

@@ -25,4 +33,31 @@ public object IntellijCoroutines {
2533
public fun currentThreadCoroutineContext(): CoroutineContext? {
2634
return currentContextThreadLocal.get()
2735
}
36+
37+
/**
38+
* An analogue of [runBlocking][kotlinx.coroutines.runBlocking] that [compensates parallelism][kotlinx.coroutines.scheduling.withCompensatedParallelism]
39+
* while the coroutine is not complete and the associated event loop has no immediate work available.
40+
*/
41+
@Throws(InterruptedException::class)
42+
public fun <T> runBlockingWithParallelismCompensation(
43+
context: CoroutineContext,
44+
block: suspend CoroutineScope.() -> T
45+
): T =
46+
runBlockingWithParallelismCompensationImpl(context, block)
47+
48+
/**
49+
* Constructs a [SoftLimitedDispatcher] from the specified [CoroutineDispatcher].
50+
* [SoftLimitedDispatcher] behaves as [LimitedDispatcher][kotlinx.coroutines.internal.LimitedDispatcher] but allows
51+
* temporarily exceeding the parallelism limit in case [parallelism compensation][kotlinx.coroutines.scheduling.withCompensatedParallelism]
52+
* was requested (e.g., by [kotlinx.coroutines.runBlocking]).
53+
*
54+
* This extension can only be used on instances of [Dispatchers.Default], [Dispatchers.IO] and also on what this extension
55+
* has returned. Throws [UnsupportedOperationException] if [this] does not support the parallelism compensation mechanism.
56+
*/
57+
public fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
58+
softLimitedParallelismImpl(parallelism, name)
59+
60+
@Deprecated("use named version", level = DeprecationLevel.HIDDEN)
61+
public fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int): CoroutineDispatcher =
62+
softLimitedParallelismImpl(parallelism, null)
2863
}

0 commit comments

Comments
 (0)