Skip to content

Commit 9493a04

Browse files
authored
Tidy up our synchronization code (#413) (#415)
* Confine all interop signatures into NativeMutexNode to simplify further working and maintenance * Get rid of mutex_node_t
1 parent 5fe9b3d commit 9493a04

File tree

3 files changed

+80
-40
lines changed

3 files changed

+80
-40
lines changed

atomicfu/src/nativeInterop/cinterop/interop.def

-12
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,13 @@ typedef struct lock_support {
99
pthread_cond_t cond;
1010
} lock_support_t;
1111

12-
typedef struct mutex_node {
13-
lock_support_t* mutex;
14-
struct mutex_node* next;
15-
} mutex_node_t;
16-
1712
lock_support_t* lock_support_init() {
1813
lock_support_t * ls = (lock_support_t *) malloc(sizeof(lock_support_t));
1914
ls->locked = 0;
2015
pthread_mutex_init(&ls->mutex, NULL);
2116
pthread_cond_init(&ls->cond, NULL);
2217
return ls;
2318
}
24-
25-
mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) {
26-
mutexNode->mutex = lock_support_init();
27-
mutexNode->next = NULL;
28-
return mutexNode;
29-
}
30-
3119
void lock(lock_support_t* ls) {
3220
pthread_mutex_lock(&ls->mutex);
3321
while (ls->locked == 1) { // wait till locked are available
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import interop.*
4+
import kotlinx.cinterop.*
5+
import platform.posix.*
6+
import kotlin.concurrent.*
7+
8+
public class NativeMutexNode {
9+
private val mutex: CPointer<lock_support_t> = lock_support_init()!!
10+
11+
internal var next: NativeMutexNode? = null
12+
13+
fun lock() {
14+
interop.lock(mutex)
15+
}
16+
17+
fun unlock() {
18+
interop.unlock(mutex)
19+
}
20+
}
21+
22+
/**
23+
* This is a trivial counter-part of NativeMutexNode that does not rely on interop.def
24+
* The problem is, commonizer cannot commonize pthreads, thus this declaration should be duplicated
25+
* over multiple Native source-sets to work properly
26+
*/
27+
//public class NativeMutexNode {
28+
//
29+
// @Volatile
30+
// private var isLocked = false
31+
// private val pMutex = nativeHeap.alloc<pthread_mutex_t>().apply { pthread_mutex_init(ptr, null) }
32+
// private val pCond = nativeHeap.alloc<pthread_cond_t>().apply { pthread_cond_init(ptr, null) }
33+
//
34+
// internal var next: NativeMutexNode? = null
35+
//
36+
// fun lock() {
37+
// pthread_mutex_lock(pMutex.ptr)
38+
// while (isLocked) { // wait till locked are available
39+
// pthread_cond_wait(pCond.ptr, pMutex.ptr)
40+
// }
41+
// isLocked = true
42+
// pthread_mutex_unlock(pMutex.ptr)
43+
// }
44+
//
45+
// fun unlock() {
46+
// pthread_mutex_lock(pMutex.ptr)
47+
// isLocked = false
48+
// pthread_cond_broadcast(pCond.ptr)
49+
// pthread_mutex_unlock(pMutex.ptr)
50+
// }
51+
//}

atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt

+29-28
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
package kotlinx.atomicfu.locks
22

33
import platform.posix.*
4-
import interop.*
5-
import kotlinx.cinterop.*
6-
import kotlin.native.internal.NativePtr
74
import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
8-
import kotlin.concurrent.AtomicNativePtr
95
import kotlin.concurrent.AtomicReference
10-
import kotlin.native.concurrent.*
116

127
public actual open class SynchronizedObject {
138

@@ -23,6 +18,7 @@ public actual open class SynchronizedObject {
2318
if (lock.compareAndSet(state, thinLock))
2419
return
2520
}
21+
2622
THIN -> {
2723
if (currentThreadId == state.ownerThreadId) {
2824
// reentrant lock
@@ -46,13 +42,16 @@ public actual open class SynchronizedObject {
4642
}
4743
}
4844
}
45+
4946
FAT -> {
5047
if (currentThreadId == state.ownerThreadId) {
5148
// reentrant lock
52-
val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
49+
val nestedFatLock =
50+
LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
5351
if (lock.compareAndSet(state, nestedFatLock)) return
5452
} else if (state.ownerThreadId != null) {
55-
val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
53+
val fatLock =
54+
LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
5655
if (lock.compareAndSet(state, fatLock)) {
5756
fatLock.mutex!!.lock()
5857
tryLockAfterResume(currentThreadId)
@@ -74,7 +73,8 @@ public actual open class SynchronizedObject {
7473
return true
7574
} else {
7675
if (currentThreadId == state.ownerThreadId) {
77-
val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
76+
val nestedLock =
77+
LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
7878
if (lock.compareAndSet(state, nestedLock))
7979
return true
8080
} else {
@@ -103,6 +103,7 @@ public actual open class SynchronizedObject {
103103
return
104104
}
105105
}
106+
106107
FAT -> {
107108
if (state.nestedLocks == 1) {
108109
// last nested unlock -> release completely, resume some waiter
@@ -119,6 +120,7 @@ public actual open class SynchronizedObject {
119120
return
120121
}
121122
}
123+
122124
else -> error("It is not possible to unlock the mutex that is not obtained")
123125
}
124126
}
@@ -146,14 +148,10 @@ public actual open class SynchronizedObject {
146148
val nestedLocks: Int,
147149
val waiters: Int,
148150
val ownerThreadId: pthread_t? = null,
149-
val mutex: CPointer<mutex_node_t>? = null
151+
val mutex: NativeMutexNode? = null
150152
)
151153

152154
protected enum class Status { UNLOCKED, THIN, FAT }
153-
154-
private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex)
155-
156-
private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex)
157155
}
158156

159157
public actual fun reentrantLock() = ReentrantLock()
@@ -183,37 +181,40 @@ private const val INITIAL_POOL_CAPACITY = 64
183181
private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) }
184182

185183
class MutexPool(capacity: Int) {
186-
private val top = AtomicNativePtr(NativePtr.NULL)
184+
private val top = AtomicReference<NativeMutexNode?>(null)
187185

188-
private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }
186+
private val mutexes = Array<NativeMutexNode>(capacity) { NativeMutexNode() }
189187

190188
init {
191-
for (i in 0 until capacity) {
192-
release(interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * sizeOf<mutex_node_t>()))!!)
189+
// Immediately form a stack
190+
for (mutex in mutexes) {
191+
release(mutex)
193192
}
194193
}
195194

