Skip to content

Commit ebfcd9b

Browse files
AlexVanGogenvsalavatov
authored andcommitted
IDEA-352355 Asynchronous stack traces for flows in the IDEA debugger
Patched during cherry-pick to 1.10.1. File with conflicts: BufferedChannel.kt Squashed commits as of version 1.8.0-intellij-11 Don't make unnecessary NULL unboxings (cherry picked from commit 0267812) Don't wrap suspend function (cherry picked from commit 8684cad) Get rid of changes in the public API -- bring back CoroutineChannel support (cherry picked from commit 9836a6b) Get rid of changes in the public API (cherry picked from commit f852554) Support SelectImplementation This brings support for select-based flow operators, such as `timeout`. (cherry picked from commit db906d8) Support BufferedChannel Before, `buffer` operations and such were only supported partially, and in some cases async stack traces were working only in 50% collects. For example, when several flows are merged with `flattenMerge` and emit values simultaneously. This change seems large, changing a lot of lines in BufferedChannel.kt, but most of them are effectively refactoring (propagation of a wrapped value). (cherry picked from commit 8be4def) Discard strict double-wrapping check We decided not to go with it, as it may dump a lot of error messages to a clueless user's console. (cherry picked from commit 0efa558) Enhance support for async stack traces in flows * simplify instrumentation by making a single insertion point source instead of having one in every class * handle a double-wrapping case which leads to errors; allow agent to choose how to handle it * support more commonly used operators (such as `scan`, `buffer`, `debounce` with dynamic timeout) Unfortunately, this change doesn't cover all possible scenarios of using flows, as many of them interoperate with `Channel`s, and it should be addressed separately. (cherry picked from commit 00cb4e5) Prepare shared flows for the debugger agent to support async stack traces The agent needs three entities to establish a proper asynchronous stack traces connection: - a capture point -- method that indicates the stack trace that precedes the current stack trace; - an insertion point -- method within the current stack trace; - a key -- an object that is present in both points and is unique enough to bridge two points properly. This change tweaks the code a bit to introduce the three entities in MutableSharedFlow and MutableStateFlow. The key for MutableSharedFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places. (cherry picked from commit 75107bc) # Conflicts: # kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt # Conflicts: # IntelliJ-patches.md
1 parent d23e881 commit ebfcd9b

File tree

10 files changed

+194
-52
lines changed

10 files changed

+194
-52
lines changed

