diff --git a/atomicfu/api/atomicfu.api b/atomicfu/api/atomicfu.api index d8e6d4d2..43d0bf80 100644 --- a/atomicfu/api/atomicfu.api +++ b/atomicfu/api/atomicfu.api @@ -146,3 +146,15 @@ public final class kotlinx/atomicfu/locks/ParkingSupport { public final fun unpark (Ljava/lang/Thread;)V } +public final class kotlinx/atomicfu/locks/SynchronousMutex { + public fun ()V + public final fun lock ()V + public final fun tryLock ()Z + public final fun tryLock-LRDsOJo (J)Z + public final fun unlock ()V +} + +public final class kotlinx/atomicfu/locks/SynchronousMutexKt { + public static final fun withLock (Lkotlinx/atomicfu/locks/SynchronousMutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object; +} + diff --git a/atomicfu/api/atomicfu.klib.api b/atomicfu/api/atomicfu.klib.api index ef1e1ba4..d170cb60 100644 --- a/atomicfu/api/atomicfu.klib.api +++ b/atomicfu/api/atomicfu.klib.api @@ -45,6 +45,15 @@ final class <#A: kotlin/Any?> kotlinx.atomicfu/AtomicRef { // kotlinx.atomicfu/A final fun compareAndSet(#A, #A): kotlin/Boolean // kotlinx.atomicfu/AtomicRef.compareAndSet|compareAndSet(1:0;1:0){}[0] } +final class kotlinx.atomicfu.locks/SynchronousMutex { // kotlinx.atomicfu.locks/SynchronousMutex|null[0] + constructor () // kotlinx.atomicfu.locks/SynchronousMutex.|(){}[0] + + final fun lock() // kotlinx.atomicfu.locks/SynchronousMutex.lock|lock(){}[0] + final fun tryLock(): kotlin/Boolean // kotlinx.atomicfu.locks/SynchronousMutex.tryLock|tryLock(){}[0] + final fun tryLock(kotlin.time/Duration): kotlin/Boolean // kotlinx.atomicfu.locks/SynchronousMutex.tryLock|tryLock(kotlin.time.Duration){}[0] + final fun unlock() // kotlinx.atomicfu.locks/SynchronousMutex.unlock|unlock(){}[0] +} + final class kotlinx.atomicfu/AtomicBoolean { // kotlinx.atomicfu/AtomicBoolean|null[0] final var value // kotlinx.atomicfu/AtomicBoolean.value|{}value[0] // Targets: [native] @@ -278,6 +287,7 @@ final inline fun (kotlinx.atomicfu/AtomicLong).kotlinx.atomicfu/getAndUpdate(kot final inline fun (kotlinx.atomicfu/AtomicLong).kotlinx.atomicfu/loop(kotlin/Function1): kotlin/Nothing // kotlinx.atomicfu/loop|loop@kotlinx.atomicfu.AtomicLong(kotlin.Function1){}[0] final inline fun (kotlinx.atomicfu/AtomicLong).kotlinx.atomicfu/update(kotlin/Function1) // kotlinx.atomicfu/update|update@kotlinx.atomicfu.AtomicLong(kotlin.Function1){}[0] final inline fun (kotlinx.atomicfu/AtomicLong).kotlinx.atomicfu/updateAndGet(kotlin/Function1): kotlin/Long // kotlinx.atomicfu/updateAndGet|updateAndGet@kotlinx.atomicfu.AtomicLong(kotlin.Function1){}[0] +final inline fun <#A: kotlin/Any?> (kotlinx.atomicfu.locks/SynchronousMutex).kotlinx.atomicfu.locks/withLock(kotlin/Function0<#A>): #A // kotlinx.atomicfu.locks/withLock|withLock@kotlinx.atomicfu.locks.SynchronousMutex(kotlin.Function0<0:0>){0§}[0] final inline fun <#A: kotlin/Any?> (kotlinx.atomicfu/AtomicRef<#A>).kotlinx.atomicfu/getAndUpdate(kotlin/Function1<#A, #A>): #A // kotlinx.atomicfu/getAndUpdate|getAndUpdate@kotlinx.atomicfu.AtomicRef<0:0>(kotlin.Function1<0:0,0:0>){0§}[0] final inline fun <#A: kotlin/Any?> (kotlinx.atomicfu/AtomicRef<#A>).kotlinx.atomicfu/loop(kotlin/Function1<#A, kotlin/Unit>): kotlin/Nothing // kotlinx.atomicfu/loop|loop@kotlinx.atomicfu.AtomicRef<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] final inline fun <#A: kotlin/Any?> (kotlinx.atomicfu/AtomicRef<#A>).kotlinx.atomicfu/update(kotlin/Function1<#A, #A>) // kotlinx.atomicfu/update|update@kotlinx.atomicfu.AtomicRef<0:0>(kotlin.Function1<0:0,0:0>){0§}[0] @@ -297,14 +307,6 @@ open annotation class kotlinx.atomicfu.locks/ExperimentalThreadBlockingApi : kot constructor () // kotlinx.atomicfu.locks/ExperimentalThreadBlockingApi.|(){}[0] } -// Targets: [native] -final class kotlinx.atomicfu.locks/NativeMutexNode { // kotlinx.atomicfu.locks/NativeMutexNode|null[0] - constructor () // kotlinx.atomicfu.locks/NativeMutexNode.|(){}[0] - - final fun lock() // kotlinx.atomicfu.locks/NativeMutexNode.lock|lock(){}[0] - final fun unlock() // kotlinx.atomicfu.locks/NativeMutexNode.unlock|unlock(){}[0] -} - // Targets: [native] final class kotlinx.atomicfu.locks/ParkingHandle // kotlinx.atomicfu.locks/ParkingHandle|null[0] diff --git a/atomicfu/build.gradle.kts b/atomicfu/build.gradle.kts index 45f25693..936b5c17 100644 --- a/atomicfu/build.gradle.kts +++ b/atomicfu/build.gradle.kts @@ -126,6 +126,7 @@ kotlin { jvmTest { dependencies { + implementation(libs.jetbrains.lincheck) implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-test") implementation("org.jetbrains.kotlin:kotlin-test-junit") @@ -205,7 +206,12 @@ val transformedTestFU_current by tasks.registering(Test::class) { dependsOn(transformFU) classpath = files(configurations.getByName("jvmTestRuntimeClasspath"), classesPostTransformFU) testClassesDirs = project.files(classesPostTransformFU) - exclude("**/*LFTest.*", "**/TraceToStringTest.*", "**/AtomicfuReferenceJsTest.*") + exclude( + "**/*LFTest.*", + "**/TraceToStringTest.*", + "**/AtomicfuReferenceJsTest.*", + "**/*LincheckTest.*", + ) filter { isFailOnNoMatchingTests = false } launcherForJdk(LAUNCHER_JDK_VERSION) } @@ -219,7 +225,8 @@ val transformedTestBOTH_current by tasks.registering(Test::class) { "**/TraceToStringTest.*", "**/TopLevelGeneratedDeclarationsReflectionTest.*", "**/SyntheticFUFieldsTest.*", - "**/AtomicfuReferenceJsTest.*" + "**/AtomicfuReferenceJsTest.*", + "**/*LincheckTest.*", ) filter { isFailOnNoMatchingTests = false } launcherForJdk(LAUNCHER_JDK_VERSION) @@ -234,7 +241,8 @@ val transformedTestVH by tasks.registering(Test::class) { "**/TraceToStringTest.*", "**/TopLevelGeneratedDeclarationsReflectionTest.*", "**/SyntheticFUFieldsTest.*", - "**/AtomicfuReferenceJsTest.*" + "**/AtomicfuReferenceJsTest.*", + "**/*LincheckTest.*", ) filter { isFailOnNoMatchingTests = false } diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index bcb63584..00000000 --- a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.UnsafeNumber -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) -public actual class NativeMutexNode { - internal actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_tVar = arena.alloc() - - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK.toInt()) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - public actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - public actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index ac0feb10..00000000 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import kotlinx.cinterop.IntVar -import kotlinx.cinterop.UIntVar -import kotlinx.cinterop.toCPointer -import kotlinx.cinterop.value -import platform.posix.pthread_cond_destroy -import platform.posix.pthread_cond_init -import platform.posix.pthread_cond_signal -import platform.posix.pthread_cond_t -import platform.posix.pthread_cond_wait -import platform.posix.pthread_mutex_destroy -import platform.posix.pthread_mutex_init -import platform.posix.pthread_mutex_lock -import platform.posix.pthread_mutex_t -import platform.posix.pthread_mutex_unlock -import platform.posix.pthread_mutexattr_destroy -import platform.posix.pthread_mutexattr_init -import platform.posix.pthread_mutexattr_settype -import platform.posix.pthread_mutexattr_t -import platform.posix.pthread_get_qos_class_np -import platform.posix.pthread_override_t -import platform.posix.pthread_override_qos_class_end_np -import platform.posix.pthread_override_qos_class_start_np -import platform.posix.qos_class_self - -import platform.posix.PTHREAD_MUTEX_ERRORCHECK - -@OptIn(ExperimentalForeignApi::class) -public actual class NativeMutexNode { - internal actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_t = arena.alloc() - private var qosOverride: pthread_override_t? = null - private var qosOverrideQosClass: UInt = 0U - - // Used locally as return parameters in donateQos - private val lockOwnerQosClass = arena.alloc() - private val lockOwnerRelPrio = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - public actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - public actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun wait(lockOwner: Long) { - donateQos(lockOwner) - require(pthread_cond_wait(cond.ptr, mutex.ptr) == 0) - clearDonation() - } - - private fun donateQos(lockOwner: Long) { - if (lockOwner == NO_OWNER) { - return - } - val ourQosClass = qos_class_self() - // Set up a new override if required: - if (qosOverride != null) { - // There is an existing override, but we need to go higher. - if (ourQosClass > qosOverrideQosClass) { - pthread_override_qos_class_end_np(qosOverride) - qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), qos_class_self(), 0) - qosOverrideQosClass = ourQosClass - } - } else { - // No existing override, check if we need to set one up. - pthread_get_qos_class_np(lockOwner.toCPointer(), lockOwnerQosClass.ptr, lockOwnerRelPrio.ptr) - if (ourQosClass > lockOwnerQosClass.value) { - qosOverride = pthread_override_qos_class_start_np(lockOwner.toCPointer(), ourQosClass, 0) - qosOverrideQosClass = ourQosClass - } - } - } - - private fun clearDonation() { - if (qosOverride != null) { - pthread_override_qos_class_end_np(qosOverride) - qosOverride = null - } - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt deleted file mode 100644 index 1a019c8f..00000000 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.atomicfu.locks - -import kotlinx.cinterop.toLong -import platform.posix.pthread_self - -internal actual fun createThreadId() = pthread_self().toLong() \ No newline at end of file diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..8cf33d2c --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,89 @@ +package kotlinx.atomicfu.locks + +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract +import kotlin.time.Duration + +/** + * Mutual exclusion for Kotlin Multiplatform. + * + * It can protect a shared resource or critical section from multiple thread accesses. + * Threads can acquire the lock by calling [lock] and release the lock by calling [unlock]. + * + * When a thread calls [lock] while another thread is locked, it will suspend until the lock is released. + * When multiple threads are waiting for the lock, they will acquire it in a fair order (first in first out). + * On JVM, a [lock] call can skip the queue if it happens in between a thread releasing and the first in queue acquiring. + * + * It is reentrant, meaning the lock holding thread can call [lock] multiple times without suspending. + * To release the lock (after multiple [lock] calls) an equal number of [unlock] calls are required. + * + * This Mutex should not be used in combination with coroutines and `suspend` functions + * as it blocks the waiting thread. + * Use the `Mutex` from the coroutines library instead. + * + * ```Kotlin + * mutex.withLock { + * // Critical section only executed by + * // one thread at a time. + * } + * ``` + */ +public expect class SynchronousMutex() { + /** + * Tries to lock this mutex, returning `false` if this mutex is already locked. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of your critical section, and [unlock] is never invoked before a successful + * lock acquisition. + * + * (JVM only) this call can potentially skip line. + */ + public fun tryLock(): Boolean + + /** + * Tries to lock this mutex within the given [timeout] period, + * returning `false` if the duration passed without locking. + * + * Note: when [tryLock] succeeds the lock needs to be released by [unlock]. + * When [tryLock] does not succeed the lock does not have to be released. + * + * (JVM only) throws Interrupted exception when thread is interrupted while waiting for lock. + */ + public fun tryLock(timeout: Duration): Boolean + + /** + * Locks the mutex, blocks the thread until the lock is acquired. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of your critical section, and [unlock] is never invoked before a successful + * lock acquisition. + */ + public fun lock() + + /** + * Releases the lock. + * Throws [IllegalStateException] when the current thread is not holding the lock. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of the critical section, and [unlock] is never invoked before a successful + * lock acquisition. + */ + public fun unlock() +} + +/** + * Executes the given code [block] under this mutex's lock. + * + * @return result of [block] + */ +@OptIn(ExperimentalContracts::class) +public inline fun SynchronousMutex.withLock(block: () -> T): T { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + lock() + return try { + block() + } finally { + unlock() + } +} diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt new file mode 100644 index 00000000..28a71b30 --- /dev/null +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt @@ -0,0 +1,224 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic +import kotlin.time.Duration +import kotlin.time.TimeSource + +/** + * Mutex implementation for Kotlin/Native. + * In concurrentMain sourceSet to be testable with Lincheck. + * [park] and [unpark] functions can be passed for testability. + */ +internal class NativeMutex( + val park : (Duration) -> Unit = { ParkingSupport.park(it) }, + val unpark : (ParkingHandle) -> Unit = ParkingSupport::unpark, +) { + /** + * The [state] variable stands for: 0 -> Lock is free + * 1 -> Lock is locked but no waiters + * 4 -> Lock is locked with 3 waiters + * + * The state.incrementAndGet() call makes my claim on the lock. + * The returned value either means I acquired it (when it is 1). + * Or I need to enqueue and park (when it is > 1). + * + * The [holdCount] variable is to enable reentrancy. + * + * Works by using a [parkingQueue]. + * When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue]. + * On enqueue the parking queue provides the second last node, this node is used to park on. + * When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node. + * Since a woken up thread is the first in line it means that it's node is the head and can therefore dequeue. + * + * Unlocking happens by calling state.decrementAndGet(). + * When the returned value is 0 it means the lock is free and we can simply return. + * If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue. + * This even works when a thread is not parked yet, + * since the ThreadParker can be pre-unparked resulting in the parking call to return immediately. + */ + private val parkingQueue = ParkingQueue() + private val owningThread = atomic(null) + private val state = atomic(0) + private val holdCount = atomic(0) + + fun lock() { + tryLock(Duration.INFINITE) + } + + fun tryLock(duration: Duration): Boolean { + val currentParkingHandle = ParkingSupport.currentThreadHandle() + + // Has to be checked in this order! + if (holdCount.value > 0 && currentParkingHandle == owningThread.value) { + // Is reentring thread + holdCount.incrementAndGet() + return true + } + + // Otherwise try acquire lock + val newState = this@NativeMutex.state.incrementAndGet() + check(newState > 0) { "Negative mutex state should not be possible" } + + // If new state 1 than I have acquired lock skipping queue. + if (newState == 1) { + owningThread.value = currentParkingHandle + holdCount.incrementAndGet() + return true + } + + // If state larger than 1 -> enqueue and park + // When woken up thread has acquired lock and his node in the queue is therefore at the head. + // Remove head + val prevNode = parkingQueue.enqueue() + // if timeout + if (!prevNode.nodeWait(duration)) return false + parkingQueue.dequeue() + owningThread.value = currentParkingHandle + holdCount.incrementAndGet() + return true + } + + fun unlock() { + val currentThreadId = ParkingSupport.currentThreadHandle() + val currentOwnerId = owningThread.value + check(currentThreadId == currentOwnerId) { "Thread is not holding the lock" } + + // dec hold count + check(holdCount.value > 0) { "Thread unlocked more than it locked" } + val newHoldCount = holdCount.decrementAndGet() + if (newHoldCount > 0) return + + // Lock is released by decrementing (only if decremented to 0) + val currentState = this@NativeMutex.state.decrementAndGet() + if (currentState == 0) return + + check(currentState > 0) { "Negative mutex state should not be possible" } + // If waiters wake up the first in line. The woken up thread will dequeue the node. + var nextParker = parkingQueue.getHead() + // If cancelled and dequeue and try next + while (!nextParker.nodeWake()) { + // We only dequeue here in case of timed out node. + // Dequeueing woken nodes can lead to issues when pre-unparked. + parkingQueue.dequeue() + nextParker = parkingQueue.getHead() + + // If no nodes left leave mutex in unlocked state + if (this@NativeMutex.state.decrementAndGet() == 0) return + } + } + + fun tryLock(): Boolean { + val currentThreadId = ParkingSupport.currentThreadHandle() + if (holdCount.value > 0 && owningThread.value == currentThreadId || this@NativeMutex.state.compareAndSet(0, 1)) { + owningThread.value = currentThreadId + holdCount.incrementAndGet() + return true + } + return false + } + + // Based on Micheal-Scott Queue + inner class ParkingQueue { + private val head: AtomicRef + private val tail: AtomicRef + + init { + val first = Node() + head = atomic(first) + tail = atomic(first) + } + + fun getHead(): Node { + return head.value + } + + fun enqueue(): Node { + while (true) { + val node = Node() + val curTail = tail.value + if (curTail.next.compareAndSet(null, node)) { + tail.compareAndSet(curTail, node) + return curTail + } + else tail.compareAndSet(curTail, curTail.next.value!!) + } + } + + fun dequeue() { + while (true) { + val currentHead = head.value + val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible") + if (head.compareAndSet(currentHead, currentHeadNext)) return + } + } + + } + + /** + * A [Node] is one spot in the mutex's queue. + * It has a reference to the [next] node in the queue. + * + * It can be in one of four [state]. + * - [Empty] is the initial state where the node is not in use yet. + * - [ParkingHandle] is the waiting state. + * - [Awoken] is the woken state. + * - [TimedOut] is the state after timeout. + */ + inner class Node { + val state = atomic(Empty) + val next = atomic(null) + + fun nodeWait(duration: Duration): Boolean { + val deadline = TimeSource.Monotonic.markNow() + duration + while (true) { + when (state.value) { + Empty -> if (state.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) { + park(deadline - TimeSource.Monotonic.markNow()) + if (deadline < TimeSource.Monotonic.markNow()) + state.compareAndSet(ParkingSupport.currentThreadHandle(), TimedOut) + } + is ParkingHandle -> { + park(deadline - TimeSource.Monotonic.markNow()) + if (deadline < TimeSource.Monotonic.markNow()) + state.compareAndSet(ParkingSupport.currentThreadHandle(), TimedOut) + } + Awoken -> return true + TimedOut -> return false + } + } + } + + fun nodeWake(): Boolean { + while (true) { + when (val currentState = state.value) { + Empty -> if (state.compareAndSet(Empty, Awoken)) return true + is ParkingHandle -> if (state.compareAndSet(currentState, Awoken)) { + unpark(currentState) + return true + } + Awoken -> throw IllegalStateException("Node is already woken") + TimedOut -> return false + } + } + } + } + + private object Empty + private object Awoken + private object TimedOut + + + /** + * For testing purposes only, not thread safe! + */ + internal fun getQueueSize(): Int { + var size = 0 + var node: Node? = parkingQueue.getHead() + while (node != null) { + node = node.next.value + size++ + } + return size + } +} diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt new file mode 100644 index 00000000..0a0999fe --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LockWithTimoutTests.kt @@ -0,0 +1,49 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds + +private const val N_THREADS = 5 +private const val TARGET_COUNT = 1_000 +private fun getRandomWait() = Random.nextInt(0, 5).toLong() +private fun getRandomTimeout() = Random.nextInt(1, 10) + +class LockWithTimeoutTests { + val counter = atomic(0) + + @Test + fun timeoutLockStressTest() { + val mutex = SynchronousMutex() + val threads = List(N_THREADS) { threadId -> + testThread { + while (counter.value < TARGET_COUNT) { + // Try to acquire the lock with a timeout + if (mutex.tryLock(getRandomTimeout().milliseconds)) { + try { + // Increment the counter if lock was acquired + if (counter.value < TARGET_COUNT) { + counter.incrementAndGet() + } + // Random sleep after increment to increase variation + sleepMillis(getRandomWait()) + } finally { + mutex.unlock() + } + } + // Random sleep between increment attempts to increase variation + sleepMillis(getRandomWait()) + } + } + } + + // Wait for all threads to complete + threads.forEach { it.join() } + + // Verify the counter reached the target + assertEquals(TARGET_COUNT, counter.value) + } + +} diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt new file mode 100644 index 00000000..7e27b903 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt @@ -0,0 +1,45 @@ +package kotlinx.atomicfu.locks + +import kotlin.test.Test +import kotlin.test.assertEquals + +private const val SLEEP_MILLIS = 10L +private const val N_REPEATS_SLOW = 100 +private const val N_REPEATS_FAST = 30_000 + +class NativeMutexTest { + + @Test + fun testNativeMutexSlow() = testNativeMutex(SLEEP_MILLIS, N_REPEATS_SLOW) + + @Test + fun testNativeMutexFast() = testNativeMutex(0, N_REPEATS_FAST) + + private fun testNativeMutex(sleepDuration: Long, iterations: Int) { + val mutex = NativeMutex() + val resultList = mutableListOf() + + fun addResults(id: String) { + repeat(iterations) { i -> + mutex.lock() + resultList.add("$id$i") + if (sleepDuration > 0) sleepMillis(sleepDuration) + resultList.add("$id$i") + mutex.unlock() + } + } + + val futA = Fut { addResults("a") } + val futB = Fut { addResults("b") } + + addResults("c") + + futA.waitThrowing() + futB.waitThrowing() + + resultList.filterIndexed { i, _ -> i % 2 == 0 } + .zip(resultList.filterIndexed { i, _ -> i % 2 == 1 }) { a, b -> + assertEquals(a, b) + } + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt new file mode 100644 index 00000000..28fe7c78 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ReentrancyTests.kt @@ -0,0 +1,28 @@ +package kotlinx.atomicfu.locks + +import kotlin.test.Test +import kotlin.test.assertFails + +class ReentrancyTests { + + @Test + fun reentrantTestSuccess() { + val lock = NativeMutex() + lock.lock() + lock.lock() + lock.unlock() + lock.unlock() + } + + @Test + fun reentrantTestFail() { + val lock = NativeMutex() + lock.lock() + lock.lock() + lock.unlock() + lock.unlock() + assertFails { + lock.unlock() + } + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt new file mode 100644 index 00000000..9b9d1f68 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt @@ -0,0 +1,53 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.test.Test +import kotlin.test.assertTrue + +class VaryingContentionTest { + + @Test + fun varyingContentionTest() { + val lockInt = LockInt() + multiTestLock(lockInt, 10, 100000) + multiTestLock(lockInt, 1, 200000) + multiTestLock(lockInt, 20, 300000) + multiTestLock(lockInt, 1, 400000) + } + + + private fun multiTestLock(lockInt: LockInt, nThreads: Int, countTo: Int) { + val futureList = List(nThreads) { i -> + testWithThread(lockInt, countTo, nThreads, i) + } + Fut.waitAllAndThrow(futureList) + } + + private fun testWithThread(lockInt: LockInt, max: Int, mod: Int, id: Int): Fut { + return Fut { + while (true) { + lockInt.lock() + try { + if (lockInt.n % mod == id) lockInt.n++ + if (lockInt.n >= max) break + } finally { + lockInt.unlock() + } + } + } + } + + class LockInt { + private val lock = SynchronousMutex() + private val check = atomic(0) + var n = 0 + fun lock() { + lock.lock() + assertTrue(check.incrementAndGet() == 1) + } + fun unlock() { + assertTrue(check.decrementAndGet() == 0) + lock.unlock() + } + } +} \ No newline at end of file diff --git a/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt new file mode 100644 index 00000000..12dd680f --- /dev/null +++ b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt @@ -0,0 +1,20 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration + +/** + * Multiplatform mutex. + * Since this mutex will run in a single threaded environment, it doesn't provide any real synchronization. + * + * It does keep track of reentrancy. + */ +public actual class SynchronousMutex { + private var state = 0 + public actual fun tryLock(): Boolean = true + public actual fun tryLock(timeout: Duration): Boolean = true + public actual fun lock(): Unit { state++ } + public actual fun unlock(): Unit { + check(state > 0) { "Mutex already unlocked" } + state-- + } +} \ No newline at end of file diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..9d1e270d --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,12 @@ +package kotlinx.atomicfu.locks + +import java.util.concurrent.TimeUnit +import kotlin.time.Duration + +public actual class SynchronousMutex { + private val reentrantLock = ReentrantLock() + public actual fun tryLock(timeout: Duration): Boolean = reentrantLock.tryLock(timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) + public actual fun tryLock(): Boolean = reentrantLock.tryLock() + public actual fun lock(): Unit = reentrantLock.lock() + public actual fun unlock(): Unit = reentrantLock.unlock() +} \ No newline at end of file diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt new file mode 100644 index 00000000..58a43079 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexLincheckTest.kt @@ -0,0 +1,61 @@ +package kotlinx.atomicfu.locks + +import org.jetbrains.lincheck.datastructures.ModelCheckingOptions +import org.jetbrains.lincheck.datastructures.Operation +import org.jetbrains.lincheck.datastructures.Validate +import org.jetbrains.lincheck.util.LoggingLevel +import java.util.concurrent.ConcurrentHashMap +import kotlin.test.Test + +class NativeMutexLincheckTest { + class Counter { + private var value = 0 + fun inc(): Int = ++value + fun get() = value + } + + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + // Lazy prevents issue with lincheck not finding lambdas + private val lock by lazy { + NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.park() }, + unpark = { localParkers[it]!!.unpark() } + ) + } + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun inc() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.inc() + lock.unlock() + } + + @Operation + fun get() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.get() + lock.unlock() + } + + @Validate + fun validate() { + // Check queue is empty (only have head) + check(lock.getQueueSize() == 1) + } +} \ No newline at end of file diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexReentrantLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexReentrantLincheckTest.kt new file mode 100644 index 00000000..4a0dddd0 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexReentrantLincheckTest.kt @@ -0,0 +1,71 @@ +package kotlinx.atomicfu.locks + +import org.jetbrains.lincheck.datastructures.ModelCheckingOptions +import org.jetbrains.lincheck.datastructures.Operation +import org.jetbrains.lincheck.datastructures.Validate +import org.jetbrains.lincheck.util.LoggingLevel +import java.util.concurrent.ConcurrentHashMap +import kotlin.test.Test + +class NativeMutexReentrantLincheckTest { + class Counter { + private var value = 0 + fun inc(): Int = ++value + fun get() = value + } + + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + // Lazy prevents issue with lincheck not finding lambdas + private val lock by lazy { + NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.park() }, + unpark = { localParkers[it]!!.unpark() } + ) + } + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun inc(): Int { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + check(lock.tryLock()) {"couldn't reenter with trylock"} + check(lock.tryLock()) {"couldn't reenter with trylock"} + val result = counter.inc() + lock.unlock() + lock.unlock() + lock.unlock() + return result + } + + @Operation + fun get(): Int { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + check(lock.tryLock()) {"couldn't reenter with trylock"} + check(lock.tryLock()) {"couldn't reenter with trylock"} + val result = counter.get() + lock.unlock() + lock.unlock() + lock.unlock() + return result + } + + @Validate + fun validate() { + // Check queue is empty (only have head) + check(lock.getQueueSize() == 1) + } +} diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt new file mode 100644 index 00000000..c91c15dd --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTimeoutLincheckTest.kt @@ -0,0 +1,72 @@ +package kotlinx.atomicfu.locks + +import org.jetbrains.lincheck.datastructures.ModelCheckingOptions +import org.jetbrains.lincheck.datastructures.Operation +import org.jetbrains.lincheck.datastructures.Validate +import org.jetbrains.lincheck.util.LoggingLevel +import org.junit.Test +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.Duration.Companion.nanoseconds + +class NativeMutexTimeoutLincheckTest { + class Counter { + private var value = 0 + fun inc(): Int = ++value + fun get() = value + } + + private val counter = Counter() + private val localParkers = ConcurrentHashMap() + + // Lazy prevents issue with lincheck not finding lambdas + private val lock by lazy { + NativeMutex( + park = { localParkers[ParkingSupport.currentThreadHandle()]!!.parkNanos(it.inWholeNanoseconds) }, + unpark = { localParkers[it]!!.unpark() } + ) + } + + + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(2) // Change to 300 for exhaustive testing + .invocationsPerIteration(5_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun incNoTimeout() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.inc() + lock.unlock() + } + + @Operation + fun incTimeout() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + if (lock.tryLock(0.nanoseconds)) { + counter.inc() + lock.unlock() + } + } + + @Operation + fun get() { + localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() } + lock.lock() + counter.get() + lock.unlock() + } + + @Validate + fun validate() { + // Check queue is empty (only have head) + check(lock.getQueueSize() == 1) + } +} \ No newline at end of file diff --git a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index e3e1f8d0..00000000 --- a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class) -public actual class NativeMutexNode { - internal actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_t = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK.toInt()) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - public actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - public actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index f23ddb83..00000000 --- a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class) -public actual class NativeMutexNode { - internal actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_tVar = arena.alloc() - private val mutex: pthread_mutex_tVar = arena.alloc() - private val attr: pthread_mutexattr_tVar = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - public actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - public actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} - -private val threadCounter = atomic(0L) - -internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index 3b6be80d..00000000 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - - -public expect class NativeMutexNode() { - - internal var next: NativeMutexNode? - - public fun lock() - - public fun unlock() - - // The lockOwner is used for qos donation on iOS - internal fun wait(lockOwner: Long) - - internal fun notify() - - internal fun dispose() -} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 2cfa72ce..115a0919 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -1,107 +1,13 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ package kotlinx.atomicfu.locks -import kotlin.native.ref.createCleaner -import kotlinx.atomicfu.* -import kotlin.experimental.ExperimentalNativeApi - - -import kotlin.native.concurrent.ThreadLocal - -internal const val NO_OWNER = 0L -private const val UNSET = 0L - -@ThreadLocal -private var currentThreadId = UNSET - -// Based on the compose-multiplatform-core implementation with added qos and the pool back-ported -// from the atomicfu implementation. public actual open class SynchronizedObject { - private val ownerThreadId: AtomicLong = atomic(NO_OWNER) - private var reEnterCount: Int = 0 - private val threadsOnLock: AtomicInt = atomic(0) - - private val monitor: MonitorWrapper by lazy { MonitorWrapper() } - - - public fun lock() { - var self = currentThreadId - if (self == UNSET) { - currentThreadId = createThreadId() - self = currentThreadId - } - if (ownerThreadId.value == self) { - reEnterCount += 1 - } else if (threadsOnLock.incrementAndGet() > 1) { - waitForUnlockAndLock(self) - } else { - if (!ownerThreadId.compareAndSet(NO_OWNER, self)) { - waitForUnlockAndLock(self) - } - } - } - - public fun tryLock(): Boolean { - var self = currentThreadId - if (self == 0L) { - currentThreadId = createThreadId() - self = currentThreadId - } - return if (ownerThreadId.value == self) { - reEnterCount += 1 - true - } else if (threadsOnLock.incrementAndGet() == 1 && ownerThreadId.compareAndSet(NO_OWNER, self)) { - true - } else { - threadsOnLock.decrementAndGet() - false - } - } - - - private fun waitForUnlockAndLock(self: Long) { - withMonitor(monitor) { - while (!ownerThreadId.compareAndSet(NO_OWNER, self)) { - monitor.nativeMutex.wait(ownerThreadId.value) - } - } - } - - public fun unlock() { - require (ownerThreadId.value == currentThreadId) - if (reEnterCount > 0) { - reEnterCount -= 1 - } else { - ownerThreadId.value = NO_OWNER - if (threadsOnLock.decrementAndGet() > 0) { - withMonitor(monitor) { - // We expect the highest priority thread to be woken up, but this should work - // in any case. - monitor.nativeMutex.notify() - } - } - } - } - - private inline fun withMonitor(monitor: MonitorWrapper, block: () -> Unit) { - monitor.nativeMutex.lock() - return try { - block() - } finally { - monitor.nativeMutex.unlock() - } - } - - @OptIn(ExperimentalNativeApi::class) - private class MonitorWrapper { - val nativeMutex = mutexPool.allocate() - val cleaner = createCleaner(nativeMutex) { mutexPool.release(it) } - } + + private val nativeMutex = SynchronousMutex() + public fun lock(): Unit = nativeMutex.lock() + public fun tryLock(): Boolean = nativeMutex.tryLock() + public fun unlock(): Unit = nativeMutex.unlock() } - public actual fun reentrantLock(): ReentrantLock = ReentrantLock() public actual typealias ReentrantLock = SynchronizedObject @@ -122,55 +28,4 @@ public actual inline fun synchronized(lock: SynchronizedObject, block: () -> } finally { lock.unlock() } -} - - -private const val INITIAL_POOL_CAPACITY = 64 -private const val MAX_POOL_SIZE = 1024 - -internal val mutexPool by lazy { MutexPool() } - -internal class MutexPool() { - private val size = atomic(0) - private val top = atomic(null) - - init { - // Immediately form a stack - for (i in 0 until INITIAL_POOL_CAPACITY) { - release(NativeMutexNode()) - } - } - - private fun allocMutexNode() = NativeMutexNode() - - fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() - - fun release(mutexNode: NativeMutexNode) { - if (size.value > MAX_POOL_SIZE) { - mutexNode.dispose() - } else { - while (true) { - val oldTop = top.value - mutexNode.next = oldTop - if (top.compareAndSet(oldTop, mutexNode)) { - size.incrementAndGet() - return - } - } - } - } - - private fun pop(): NativeMutexNode? { - while (true) { - val oldTop = top.value - if (oldTop == null) { - return null - } - val newHead = oldTop.next - if (top.compareAndSet(oldTop, newHead)) { - size.decrementAndGet() - return oldTop - } - } - } -} +} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt new file mode 100644 index 00000000..cd50f621 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt @@ -0,0 +1,11 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration + +public actual class SynchronousMutex { + private val lock = NativeMutex() + public actual fun tryLock(): Boolean = lock.tryLock() + public actual fun tryLock(timeout: Duration): Boolean = lock.tryLock(timeout) + public actual fun lock(): Unit = lock.lock() + public actual fun unlock(): Unit = lock.unlock() +} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt deleted file mode 100644 index acb782bd..00000000 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt +++ /dev/null @@ -1,3 +0,0 @@ -package kotlinx.atomicfu.locks - -internal expect fun createThreadId(): Long \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 49c27793..885b523c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,6 +11,7 @@ node-gradle = "3.1.1" rhino = "1.7.10" gradle-plugin-publish = "1.2.1" gradle-develocity = "3.17.6" +lincheck = "3.1.1" [libraries] @@ -40,6 +41,7 @@ slf4j-simple = { group = "org.slf4j", name = "slf4j-simple", version.ref = "slf4 # Testing depedencies junit-junit = { group = "junit", name = "junit", version.ref = "junit" } +jetbrains-lincheck = { group = "org.jetbrains.lincheck", name = "lincheck", version.ref = "lincheck" } # Maven dependencies maven-core = { group = "org.apache.maven", name = "maven-core", version.ref = "maven" }