Skip to content

Commit 7004801

Browse files
committed
fix: Serialize entry writes
1 parent da82150 commit 7004801

File tree

3 files changed

+66
-96
lines changed

3 files changed

+66
-96
lines changed

safebox/src/androidTest/java/com/harrytmthy/safebox/SafeBoxTest.kt

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,20 @@ import kotlinx.coroutines.CoroutineDispatcher
2828
import kotlinx.coroutines.Dispatchers
2929
import kotlinx.coroutines.ExperimentalCoroutinesApi
3030
import kotlinx.coroutines.test.UnconfinedTestDispatcher
31+
import kotlinx.coroutines.test.runTest
32+
import kotlinx.coroutines.withTimeout
3133
import org.junit.After
3234
import org.junit.runner.RunWith
3335
import java.io.File
3436
import java.security.KeyStore
35-
import java.util.concurrent.CopyOnWriteArrayList
3637
import java.util.concurrent.atomic.AtomicBoolean
3738
import kotlin.test.Test
3839
import kotlin.test.assertContentEquals
3940
import kotlin.test.assertEquals
4041
import kotlin.test.assertFalse
4142
import kotlin.test.assertNull
4243
import kotlin.test.assertTrue
44+
import kotlin.time.Duration.Companion.seconds
4345

4446
@OptIn(ExperimentalCoroutinesApi::class)
4547
@RunWith(AndroidJUnit4::class)
@@ -179,47 +181,6 @@ class SafeBoxTest {
179181
assertContentEquals(expectedValueChanges, changedValues)
180182
}
181183

