Skip to content

Commit ef43ce5

Browse files
committed
Forbid adding new child jobs after the parent decides to complete
1 parent 8a76e31 commit ef43ce5

File tree

2 files changed

+63
-21
lines changed

2 files changed

+63
-21
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
899899
val child = firstChild(state)
900900
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
901901
return COMPLETING_WAITING_CHILDREN
902+
list.close(LIST_CHILD_PERMISSION)
903+
val anotherChild = firstChild(state)
904+
if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
905+
return COMPLETING_WAITING_CHILDREN
902906
// otherwise -- we have not children left (all were already cancelled?)
903907
return finalizeFinishingState(finishing, proposedUpdate)
904908
}
@@ -928,7 +932,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
928932
val waitChild = lastChild.nextChild()
929933
// try wait for next child
930934
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
931-
// no more children to wait -- try update state
935+
// no more children to wait -- stop accepting children
936+
state.list.close(LIST_CHILD_PERMISSION)
937+
// did any children get added?
938+
val waitChildAgain = lastChild.nextChild()
939+
// try wait for next child
940+
if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child
941+
// no more children, now we are sure; try to update the state
932942
val finalState = finalizeFinishingState(state, proposedUpdate)
933943
afterCompletion(finalState)
934944
}
@@ -968,41 +978,45 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
968978
val node = ChildHandleNode(child).also { it.job = this }
969979
val added = tryPutNodeIntoList(node) { state, list ->
970980
if (state is Finishing) {
971-
val rootCause: Throwable
981+
val rootCause: Throwable?
972982
val handle: ChildHandle
973983
synchronized(state) {
974984
// check if we are installing cancellation handler on job that is being cancelled
975-
val maybeRootCause = state.rootCause // != null if cancelling job
985+
rootCause = state.rootCause // != null if cancelling job
976986
// We add the node to the list in two cases --- either the job is not being cancelled,
977987
// or we are adding a child to a coroutine that is not completing yet
978-
if (maybeRootCause == null || !state.isCompleting) {
988+
if (rootCause == null || !state.isCompleting) {
979989
// Note: add node the list while holding lock on state (make sure it cannot change)
980-
if (!list.addLast(node, LIST_MAX_PERMISSION))
981-
return@tryPutNodeIntoList false // retry
990+
handle = if (list.addLast(node, LIST_CHILD_PERMISSION)) {
991+
node
992+
} else {
993+
NonDisposableHandle
994+
}
982995
// just return the node if we don't have to invoke the handler (not cancelling yet)
983-
rootCause = maybeRootCause ?: return@tryPutNodeIntoList true
984996
// otherwise handler is invoked immediately out of the synchronized section & handle returned
985-
handle = node
986997
} else {
987-
rootCause = maybeRootCause
988998
handle = NonDisposableHandle
989999
}
9901000
}
9911001
node.invoke(rootCause)
9921002
return handle
993-
} else list.addLast(node, LIST_MAX_PERMISSION).also { success ->
994-
if (success) {
995-
/** Handling the following case:
996-
* - A child requested to be added to the list;
997-
* - We checked the state and saw that it wasn't `Finishing`;
998-
* - Then, the job got cancelled and notified everyone about it;
999-
* - Only then did we add the child to the list
1000-
* - and ended up here.
1001-
*/
1002-
val latestState = this@JobSupport.state
1003-
if (latestState is Finishing) {
1004-
synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) }
1003+
} else {
1004+
list.addLast(node, LIST_CHILD_PERMISSION).also { success ->
1005+
if (success) {
1006+
/** Handling the following case:
1007+
* - A child requested to be added to the list;
1008+
* - We checked the state and saw that it wasn't `Finishing`;
1009+
* - Then, the job got cancelled and notified everyone about it;
1010+
* - Only then did we add the child to the list
1011+
* - and ended up here.
1012+
*/
1013+
val latestState = this@JobSupport.state
1014+
if (latestState is Finishing) {
1015+
synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) }
1016+
}
10051017
}
1018+
// if we didn't add the node to the list, we'll loop and notice
1019+
// either `Finishing` or the final state, so no spin loop here
10061020
}
10071021
}
10081022
}
@@ -1340,6 +1354,7 @@ private val EMPTY_NEW = Empty(false)
13401354
private val EMPTY_ACTIVE = Empty(true)
13411355

13421356
private const val LIST_MAX_PERMISSION = Int.MAX_VALUE
1357+
private const val LIST_CHILD_PERMISSION = 1
13431358
private const val LIST_CANCELLATION_PERMISSION = 0
13441359

13451360
private class Empty(override val isActive: Boolean) : Incomplete {

kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.*
44
import org.junit.*
55
import org.junit.Test
66
import java.util.concurrent.*
7+
import java.util.concurrent.atomic.*
78
import kotlin.test.*
89

910
class JobChildStressTest : TestBase() {
@@ -54,4 +55,30 @@ class JobChildStressTest : TestBase() {
5455
}
5556
}
5657
}
58+
59+
@Test
60+
fun testFailingChildIsAddedWhenJobFinalizesItsState() {
61+
// All exceptions should get aggregated here
62+
repeat(N_ITERATIONS) {
63+
runBlocking {
64+
val rogueJob = AtomicReference<Job?>()
65+
val deferred = CompletableDeferred<Unit>()
66+
launch(pool + deferred) {
67+
deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child
68+
// **Asynchronously** submit task that launches a child so it races with completion
69+
pool.executor.execute {
70+
rogueJob.set(launch(pool + deferred) {
71+
throw TestException("isCancelled: ${coroutineContext.job.isCancelled}")
72+
})
73+
}
74+
}
75+
76+
deferred.join()
77+
val rogue = rogueJob.get()
78+
if (rogue?.isActive == true) {
79+
throw TestException("Rogue job $rogue with parent " + rogue.parent + " and children list: " + rogue.parent?.children?.toList())
80+
}
81+
}
82+
}
83+
}
5784
}

0 commit comments

Comments
 (0)