diff --git a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/KottageEventFlow.kt b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/KottageEventFlow.kt index ab103eb3..6514118e 100644 --- a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/KottageEventFlow.kt +++ b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/KottageEventFlow.kt @@ -36,6 +36,7 @@ class KottageEventFlow internal constructor( ): Flow { private var lastEventTime: Long? = initialTime private val subscribing = Mutex() + private val initializing = Mutex(locked = true) @OptIn(FlowPreview::class) private val flow: Flow = flow { @@ -116,6 +117,7 @@ class KottageEventFlow internal constructor( bridgeFlow.emit(KottageEvent.from(it)) } } + initializing.unlock() } }) }.flattenConcat() @@ -127,6 +129,18 @@ class KottageEventFlow internal constructor( flow.collect(collector) } + /** + * ensure and wait until subscription starts. + * + * This method is mainly for debug purpose. + */ + suspend fun ensureSubscribed() { + if (initializing.isLocked) { + initializing.lock() + initializing.unlock() + } + } + internal sealed interface EventFlowType { object All : EventFlowType data class Item( diff --git a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/KottageDatabaseManager.kt b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/KottageDatabaseManager.kt index 488db6dd..b96db40c 100644 --- a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/KottageDatabaseManager.kt +++ b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/KottageDatabaseManager.kt @@ -151,8 +151,8 @@ internal class KottageDatabaseManager( suspend fun onEventCreated() { val operator = operator.await() val limit = 100L - _eventFlow.updateWithLock { lastEvent, emit -> - var lastEventTime = lastEvent.time + _eventFlow.updateWithLock { latestEventTime, emit -> + var lastEventTime = latestEventTime var remains = true while (remains) { val events = databaseConnection.transactionWithResult { @@ -170,6 +170,7 @@ internal class KottageDatabaseManager( emit(it) } } + lastEventTime } } diff --git a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/model/ItemEventFlow.kt b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/model/ItemEventFlow.kt index 67f9483b..b005fd6a 100644 --- a/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/model/ItemEventFlow.kt +++ b/kottage/src/commonMain/kotlin/io/github/irgaly/kottage/internal/model/ItemEventFlow.kt @@ -2,7 +2,9 @@ package io.github.irgaly.kottage.internal.model import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -14,28 +16,21 @@ internal class ItemEventFlow(initialTime: Long, scope: CoroutineScope) { // イベント送信漏れが発生しないように SUSPEND onBufferOverflow = BufferOverflow.SUSPEND ) - private val lastEvent = source.map { - Event(it.createdAt, it) - }.stateIn(scope, SharingStarted.Eagerly, Event(initialTime, null)) + private var lastEventTime: Long = initialTime init { flow = source.asSharedFlow() } - suspend fun updateWithLock(block: suspend (latestEvent: Event, emit: suspend (event: ItemEvent) -> Unit) -> Unit) { + suspend fun updateWithLock(block: suspend (latestEventTime: Long, emit: suspend (event: ItemEvent) -> Unit) -> Long) { mutex.withLock { - block(lastEvent.value, source::emit) + lastEventTime = block(lastEventTime, source::emit) } } - suspend fun withLock(block: suspend (latestEvent: Event) -> Unit) { + suspend fun withLock(block: suspend (latestEventTime: Long) -> Unit) { mutex.withLock { - block(lastEvent.value) + block(lastEventTime) } } - - data class Event( - val time: Long, - val event: ItemEvent? - ) } diff --git a/kottage/src/commonTest/kotlin/io/github/irgaly/kottage/KottageEventTest.kt b/kottage/src/commonTest/kotlin/io/github/irgaly/kottage/KottageEventTest.kt index 6f7af04d..cf512e48 100644 --- a/kottage/src/commonTest/kotlin/io/github/irgaly/kottage/KottageEventTest.kt +++ b/kottage/src/commonTest/kotlin/io/github/irgaly/kottage/KottageEventTest.kt @@ -11,10 +11,12 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlin.time.Duration.Companion.days +import kotlin.time.ExperimentalTime /** * Event 関連のテスト */ +@OptIn(ExperimentalTime::class) class KottageEventTest : KottageSpec("kottage_event", body = { describe("Kottage Event Test") { context("Storage 操作") { @@ -71,65 +73,71 @@ class KottageEventTest : KottageSpec("kottage_event", body = { val (kottage, calendar) = kottage("event_list") val cache = kottage.cache("event_list") val list = cache.list("list_event_list") - cache.eventFlow().test { - val entry1 = list.add("key1", "value1") - awaitItem() should { - it.itemKey shouldBe entry1.itemKey - it.listType shouldBe null - it.eventType shouldBe KottageEventType.Create - } - cache.put("key2", "value2") - awaitItem() should { - it.itemKey shouldBe "key2" - it.listType shouldBe null - it.eventType shouldBe KottageEventType.Create + cache.eventFlow().let { flow -> + flow.test { + flow.ensureSubscribed() + val entry1 = list.add("key1", "value1") + awaitItem() should { + it.itemKey shouldBe entry1.itemKey + it.listType shouldBe null + it.eventType shouldBe KottageEventType.Create + } + cache.put("key2", "value2") + awaitItem() should { + it.itemKey shouldBe "key2" + it.listType shouldBe null + it.eventType shouldBe KottageEventType.Create + } } } - list.eventFlow().test { - val entry3 = list.add("key3", "value3") - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.eventType shouldBe KottageEventType.Create - } - cache.put("key3", "value3-2") - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.eventType shouldBe KottageEventType.Update - } - list.update(entry3.positionId, "key4", "value4") - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.itemKey shouldBe "key3" - it.eventType shouldBe KottageEventType.Delete - } - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.itemKey shouldBe "key4" - it.eventType shouldBe KottageEventType.Create - } - cache.put("key5", "value5") - list.updateKey(entry3.positionId, "key5") - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.itemKey shouldBe "key4" - it.eventType shouldBe KottageEventType.Delete - } - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.itemKey shouldBe "key5" - it.eventType shouldBe KottageEventType.Create - } - list.remove(entry3.positionId) - awaitItem().let { - it.listPositionId shouldNotBe null - it.listType shouldBe "list_event_list" - it.eventType shouldBe KottageEventType.Delete + list.eventFlow().let { flow -> + flow.test { + flow.ensureSubscribed() + val entry3 = list.add("key3", "value3") + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.eventType shouldBe KottageEventType.Create + } + cache.put("key3", "value3-2") + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.eventType shouldBe KottageEventType.Update + } + list.update(entry3.positionId, "key4", "value4") + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.itemKey shouldBe "key3" + it.eventType shouldBe KottageEventType.Delete + } + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.itemKey shouldBe "key4" + it.eventType shouldBe KottageEventType.Create + } + cache.put("key5", "value5") + list.updateKey(entry3.positionId, "key5") + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.itemKey shouldBe "key4" + it.eventType shouldBe KottageEventType.Delete + } + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.itemKey shouldBe "key5" + it.eventType shouldBe KottageEventType.Create + } + list.remove(entry3.positionId) + awaitItem().let { + it.listPositionId shouldNotBe null + it.listType shouldBe "list_event_list" + it.eventType shouldBe KottageEventType.Delete + } } } }