196-
private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr
195+
private fun allocMutexNode() = NativeMutexNode()
197196

198-
fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode()
197+
fun allocate(): NativeMutexNode = pop() ?: allocMutexNode()
199198

200-
fun release(mutexNode: CPointer<mutex_node_t>) {
199+
fun release(mutexNode: NativeMutexNode) {
201200
while (true) {
202-
val oldTop = interpretCPointer<mutex_node_t>(top.value)
203-
mutexNode.pointed.next = oldTop
204-
if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue))
201+
val oldTop = top.value
202+
mutexNode.next = oldTop
203+
if (top.compareAndSet(oldTop, mutexNode)) {
205204
return
205+
}
206206
}
207207
}
208208

209-
private fun pop(): CPointer<mutex_node_t>? {
209+
private fun pop(): NativeMutexNode? {
210210
while (true) {
211-
val oldTop = interpretCPointer<mutex_node_t>(top.value)
212-
if (oldTop.rawValue === NativePtr.NULL)
211+
val oldTop = top.value
212+
if (oldTop == null)
213213
return null
214-
val newHead = oldTop!!.pointed.next
215-
if (top.compareAndSet(oldTop.rawValue, newHead.rawValue))
214+
val newHead = oldTop.next
215+
if (top.compareAndSet(oldTop, newHead)) {
216216
return oldTop
217+
}
217218
}
218219
}
219220
}

0 commit comments

Comments
 (0)