Skip to content

Commit 8681859

Browse files
zuevmaximdkhalanskyjb
authored andcommitted
Move ThreadContextElement to common
1 parent 136f9d2 commit 8681859

19 files changed

+607
-535
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ abstract interface <#A: kotlin/Any?> kotlinx.coroutines/CompletableDeferred : ko
186186
abstract fun completeExceptionally(kotlin/Throwable): kotlin/Boolean // kotlinx.coroutines/CompletableDeferred.completeExceptionally|completeExceptionally(kotlin.Throwable){}[0]
187187
}
188188

189+
abstract interface <#A: kotlin/Any?> kotlinx.coroutines/ThreadContextElement : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/ThreadContextElement|null[0]
190+
abstract fun restoreThreadContext(kotlin.coroutines/CoroutineContext, #A) // kotlinx.coroutines/ThreadContextElement.restoreThreadContext|restoreThreadContext(kotlin.coroutines.CoroutineContext;1:0){}[0]
191+
abstract fun updateThreadContext(kotlin.coroutines/CoroutineContext): #A // kotlinx.coroutines/ThreadContextElement.updateThreadContext|updateThreadContext(kotlin.coroutines.CoroutineContext){}[0]
192+
}
193+
189194
abstract interface <#A: kotlin/Throwable & kotlinx.coroutines/CopyableThrowable<#A>> kotlinx.coroutines/CopyableThrowable { // kotlinx.coroutines/CopyableThrowable|null[0]
190195
abstract fun createCopy(): #A? // kotlinx.coroutines/CopyableThrowable.createCopy|createCopy(){}[0]
191196
}

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.atomicfu.*
99
import kotlinx.coroutines.internal.*
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlinx.coroutines.selects.*
12+
import kotlin.concurrent.Volatile
1213
import kotlin.contracts.*
1314
import kotlin.coroutines.*
1415
import kotlin.coroutines.intrinsics.*
@@ -206,10 +207,124 @@ private class LazyStandaloneCoroutine(
206207
}
207208

208209
// Used by withContext when context changes, but dispatcher stays the same
209-
internal expect class UndispatchedCoroutine<in T>(
210+
internal actual class UndispatchedCoroutine<in T>actual constructor (
210211
context: CoroutineContext,
211212
uCont: Continuation<T>
212-
) : ScopeCoroutine<T>
213+
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {
214+
215+
/**
216+
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
217+
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
218+
* See the followin, boiled down example with inlined `withContinuationContext` body:
219+
* ```
220+
* val state = saveThreadContext(ctx)
221+
* try {
222+
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
223+
* // COROUTINE_SUSPENDED is returned
224+
* } finally {
225+
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
226+
* // and it also calls saveThreadContext and clearThreadContext
227+
* }
228+
* ```
229+
*
230+
* Usage note:
231+
*
232+
* This part of the code is performance-sensitive.
233+
* It is a well-established pattern to wrap various activities into system-specific undispatched
234+
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
235+
* undispatched coroutines.
236+
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
237+
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected
238+
* when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses.
239+
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
240+
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
241+
* (You can read more about this effect as "GC nepotism").
242+
*
243+
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
244+
* - It's never accessed when we are sure there are no thread context elements
245+
* - It's cleaned up via [ThreadLocal.remove] as soon as the coroutine is suspended or finished.
246+
*/
247+
private val threadStateToRecover = ThreadLocal<Pair<CoroutineContext, Any?>>()
248+
249+
/*
250+
* Indicates that a coroutine has at least one thread context element associated with it
251+
* and that 'threadStateToRecover' is going to be set in case of dispatchhing in order to preserve them.
252+
* Better than nullable thread-local for easier debugging.
253+
*
254+
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
255+
* (note: tl.get() initializes thread local),
256+
* and is prone to false-positives as it is never reset: otherwise
257+
* it may lead to logical data races between suspensions point where
258+
* coroutine is yet being suspended in one thread while already being resumed
259+
* in another.
260+
*/
261+
@Volatile
262+
private var threadLocalIsSet = false
263+
264+
init {
265+
/*
266+
* This is a hack for a very specific case in #2930 unless #3253 is implemented.
267+
* 'ThreadLocalStressTest' covers this change properly.
268+
*
269+
* The scenario this change covers is the following:
270+
* 1) The coroutine is being started as plain non kotlinx.coroutines related suspend function,
271+
* e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
272+
* `withContext(tlElement)` which creates `UndispatchedCoroutine`.
273+
* 2) It (original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
274+
* and goes neither through `DC.run` nor through `resumeUndispatchedWith` that both
275+
* do thread context element tracking.
276+
* 3) So thread locals never got chance to get properly set up via `saveThreadContext`,
277+
* but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
278+
*
279+
* Here we detect precisely this situation and properly setup context to recover later.
280+
*
281+
*/
282+
if (uCont.context[ContinuationInterceptor] !is CoroutineDispatcher) {
283+
/*
284+
* We cannot just "read" the elements as there is no such API,
285+
* so we update-restore it immediately and use the intermediate value
286+
* as the initial state, leveraging the fact that thread context element
287+
* is idempotent and such situations are increasingly rare.
288+
*/
289+
val values = updateThreadContext(context, null)
290+
restoreThreadContext(context, values)
291+
saveThreadContext(context, values)
292+
}
293+
}
294+
295+
fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
296+
threadLocalIsSet = true // Specify that thread-local is touched at all
297+
threadStateToRecover.set(context to oldValue)
298+
}
299+
300+
fun clearThreadContext(): Boolean {
301+
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
302+
threadStateToRecover.remove()
303+
}
304+
}
305+
306+
override fun afterCompletionUndispatched() {
307+
clearThreadLocal()
308+
}
309+
310+
override fun afterResume(state: Any?) {
311+
clearThreadLocal()
312+
// resume undispatched -- update context but stay on the same dispatcher
313+
val result = recoverResult(state, uCont)
314+
withContinuationContext(uCont, null) {
315+
uCont.resumeWith(result)
316+
}
317+
}
318+
319+
private fun clearThreadLocal() {
320+
if (threadLocalIsSet) {
321+
threadStateToRecover.get()?.let { (ctx, value) ->
322+
restoreThreadContext(ctx, value)
323+
}
324+
threadStateToRecover.remove()
325+
}
326+
}
327+
}
213328

