Skip to content

Commit f2a2b69

Browse files
amrfarid140solcott
authored andcommitted
Avoid deadlock in RealMutableStore (#658)
* Add test case Signed-off-by: Amr Yousef <[email protected]> * Always Release storeLock Signed-off-by: Amr Yousef <[email protected]> * Update kermit to 2.0.4 (#655) Fixes #653 and #654 Signed-off-by: Scott Olcott <[email protected]> Signed-off-by: Amr Yousef <[email protected]> * Revert "Update kermit to 2.0.4 (#655)" This reverts commit 76f34d4. Signed-off-by: Amr Yousef <[email protected]> --------- Signed-off-by: Amr Yousef <[email protected]> Signed-off-by: Scott Olcott <[email protected]> Co-authored-by: Scott Olcott <[email protected]>
1 parent 219a251 commit f2a2b69

File tree

2 files changed

+99
-4
lines changed

2 files changed

+99
-4
lines changed

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,13 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
193193
@AnyThread
194194
private suspend fun <Output : Any?> withThreadSafety(key: Key, block: suspend ThreadSafety.() -> Output): Output {
195195
storeLock.lock()
196-
val threadSafety = requireNotNull(keyToThreadSafety[key])
197-
val output = threadSafety.block()
198-
storeLock.unlock()
199-
return output
196+
try {
197+
val threadSafety = requireNotNull(keyToThreadSafety[key])
198+
val output = threadSafety.block()
199+
return output
200+
} finally {
201+
storeLock.unlock()
202+
}
200203
}
201204

202205
private suspend fun conflictsMightExist(key: Key): Boolean {

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package org.mobilenativefoundation.store.store5
22

3+
import kotlinx.coroutines.CoroutineScope
4+
import kotlinx.coroutines.Dispatchers
35
import kotlinx.coroutines.ExperimentalCoroutinesApi
46
import kotlinx.coroutines.FlowPreview
7+
import kotlinx.coroutines.Job
8+
import kotlinx.coroutines.async
9+
import kotlinx.coroutines.awaitAll
10+
import kotlinx.coroutines.cancel
11+
import kotlinx.coroutines.flow.*
512
import kotlinx.coroutines.test.TestScope
613
import kotlinx.coroutines.test.runTest
714
import org.mobilenativefoundation.store.store5.impl.extensions.get
815
import kotlin.test.Test
916
import kotlin.test.assertEquals
17+
import kotlin.test.assertIs
18+
import kotlin.test.assertNotNull
1019
import kotlin.time.Duration.Companion.hours
1120

1221
@FlowPreview
@@ -37,4 +46,87 @@ class StoreWithInMemoryCacheTests {
3746
assertEquals("result", c)
3847
assertEquals("result", d)
3948
}
49+
50+
@Test
51+
fun storeDeadlock() =
52+
testScope.runTest {
53+
repeat(1000) {
54+
val store =
55+
StoreBuilder
56+
.from(
57+
fetcher = Fetcher.of { key: Int -> "fetcher_${key}" },
58+
sourceOfTruth = SourceOfTruth.Companion.of(
59+
reader = { key ->
60+
flow<String> {
61+
emit("source_of_truth_${key}")
62+
}
63+
},
64+
writer = { key: Int, local: String ->
65+
66+
}
67+
)
68+
)
69+
.disableCache()
70+
.toMutableStoreBuilder(
71+
converter = object : Converter<String, String, String> {
72+
override fun fromNetworkToLocal(network: String): String {
73+
return network
74+
}
75+
76+
override fun fromOutputToLocal(output: String): String {
77+
return output
78+
}
79+
},
80+
)
81+
.build(
82+
updater = object : Updater<Int, String, Unit> {
83+
var callCount = -1
84+
override suspend fun post(key: Int, value: String): UpdaterResult {
85+
callCount += 1
86+
if (callCount % 2 == 0) {
87+
throw IllegalArgumentException(key.toString() + "value:$value")
88+
} else {
89+
return UpdaterResult.Success.Untyped("")
90+
}
91+
}
92+
93+
override val onCompletion: OnUpdaterCompletion<Unit>?
94+
get() = null
95+
96+
}
97+
)
98+
99+
val jobs = mutableListOf<Job>()
100+
jobs.add(
101+
store.stream<Nothing>(StoreReadRequest.cached(1, refresh = true))
102+
.mapNotNull { it.dataOrNull() }
103+
.launchIn(CoroutineScope(Dispatchers.Default))
104+
)
105+
val job1 = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
106+
.mapNotNull { it.dataOrNull() }
107+
.launchIn(CoroutineScope(Dispatchers.Default))
108+
jobs.add(
109+
store.stream<Nothing>(StoreReadRequest.cached(2, refresh = true))
110+
.mapNotNull { it.dataOrNull() }
111+
.launchIn(CoroutineScope(Dispatchers.Default)))
112+
jobs.add(
113+
store.stream<Nothing>(StoreReadRequest.cached(3, refresh = true))
114+
.mapNotNull { it.dataOrNull() }
115+
.launchIn(CoroutineScope(Dispatchers.Default)))
116+
job1.cancel()
117+
assertEquals(
118+
expected = "source_of_truth_0",
119+
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
120+
.mapNotNull { it.dataOrNull() }.first()
121+
)
122+
jobs.forEach {
123+
it.cancel()
124+
assertEquals(
125+
expected = "source_of_truth_0",
126+
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
127+
.mapNotNull { it.dataOrNull() }.first()
128+
)
129+
}
130+
}
131+
}
40132
}

0 commit comments

Comments
 (0)