IntelliJ-patches.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,42 @@ exceeding the parallelism limit would eliminate this (likely expected) side effe
5050
associated coroutine dispatcher when it decides to park the thread
5151
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
5252
parallelism compensation
53+
54+
## Asynchronous stack traces for flows in the IDEA debugger
55+
56+
The agent needs three entities to establish a proper asynchronous stack traces connection:
57+
- a capture point — method that indicates the stack trace that precedes the current stack trace;
58+
- an insertion point — method within the current stack trace;
59+
- a key — an object that is present in both points and is unique enough to bridge two stack traces properly.
60+
61+
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
62+
63+
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `timeout`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
64+
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
65+
66+
### API
67+
68+
Some logic related to instrumentation was extracted to separate methods so that the debugger agent could instrument it properly:
69+
70+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternal` -- wrapper class used to create a unique object for the debugger agent
71+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternal` -- returns passed argument by default; the agent instruments it to call `wrapInternalDebuggerCapture` instead
72+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation.
73+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
74+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
75+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapTyped` -- utility function served to ease casting to a real underlying type
76+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- alternative of a regular `FlowCollector.emit` that supports insertion points; if there is a `FlowCollector`, its `emit` call can be replaced with `emitInternal` so this case would also be supported for constructing async stack traces
77+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.debuggerCapture` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside.
78+
79+
One internal method was added to `BufferedChannel`: `emitAllInternal`. This method ensures the value will be unwrapped in an insertion point.
80+
81+
One internal method was added to `flow/Channels.kt`: `emitAllInternal`. It emits all values, like usual, but also considers wrapping/unwrapping supported in `BufferedChannel`.
82+
83+
One internal method was added to `ChannelCoroutine`: `emitAllInternal` serves to bridge its delegate and the method above.
84+
85+
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
86+
87+
Why not just let `next` return a maybe wrapped value? That's because it is heavily used outside a currently supported scope. For example, one may just indirectly call it from a for-loop. In this case, unwrapping will never happen, and a user will get a handful of `ClassCastException`s.
88+
89+
Changes were made to lambda parameter `onElementRetrieved` in `BufferedChannel<E>` methods: now they accept `Any?` instead of `E` because now they may be given a wrapped value.
90+
91+
`SelectImplementation.complete` now uses `debuggerCapture` to properly propagate value that might come from flows.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import kotlinx.coroutines.*
77
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
88
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
99
import kotlinx.coroutines.channels.ChannelResult.Companion.success
10+
import kotlinx.coroutines.flow.FlowCollector
11+
import kotlinx.coroutines.flow.internal.*
1012
import kotlinx.coroutines.internal.*
1113
import kotlinx.coroutines.selects.*
1214
import kotlinx.coroutines.selects.TrySelectDetailedResult.*
@@ -684,7 +686,7 @@ internal open class BufferedChannel<E>(
684686
protected open fun onReceiveDequeued() {}
685687

686688
override suspend fun receive(): E =
687-
receiveImpl( // <-- this is an inline function
689+
receiveImpl<E>( // <-- this is an inline function
688690
// Do not create a continuation until it is required;
689691
// it is created later via [onNoWaiterSuspend], if needed.
690692
waiter = null,
@@ -693,7 +695,7 @@ internal open class BufferedChannel<E>(
693695
// Also, inform `BufferedChannel` extensions that
694696
// synchronization of this receive operation is completed.
695697
onElementRetrieved = { element ->
696-
return element
698+
return unwrapTyped<E>(element)
697699
},
698700
// As no waiter is provided, suspension is impossible.
699701
onSuspend = { _, _, _ -> error("unexpected") },
@@ -726,7 +728,7 @@ internal open class BufferedChannel<E>(
726728
// specified, we need to invoke it in the latter case.
727729
onElementRetrieved = { element ->
728730
val onCancellation = onUndeliveredElement?.bindCancellationFun()
729-
cont.resume(element, onCancellation)
731+
cont.resume(unwrapTyped(element), onCancellation)
730732
},
731733
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
732734
)
@@ -752,7 +754,7 @@ internal open class BufferedChannel<E>(
752754
receiveImpl( // <-- this is an inline function
753755
waiter = null,
754756
onElementRetrieved = { element ->
755-
success(element)
757+
success(unwrapTyped(element))
756758
},
757759
onSuspend = { _, _, _ -> error("unexpected") },
758760
onClosed = { closed(closeCause) },
@@ -769,7 +771,8 @@ internal open class BufferedChannel<E>(
769771
segment, index, r,
770772
waiter = waiter,
771773
onElementRetrieved = { element ->
772-
cont.resume(success(element), onUndeliveredElement?.bindCancellationFunResult())
774+
val unwrapped = unwrapTyped<E>(element)
775+
cont.resume(success(unwrapped), onUndeliveredElement?.bindCancellationFunResult())
773776
},
774777
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
775778
)
@@ -802,7 +805,7 @@ internal open class BufferedChannel<E>(
802805
// Store an already interrupted receiver in case of suspension.
803806
waiter = INTERRUPTED_RCV,
804807
// Finish when an element is successfully retrieved.
805-
onElementRetrieved = { element -> success(element) },
808+
onElementRetrieved = { element -> success(unwrapTyped(element)) },
806809
// On suspension, the `INTERRUPTED_RCV` token has been
807810
// installed, and this `tryReceive()` must fail.
808811
onSuspend = { segm, _, globalIndex ->
@@ -866,7 +869,7 @@ internal open class BufferedChannel<E>(
866869
// Clean the reference to the previous segment.
867870
segment.cleanPrev()
868871
@Suppress("UNCHECKED_CAST")
869-
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
872+
onUndeliveredElement?.callUndeliveredElementCatchingException(unwrapTyped(updCellResult))?.let { throw it }
870873
}
871874
}
872875
}
@@ -884,7 +887,7 @@ internal open class BufferedChannel<E>(
884887
/* This lambda is invoked when an element has been
885888
successfully retrieved, either from the buffer or
886889
by making a rendezvous with a suspended sender. */
887-
onElementRetrieved: (element: E) -> R,
890+
onElementRetrieved: (element: Any?) -> R,
888891
/* This lambda is called when the operation suspends in the cell
889892
specified by the segment and its global and in-segment indices. */
890893
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
@@ -954,7 +957,7 @@ internal open class BufferedChannel<E>(
954957
// Clean the reference to the previous segment before finishing.
955958
segment.cleanPrev()
956959
@Suppress("UNCHECKED_CAST")
957-
onElementRetrieved(updCellResult as E)
960+
onElementRetrieved(updCellResult)
958961
}
959962
}
960963
}
@@ -972,7 +975,7 @@ internal open class BufferedChannel<E>(
972975
/* This lambda is invoked when an element has been
973976
successfully retrieved, either from the buffer or
974977
by making a rendezvous with a suspended sender. */
975-
onElementRetrieved: (element: E) -> Unit,
978+
onElementRetrieved: (element: Any?) -> Unit,
976979
/* This lambda is called when the channel is observed
977980
in the closed state and no waiting senders is found,
978981
which means that it is closed for receiving. */
@@ -1561,7 +1564,7 @@ internal open class BufferedChannel<E>(
15611564
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
15621565
{ select: SelectInstance<*>, _: Any?, element: Any? ->
15631566
{ _, _, _ ->
1564-
if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context)
1567+
if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(unwrapTyped(element), select.context)
15651568
}
15661569
}
15671570
}
@@ -1572,6 +1575,13 @@ internal open class BufferedChannel<E>(
15721575

15731576
override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
15741577

1578+
internal suspend fun emitAllInternal(collector: FlowCollector<E>) {
1579+
val iterator = iterator() as BufferedChannel.BufferedChannelIterator
1580+
while (iterator.hasNext()) {
1581+
collector.emitInternal(iterator.nextInternal())
1582+
}
1583+
}
1584+
15751585
/**
15761586
* The key idea is that an iterator is a special receiver type,
15771587
* which should be resumed differently to [receive] and [onReceive]
@@ -1666,7 +1676,7 @@ internal open class BufferedChannel<E>(
16661676
onElementRetrieved = { element ->
16671677
this.receiveResult = element
16681678
this.continuation = null
1669-
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element))
1679+
cont.resume(true, onUndeliveredElement?.bindCancellationFun(unwrapTyped(element)))
16701680
},
16711681
onClosed = { onClosedHasNextNoWaiterSuspend() }
16721682
)
@@ -1694,8 +1704,17 @@ internal open class BufferedChannel<E>(
16941704
}
16951705
}
16961706

1697-
@Suppress("UNCHECKED_CAST")
16981707
override fun next(): E {
1708+
return unwrapInternal(nextInternal())
1709+
}
1710+
1711+
/**
1712+
* Result may be wrapped by debugger agent; use this method only with [unwrapInternal] or [emitInternal]!
1713+
*
1714+
* @see [next], [emitAll]
1715+
*/
1716+
@Suppress("UNCHECKED_CAST")
1717+
internal fun nextInternal(): E {
16991718
// Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
17001719
val result = receiveResult
17011720
check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
@@ -1712,7 +1731,7 @@ internal open class BufferedChannel<E>(
17121731
val cont = this.continuation!!
17131732
this.continuation = null
17141733
// Store the retrieved element in `receiveResult`.
1715-
this.receiveResult = element
1734+
this.receiveResult = wrapInternal(element)
17161735
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
17171736
// may be cancelled after it is successfully resumed but not dispatched yet.
17181737
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
@@ -2815,16 +2834,17 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
28152834
}
28162835

28172836
@Suppress("UNCHECKED_CAST")
2818-
internal fun getElement(index: Int) = data[index * 2].value as E
2837+
internal fun getElement(index: Int) = unwrapInternal(data[index * 2].value) as E
28192838

2820-
internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2839+
@Suppress("UNCHECKED_CAST")
2840+
internal fun retrieveElement(index: Int): E = (data[index * 2].value as E).also { cleanElement(index) }
28212841

28222842
internal fun cleanElement(index: Int) {
2823-
setElementLazy(index, null)
2843+
data[index * 2].lazySet(null)
28242844
}
28252845

28262846
private fun setElementLazy(index: Int, value: Any?) {
2827-
data[index * 2].lazySet(value)
2847+
data[index * 2].lazySet(wrapInternal(value))
28282848
}
28292849

28302850
// ######################################

kotlinx-coroutines-core/common/src/channels/Channel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
88
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
99
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11+
import kotlinx.coroutines.flow.FlowCollector
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
1314
import kotlin.contracts.*

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kotlinx.coroutines.channels
22

33
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.flow.FlowCollector
5+
import kotlinx.coroutines.flow.emitAllInternal
46
import kotlin.coroutines.*
57

68
internal open class ChannelCoroutine<E>(
@@ -35,4 +37,8 @@ internal open class ChannelCoroutine<E>(
3537
_channel.cancel(exception) // cancel the channel
3638
cancelCoroutine(exception) // cancel the job
3739
}
40+
41+
internal suspend fun emitAllInternal(flowCollector: FlowCollector<E>) {
42+
emitAllInternal(_channel, flowCollector)
43+
}
3844
}

kotlinx-coroutines-core/common/src/flow/Channels.kt

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
2929
ensureActive()
3030
var cause: Throwable? = null
3131
try {
32-
for (element in channel) {
33-
emit(element)
34-
}
32+
emitAllInternal(channel, this)
3533
} catch (e: Throwable) {
3634
cause = e
3735
throw e
@@ -40,6 +38,22 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
4038
}
4139
}
4240

41+
internal suspend fun <T> emitAllInternal(channel: ReceiveChannel<T>, collector: FlowCollector<T>) {
42+
when (channel) {
43+
is BufferedChannel<*> -> {
44+
(channel as BufferedChannel<T>).emitAllInternal(collector)
45+
}
46+
is ChannelCoroutine<*> -> {
47+
(channel as ChannelCoroutine<T>).emitAllInternal(collector)
48+
}
49+
else -> {
50+
for (element in channel) {
51+
collector.emit(element)
52+
}
53+
}
54+
}
55+
}
56+
4357
/**
4458
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
4559
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ internal open class SharedFlowImpl<T>(
369369
val result = ArrayList<T>(replaySize)
370370
val buffer = buffer!! // must be allocated, because replaySize > 0
371371
@Suppress("UNCHECKED_CAST")
372-
for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
372+
for (i in 0 until replaySize) result += unwrapInternal(buffer.getBufferAt(replayIndex + i)) as T
373373
result
374374
}
375375

@@ -378,7 +378,7 @@ internal open class SharedFlowImpl<T>(
378378
*/
379379
@Suppress("UNCHECKED_CAST")
380380
protected val lastReplayedLocked: T
381-
get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T
381+
get() = unwrapInternal(buffer!!.getBufferAt(replayIndex + replaySize - 1)) as T
382382

383383
@Suppress("UNCHECKED_CAST")
384384
override suspend fun collect(collector: FlowCollector<T>): Nothing {
@@ -394,7 +394,7 @@ internal open class SharedFlowImpl<T>(
394394
awaitValue(slot) // await signal that the new value is available
395395
}
396396
collectorJob?.ensureActive()
397-
collector.emit(newValue as T)
397+
collector.emitInternal(newValue as T)
398398
}
399399
} finally {
400400
freeSlot(slot)
@@ -474,8 +474,16 @@ internal open class SharedFlowImpl<T>(
474474
minCollectorIndex = newHead
475475
}
476476

477-
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
478477
private fun enqueueLocked(item: Any?) {
478+
enqueueLockedInner(wrapInternal(item))
479+
}
480+
481+
private fun enqueueEmitterLocked(emitter: Emitter) {
482+
enqueueLockedInner(emitter)
483+
}
484+
485+
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
486+
private fun enqueueLockedInner(item: Any?) {
479487
val curSize = totalSize
480488
val buffer = when (val curBuffer = buffer) {
481489
null -> growBuffer(null, 0, 2)
@@ -505,8 +513,8 @@ internal open class SharedFlowImpl<T>(
505513
return@lock null
506514
}
507515
// add suspended emitter to the buffer
508-
Emitter(this, head + totalSize, value, cont).also {
509-
enqueueLocked(it)
516+
Emitter(this, head + totalSize, wrapInternal(value), cont).also {
517+
enqueueEmitterLocked(it)
510518
queueSize++ // added to queue of waiting emitters
511519
// synchronous shared flow might rendezvous with waiting emitter
512520
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ private class StateFlowImpl<T>(
330330
val oldState = _state.value
331331
if (expectedState != null && oldState != expectedState) return false // CAS support
332332
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
333-
_state.value = newState
333+
updateInner(newState)
334334
curSequence = sequence
335335
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
336336
curSequence++ // make it odd
@@ -366,6 +366,11 @@ private class StateFlowImpl<T>(
366366
}
367367
}
368368

369+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
370+
private fun updateInner(newState: Any) {
371+
_state.value = newState
372+
}
373+
369374
override val replayCache: List<T>
370375
get() = listOf(value)
371376

@@ -398,7 +403,7 @@ private class StateFlowImpl<T>(
398403
collectorJob?.ensureActive()
399404
// Conflate value emissions using equality
400405
if (oldState == null || oldState != newState) {
401-
collector.emit(NULL.unbox(newState))
406+
collector.emitInternal(newState)
402407
oldState = newState
403408
}
404409
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot

0 commit comments

Comments
 (0)