Skip to content

Commit a4d1a3b

Browse files
committed
Minor improvements
1 parent e1bd019 commit a4d1a3b

File tree

6 files changed

+61
-62
lines changed

6 files changed

+61
-62
lines changed

atomicfu/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ kotlin {
127127

128128
jvmTest {
129129
dependencies {
130-
implementation("org.jetbrains.kotlinx:lincheck:2.38")
130+
implementation("org.jetbrains.kotlinx:lincheck:2.39")
131131
implementation("org.jetbrains.kotlin:kotlin-reflect")
132132
implementation("org.jetbrains.kotlin:kotlin-test")
133133
implementation("org.jetbrains.kotlin:kotlin-test-junit")

atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt renamed to atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ internal class NativeMutex(
5757
}
5858

5959
// Otherwise try acquire lock
60-
val newState = state.incrementAndGet()
60+
val newState = this@NativeMutex.state.incrementAndGet()
6161
// If new state 1 than I have acquired lock skipping queue.
6262
if (newState == 1) {
6363
owningThread.value = currentParkingHandle
@@ -92,14 +92,14 @@ internal class NativeMutex(
9292
if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked")
9393

9494
// Lock is released by decrementing (only if decremented to 0)
95-
val currentState = state.decrementAndGet()
95+
val currentState = this@NativeMutex.state.decrementAndGet()
9696
if (currentState == 0) return
9797

9898
// If waiters wake up the first in line. The woken up thread will dequeue the node.
9999
if (currentState > 0) {
100100
var nextParker = parkingQueue.getHead()
101101
// If cancelled And there are other waiting nodes, go to next
102-
while (!nextParker.nodeWake() && state.decrementAndGet() > 0) {
102+
while (!nextParker.nodeWake() && this@NativeMutex.state.decrementAndGet() > 0) {
103103
// We only dequeue here in case of timeoud out node.
104104
// Dequeueing woken nodes can lead to issues when pre-unparked.
105105
parkingQueue.dequeue()
@@ -111,7 +111,7 @@ internal class NativeMutex(
111111

112112
fun tryLock(): Boolean {
113113
val currentThreadId = ParkingSupport.currentThreadHandle()
114-
if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) {
114+
if (holdCount.value > 0 && owningThread.value == currentThreadId || this@NativeMutex.state.compareAndSet(0, 1)) {
115115
owningThread.value = currentThreadId
116116
holdCount.incrementAndGet()
117117
return true
@@ -156,46 +156,56 @@ internal class NativeMutex(
156156

157157
}
158158

159+
/**
160+
* A [Node] is one spot in the mutex's queue.
161+
* It has a reference to the [next] node in the queue.
162+
*
163+
* It can be in one of four [state].
164+
* - [Empty] is the initial state where the node is not in use yet.
165+
* - [ParkingHandle] is the waiting state.
166+
* - [Awoken] is the woken state.
167+
* - [TimedOut] is the state after timeout.
168+
*/
159169
inner class Node {
160-
val parker = atomic<Any>(Empty)
170+
val state = atomic<Any>(Empty)
161171
val next = atomic<Node?>(null)
162172

163173
fun nodeWait(duration: Duration): Boolean {
164174
val deadline = TimeSource.Monotonic.markNow() + duration
165175
while (true) {
166-
when (parker.value) {
167-
Empty -> if (parker.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) {
176+
when (state.value) {
177+
Empty -> if (state.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) {
168178
park(deadline - TimeSource.Monotonic.markNow())
169179
if (deadline < TimeSource.Monotonic.markNow())
170-
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
180+
state.compareAndSet(ParkingSupport.currentThreadHandle(), TimedOut)
171181
}
172182
is ParkingHandle -> {
173183
park(deadline - TimeSource.Monotonic.markNow())
174184
if (deadline < TimeSource.Monotonic.markNow())
175-
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
185+
state.compareAndSet(ParkingSupport.currentThreadHandle(), TimedOut)
176186
}
177187
Awoken -> return true
178-
Cancelled -> return false
188+
TimedOut -> return false
179189
}
180190
}
181191
}
182192

183193
fun nodeWake(): Boolean {
184194
while (true) {
185-
when (val currentState = parker.value) {
186-
Empty -> if (parker.compareAndSet(Empty, Awoken)) return true
187-
is ParkingHandle -> if (parker.compareAndSet(currentState, Awoken)) {
195+
when (val currentState = state.value) {
196+
Empty -> if (state.compareAndSet(Empty, Awoken)) return true
197+
is ParkingHandle -> if (state.compareAndSet(currentState, Awoken)) {
188198
unpark(currentState)
189199
return true
190200
}
191201
Awoken -> throw IllegalStateException("Node is already woken")
192-
Cancelled -> return false
202+
TimedOut -> return false
193203
}
194204
}
195205
}
196206
}
197207

198208
private object Empty
199209
private object Awoken
200-
private object Cancelled
210+
private object TimedOut
201211
}

atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,38 @@ import kotlin.test.Test
66
import kotlin.test.assertEquals
77
import kotlin.time.Duration.Companion.milliseconds
88

9-
class LockWithTimeoutTests {
10-
11-
// Helper class with atomic counter in constructor
12-
class AtomicCounter(initialValue: Int = 0) {
13-
val counter = atomic(initialValue)
9+
private const val N_THREADS = 5
10+
private const val TARGET_COUNT = 1_000
11+
private fun getRandomWait() = Random.nextInt(0, 5).toLong()
12+
private fun getRandomTimeout() = Random.nextInt(1, 10)
1413

15-
fun incrementAndGet(): Int = counter.incrementAndGet()
16-
val value: Int get() = counter.value
17-
}
14+
class LockWithTimeoutTests {
15+
val counter = atomic(0)
1816

1917
@Test
2018
fun timeoutLockStressTest() {
2119
val mutex = SynchronousMutex()
22-
val counter = AtomicCounter(0)
23-
val targetCount = 1000
2420
val threads = mutableListOf<TestThread>()
2521

26-
// Create 5 test threads
27-
repeat(5) { threadId ->
22+
repeat(N_THREADS) { threadId ->
2823
val thread = testThread {
29-
while (counter.value < targetCount) {
24+
while (counter.value < TARGET_COUNT) {
3025
// Try to acquire the lock with a timeout
31-
if (mutex.tryLock((Random.nextInt(1, 10)).milliseconds)) {
26+
if (mutex.tryLock(getRandomTimeout().milliseconds)) {
3227
try {
3328
// Increment the counter if lock was acquired
34-
if (counter.value < targetCount) {
29+
if (counter.value < TARGET_COUNT) {
3530
counter.incrementAndGet()
3631
}
37-
// Random sleep to increase variation
38-
sleepMillis(Random.nextInt(0, 5).toLong())
32+
// Random sleep after increment to increase variation
33+
sleepMillis(getRandomWait())
3934
} finally {
4035
mutex.unlock()
4136
}
4237
}
4338

44-
// Random sleep between attempts to increase variation
45-
sleepMillis(Random.nextInt(0, 3).toLong())
39+
// Random sleep between increment attempts to increase variation
40+
sleepMillis(getRandomWait())
4641
}
4742
}
4843
threads.add(thread)
@@ -52,7 +47,7 @@ class LockWithTimeoutTests {
5247
threads.forEach { it.join() }
5348

5449
// Verify the counter reached the target
55-
assertEquals(targetCount, counter.value)
50+
assertEquals(TARGET_COUNT, counter.value)
5651
}
5752

5853
}

atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package kotlinx.atomicfu.locks
33
import kotlin.test.Test
44
import kotlin.test.assertEquals
55

6+
private const val SLEEP_MILLIS = 10L
7+
private const val N_REPEATS_SLOW = 100
8+
private const val N_REPEATS_FAST = 30_000
9+
610
class NativeMutexTest {
711

812

@@ -11,38 +15,39 @@ class NativeMutexTest {
1115
val mutex = NativeMutex()
1216
val resultList = mutableListOf<String>()
1317

14-
val fut1 = testThread {
15-
repeat(30) { i ->
18+
val futA = Fut {
19+
repeat(N_REPEATS_SLOW) { i ->
1620
mutex.lock()
1721
resultList.add("a$i")
18-
sleepMillis(100)
22+
sleepMillis(SLEEP_MILLIS)
1923
resultList.add("a$i")
2024
mutex.unlock()
2125
}
2226
}
2327

24-
val fut2 = testThread {
25-
repeat(30) { i ->
28+
val futB = Fut {
29+
repeat(N_REPEATS_SLOW) { i ->
2630
mutex.lock()
2731
resultList.add("b$i")
28-
sleepMillis(100)
32+
sleepMillis(SLEEP_MILLIS)
2933
resultList.add("b$i")
3034
mutex.unlock()
3135
}
3236
}
3337

34-
repeat(30) { i ->
38+
repeat(N_REPEATS_SLOW) { i ->
3539
mutex.lock()
3640
resultList.add("c$i")
37-
sleepMillis(100)
41+
sleepMillis(SLEEP_MILLIS)
3842
resultList.add("c$i")
3943
mutex.unlock()
4044
}
41-
fut1.join()
42-
fut2.join()
45+
46+
futA.waitThrowing()
47+
futB.waitThrowing()
4348

4449
resultList.filterIndexed { i, _ -> i % 2 == 0 }
45-
.zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b ->
50+
.zip(resultList.filterIndexed { i, _ -> i % 2 == 1 }) { a, b ->
4651
assertEquals(a, b)
4752
}
4853
}
@@ -53,7 +58,7 @@ class NativeMutexTest {
5358
val resultList = mutableListOf<String>()
5459

5560
val fut1 = testThread {
56-
repeat(30000) { i ->
61+
repeat(N_REPEATS_FAST) { i ->
5762
mutex.lock()
5863
resultList.add("a$i")
5964
resultList.add("a$i")
@@ -62,15 +67,15 @@ class NativeMutexTest {
6267
}
6368

6469
val fut2 = testThread {
65-
repeat(30000) { i ->
70+
repeat(N_REPEATS_FAST) { i ->
6671
mutex.lock()
6772
resultList.add("b$i")
6873
resultList.add("b$i")
6974
mutex.unlock()
7075
}
7176
}
7277

73-
repeat(30000) { i ->
78+
repeat(N_REPEATS_FAST) { i ->
7479
mutex.lock()
7580
resultList.add("c$i")
7681
resultList.add("c$i")

atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,9 @@ class VaryingContentionTest {
1010
fun varyingContentionTest() {
1111
val lockInt = LockInt()
1212
multiTestLock(lockInt, 10, 100000)
13-
println("Varying Contention Test 1")
1413
multiTestLock(lockInt, 1, 200000)
15-
println("Varying Contention Test 2")
1614
multiTestLock(lockInt, 20, 300000)
17-
println("Varying Contention Test 3")
1815
multiTestLock(lockInt, 1, 400000)
19-
println("Varying Contention Test 4")
20-
multiTestLock(lockInt, 2, 1000000)
21-
println("Varying Contention Test Done")
2216
}
2317

2418

@@ -53,7 +47,7 @@ class VaryingContentionTest {
5347
)
5448

5549
class LockInt {
56-
private val lock = NativeMutex()
50+
private val lock = SynchronousMutex()
5751
private val check = atomic(0)
5852
var n = 0
5953
fun lock() {

atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,6 @@ package kotlinx.atomicfu.locks
33
import java.util.concurrent.TimeUnit
44
import kotlin.time.Duration
55

6-
/**
7-
* This mutex uses a [ReentrantLock].
8-
*
9-
* Construct with `Mutex(reentrantLock)` to create a [SynchronousMutex] that uses an existing instance of [ReentrantLock].
10-
*/
116
public actual class SynchronousMutex {
127
private val reentrantLock = ReentrantLock()
138
public actual fun tryLock(timeout: Duration): Boolean = reentrantLock.tryLock(timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS)

0 commit comments

Comments
 (0)