182-
@Test
183-
fun closeWhenIdle_shouldWaitUntilWritesAreDoneBeforeClosing() {
184-
val observedStates = CopyOnWriteArrayList<SafeBoxState>()
185-
val closed = AtomicBoolean(false)
186-
safeBox = createSafeBox(
187-
ioDispatcher = Dispatchers.IO,
188-
stateListener = SafeBoxStateListener { state ->
189-
observedStates.add(state)
190-
closed.set(state == SafeBoxState.CLOSED)
191-
},
192-
)
193-
repeat(5) {
194-
safeBox.edit()
195-
.putString("key", "value")
196-
.apply()
197-
}
198-
safeBox.closeWhenIdle()
199-
repeat(5) {
200-
safeBox.edit()
201-
.putString("key", "value")
202-
.apply()
203-
}
204-
while (!closed.get()) {
205-
Thread.sleep(3)
206-
}
207-
val expectedOnSlowInit = listOf(
208-
SafeBoxState.STARTING,
209-
SafeBoxState.WRITING,
210-
SafeBoxState.IDLE,
211-
SafeBoxState.CLOSED,
212-
)
213-
val expectedOnFastInit = listOf(
214-
SafeBoxState.STARTING,
215-
SafeBoxState.IDLE, // finished STARTING before launching any write operation
216-
SafeBoxState.WRITING,
217-
SafeBoxState.IDLE,
218-
SafeBoxState.CLOSED,
219-
)
220-
assertTrue(observedStates == expectedOnSlowInit || observedStates == expectedOnFastInit)
221-
}
222-
223184
@Test
224185
fun putString_shouldDoNothingAfterClosing() {
225186
val hasEmissionAfterClose = AtomicBoolean(false)
@@ -241,6 +202,27 @@ class SafeBoxTest {
241202
assertFalse(hasEmissionAfterClose.get())
242203
}
243204

205+
@Test
206+
fun apply_then_commit_shouldHaveCorrectOrder() = runTest {
207+
safeBox = createSafeBox(ioDispatcher = Dispatchers.IO)
208+
209+
withTimeout(10.seconds) {
210+
safeBox.edit().putInt("0", 0).apply()
211+
safeBox.edit().putInt("1", 1).apply()
212+
assertTrue(safeBox.edit().clear().commit())
213+
safeBox.edit().putInt("2", 2).apply()
214+
safeBox.edit().putInt("3", 3).apply()
215+
assertTrue(safeBox.edit().clear().commit())
216+
safeBox.edit().putInt("4", 4).apply()
217+
}
218+
219+
assertEquals(4, safeBox.getInt("4", -1))
220+
assertEquals(-1, safeBox.getInt("3", -1))
221+
assertEquals(-1, safeBox.getInt("2", -1))
222+
assertEquals(-1, safeBox.getInt("1", -1))
223+
assertEquals(-1, safeBox.getInt("0", -1))
224+
}
225+
244226
private fun createSafeBox(
245227
ioDispatcher: CoroutineDispatcher = UnconfinedTestDispatcher(),
246228
stateListener: SafeBoxStateListener? = null,

safebox/src/main/java/com/harrytmthy/safebox/SafeBox.kt

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import com.harrytmthy.safebox.strategy.ValueFallbackStrategy
4141
import com.harrytmthy.safebox.strategy.ValueFallbackStrategy.WARN
4242
import kotlinx.coroutines.CompletableDeferred
4343
import kotlinx.coroutines.CoroutineDispatcher
44-
import kotlinx.coroutines.CoroutineExceptionHandler
4544
import kotlinx.coroutines.Dispatchers
4645
import kotlinx.coroutines.sync.Mutex
4746
import kotlinx.coroutines.sync.withLock
@@ -86,38 +85,34 @@ public class SafeBox private constructor(
8685

8786
private val listeners = CopyOnWriteArrayList<OnSharedPreferenceChangeListener>()
8887

89-
private val commitMutex = Mutex()
88+
private val writeMutex = Mutex()
9089

91-
private val applyMutex = Mutex()
92-
93-
@Volatile
94-
private var applyCompleted = CompletableDeferred<Unit>().apply { complete(Unit) }
90+
private val writeBarrier = AtomicReference(CompletableDeferred<Unit>().apply { complete(Unit) })
9591

9692
private val delegate = object : Delegate {
9793

9894
private val updateLock = Any()
9995

100-
private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
101-
Log.e("SafeBox", "Failed to apply changes.", throwable)
102-
applyCompleted.complete(Unit)
103-
}
104-
10596
override fun commit(entries: LinkedHashMap<String, Action>, cleared: Boolean): Boolean {
10697
val entriesToWrite = LinkedHashMap(entries)
10798
synchronized(updateLock) {
10899
entries.clear() // Prevents stale mutations on reused editor instance
109100
updateEntries(entriesToWrite, cleared)
110101
}
102+
val currentWriteBarrier = CompletableDeferred<Unit>()
103+
val previousWriteBarrier = writeBarrier.getAndSet(currentWriteBarrier)
111104
return stateManager.launchCommitWithWritingState {
112105
try {
113-
applyCompleted.await()
114-
commitMutex.withLock {
106+
previousWriteBarrier.await()
107+
writeMutex.withLock {
115108
applyChanges(entriesToWrite, cleared)
116109
}
117110
true
118111
} catch (e: Exception) {
119112
Log.e("SafeBox", "Failed to commit changes.", e)
120113
false
114+
} finally {
115+
currentWriteBarrier.complete(Unit)
121116
}
122117
}
123118
}
@@ -128,12 +123,19 @@ public class SafeBox private constructor(
128123
entries.clear() // Prevents stale mutations on reused editor instance
129124
updateEntries(entriesToWrite, cleared)
130125
}
131-
stateManager.launchApplyWithWritingState(exceptionHandler) {
132-
applyCompleted = CompletableDeferred()
133-
applyMutex.withLock {
134-
applyChanges(entriesToWrite, cleared)
126+
val currentWriteBarrier = CompletableDeferred<Unit>()
127+
val previousWriteBarrier = writeBarrier.getAndSet(currentWriteBarrier)
128+
stateManager.launchApplyWithWritingState {
129+
try {
130+
previousWriteBarrier.await()
131+
writeMutex.withLock {
132+
applyChanges(entriesToWrite, cleared)
133+
}
134+
} catch (e: Exception) {
135+
Log.e("SafeBox", "Failed to commit changes.", e)
136+
} finally {
137+
currentWriteBarrier.complete(Unit)
135138
}
136-
applyCompleted.complete(Unit)
137139
}
138140
}
139141

safebox/src/main/java/com/harrytmthy/safebox/state/SafeBoxStateManager.kt

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import com.harrytmthy.safebox.state.SafeBoxState.STARTING
2424
import com.harrytmthy.safebox.state.SafeBoxState.WRITING
2525
import kotlinx.coroutines.CompletableDeferred
2626
import kotlinx.coroutines.CoroutineDispatcher
27-
import kotlinx.coroutines.CoroutineExceptionHandler
2827
import kotlinx.coroutines.launch
2928
import kotlinx.coroutines.runBlocking
3029
import java.util.concurrent.atomic.AtomicBoolean
@@ -64,52 +63,46 @@ internal class SafeBoxStateManager(
6463
inline fun launchWithStartingState(crossinline block: suspend () -> Unit) {
6564
updateState(STARTING)
6665
safeBoxScope.launch(ioDispatcher) {
67-
block()
68-
if (concurrentWriteCount.get() == 0) {
69-
updateState(IDLE)
70-
} else {
71-
updateState(WRITING)
66+
try {
67+
block()
68+
} finally {
69+
initialReadCompleted.complete(Unit)
70+
if (concurrentWriteCount.get() == 0) {
71+
updateState(IDLE)
72+
}
7273
}
73-
initialReadCompleted.complete(Unit)
7474
}
7575
}
7676

7777
inline fun launchCommitWithWritingState(crossinline block: suspend () -> Boolean): Boolean {
7878
if (closed.get()) {
7979
return false
8080
}
81-
if (concurrentWriteCount.incrementAndGet() == 1) {
82-
writeCompleted.set(CompletableDeferred())
83-
if (initialReadCompleted.isCompleted) {
84-
updateState(WRITING)
85-
}
86-
}
8781
return runBlocking {
88-
initialReadCompleted.await()
89-
val result = block()
90-
finalizeWriting()
91-
result
82+
withStateTransition(block)
9283
}
9384
}
9485

95-
inline fun launchApplyWithWritingState(
96-
exceptionHandler: CoroutineExceptionHandler,
97-
crossinline block: suspend () -> Unit,
98-
) {
86+
inline fun launchApplyWithWritingState(crossinline block: suspend () -> Unit) {
9987
if (closed.get()) {
10088
return
10189
}
90+
safeBoxScope.launch(ioDispatcher) {
91+
withStateTransition(block)
92+
}
93+
}
94+
95+
private suspend inline fun <T> withStateTransition(crossinline block: suspend () -> T): T {
96+
initialReadCompleted.await()
10297
if (concurrentWriteCount.incrementAndGet() == 1) {
103-
writeCompleted.set(CompletableDeferred())
104-
if (initialReadCompleted.isCompleted) {
105-
updateState(WRITING)
106-
}
98+
updateState(WRITING)
10799
}
108-
safeBoxScope.launch(ioDispatcher + exceptionHandler) {
109-
initialReadCompleted.await()
100+
return try {
110101
block()
111-
}.invokeOnCompletion {
112-
finalizeWriting()
102+
} finally {
103+
if (concurrentWriteCount.decrementAndGet() == 0) {
104+
updateState(IDLE)
105+
}
113106
}
114107
}
115108

@@ -133,11 +126,4 @@ internal class SafeBoxStateManager(
133126
stateListener?.onStateChanged(newState)
134127
SafeBoxGlobalStateObserver.updateState(fileName, newState)
135128
}
136-
137-
private fun finalizeWriting() {
138-
if (concurrentWriteCount.decrementAndGet() == 0) {
139-
updateState(IDLE)
140-
writeCompleted.get()?.complete(Unit)
141-
}
142-
}
143129
}

0 commit comments

Comments
 (0)