214329
private const val UNDECIDED = 0
215330
private const val SUSPENDED = 1

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines
22

3+
import kotlinx.coroutines.internal.*
34
import kotlin.coroutines.*
45

56
/**
@@ -20,8 +21,78 @@ public expect fun CoroutineContext.newCoroutineContext(addedContext: CoroutineCo
2021
@Suppress("PropertyName")
2122
internal expect val DefaultDelay: Delay
2223

23-
// countOrElement -- pre-cached value for ThreadContext.kt
24-
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
25-
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
2624
internal expect fun Continuation<*>.toDebugString(): String
2725
internal expect val CoroutineContext.coroutineName: String?
26+
27+
/**
28+
* Executes a block using a given coroutine context.
29+
*/
30+
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
31+
val oldValue = updateThreadContext(context, countOrElement)
32+
try {
33+
return block()
34+
} finally {
35+
restoreThreadContext(context, oldValue)
36+
}
37+
}
38+
39+
/**
40+
* Executes a block using a context of a given continuation.
41+
*/
42+
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
43+
val context = continuation.context
44+
val oldValue = updateThreadContext(context, countOrElement)
45+
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
46+
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
47+
continuation.updateUndispatchedCompletion(context, oldValue)
48+
} else {
49+
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
50+
}
51+
try {
52+
return block()
53+
} finally {
54+
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
55+
restoreThreadContext(context, oldValue)
56+
}
57+
}
58+
}
59+
60+
internal fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? {
61+
if (this !is CoroutineStackFrame) return null
62+
/*
63+
* Fast-path to detect whether we have undispatched coroutine at all in our stack.
64+
*
65+
* Implementation note.
66+
* If we ever find that stackwalking for thread-locals is way too slow, here is another idea:
67+
* 1) Store undispatched coroutine right in the `UndispatchedMarker` instance
68+
* 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker`
69+
* from the context when creating dispatched coroutine in `withContext`.
70+
* Another option is to "unmark it" instead of removing to save an allocation.
71+
* Both options should work, but it requires more careful studying of the performance
72+
* and, mostly, maintainability impact.
73+
*/
74+
val potentiallyHasUndispatchedCoroutine = context[UndispatchedMarker] !== null
75+
if (!potentiallyHasUndispatchedCoroutine) return null
76+
val completion = undispatchedCompletion()
77+
completion?.saveThreadContext(context, oldValue)
78+
return completion
79+
}
80+
81+
internal tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? {
82+
// Find direct completion of this continuation
83+
val completion: CoroutineStackFrame = when (this) {
84+
is DispatchedCoroutine<*> -> return null
85+
else -> callerFrame ?: return null // something else -- not supported
86+
}
87+
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
88+
return completion.undispatchedCompletion() // walk up the call stack with tail call
89+
}
90+
91+
/**
92+
* Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack.
93+
* Used as a performance optimization to avoid stack walking where it is not necessary.
94+
*/
95+
private object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> {
96+
override val key: CoroutineContext.Key<*>
97+
get() = this
98+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package kotlinx.coroutines
2+
3+
import kotlin.coroutines.*
4+
5+
/**
6+
* Defines elements in [CoroutineContext] that are installed into thread context
7+
* every time the coroutine with this element in the context is resumed on a thread.
8+
*
9+
* Implementations of this interface define a type [S] of the thread-local state that they need to store on
10+
* resume of a coroutine and restore later on suspend. The infrastructure provides the corresponding storage.
11+
*
12+
* Example usage looks like this:
13+
*
14+
* ```
15+
* // Appends "name" of a coroutine to a current thread name when coroutine is executed
16+
* class CoroutineName(val name: String) : ThreadContextElement<String> {
17+
* // declare companion object for a key of this element in coroutine context
18+
* companion object Key : CoroutineContext.Key<CoroutineName>
19+
*
20+
* // provide the key of the corresponding context element
21+
* override val key: CoroutineContext.Key<CoroutineName>
22+
* get() = Key
23+
*
24+
* // this is invoked before coroutine is resumed on current thread
25+
* override fun updateThreadContext(context: CoroutineContext): String {
26+
* val previousName = Thread.currentThread().name
27+
* Thread.currentThread().name = "$previousName # $name"
28+
* return previousName
29+
* }
30+
*
31+
* // this is invoked after coroutine has suspended on current thread
32+
* override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
33+
* Thread.currentThread().name = oldState
34+
* }
35+
* }
36+
*
37+
* // Usage
38+
* launch(Dispatchers.Main + CoroutineName("Progress bar coroutine")) { ... }
39+
* ```
40+
*
41+
* Every time this coroutine is resumed on a thread, UI thread name is updated to
42+
* "UI thread original name # Progress bar coroutine" and the thread name is restored to the original one when
43+
* this coroutine suspends.
44+
*
45+
* To use [ThreadLocal] variable within the coroutine use [ThreadLocal.asContextElement][asContextElement] function.
46+
*
47+
* ### Reentrancy and thread-safety
48+
*
49+
* Correct implementations of this interface must expect that calls to [restoreThreadContext]
50+
* may happen in parallel to the subsequent [updateThreadContext] and [restoreThreadContext] operations.
51+
* See [CopyableThreadContextElement] for advanced interleaving details.
52+
*
53+
* All implementations of [ThreadContextElement] should be thread-safe and guard their internal mutable state
54+
* within an element accordingly.
55+
*/
56+
public interface ThreadContextElement<S> : CoroutineContext.Element {
57+
/**
58+
* Updates context of the current thread.
59+
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread
60+
* when the context of the coroutine this element.
61+
* The result of this function is the old value of the thread-local state that will be passed to [restoreThreadContext].
62+
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
63+
* context is updated in an undefined state and may crash an application.
64+
*
65+
* @param context the coroutine context.
66+
*/
67+
public fun updateThreadContext(context: CoroutineContext): S
68+
69+
/**
70+
* Restores context of the current thread.
71+
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread
72+
* if [updateThreadContext] was previously invoked on resume of this coroutine.
73+
* The value of [oldState] is the result of the previous invocation of [updateThreadContext] and it should
74+
* be restored in the thread-local state by this function.
75+
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
76+
* context is updated in an undefined state and may crash an application.
77+
*
78+
* @param context the coroutine context.
79+
* @param oldState the value returned by the previous invocation of [updateThreadContext].
80+
*/
81+
public fun restoreThreadContext(context: CoroutineContext, oldState: S)
82+
}

0 commit comments

Comments
 (0)