From ab0736b4ddf3f725aac6dd4fd78e271304634b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petrus=20Nguy=E1=BB=85n=20Th=C3=A1i=20H=E1=BB=8Dc?= Date: Mon, 22 Sep 2025 01:37:48 +0700 Subject: [PATCH] Add publish selector test and remove debug output --- api/FlowExt.api | 4 +- .../hoc081098/flowext/internal/SimpleLazy.kt | 3 +- .../hoc081098/flowext/publishWithSelector.kt | 80 -------------- .../flowext/PublishWithSelectorTest.kt | 101 ++++++++++++++++++ 4 files changed, 104 insertions(+), 84 deletions(-) create mode 100644 src/commonTest/kotlin/com/hoc081098/flowext/PublishWithSelectorTest.kt diff --git a/api/FlowExt.api b/api/FlowExt.api index ac2c7dd0..73d323de 100644 --- a/api/FlowExt.api +++ b/api/FlowExt.api @@ -210,9 +210,7 @@ public abstract interface annotation class com/hoc081098/flowext/PublishSelector } public final class com/hoc081098/flowext/PublishWithSelectorKt { - public static final fun main (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun main ([Ljava/lang/String;)V - public static final fun publish (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun publish (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } public final class com/hoc081098/flowext/RaceKt { diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt index 92dcdbd3..22a351af 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt @@ -31,7 +31,8 @@ import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.internal.SynchronizedObject import kotlinx.coroutines.internal.synchronized -// TODO: Remove SynchronizedObject +// Using SynchronizedObject from kotlinx.coroutines provides a multiplatform lock primitive +// while avoiding JVM-specific synchronization APIs. @OptIn(InternalCoroutinesApi::class) internal class SimpleLazy( initializer: () -> T, diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt index 722db892..a3ab5a73 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/publishWithSelector.kt @@ -34,26 +34,16 @@ import kotlin.jvm.JvmField import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.filterIsInstance -import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.shareIn as kotlinXFlowShareIn -import kotlinx.coroutines.flow.take -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch @FlowExtPreview @@ -138,8 +128,6 @@ private class DefaultSelectorScope( val stateRef = AtomicRef>(SelectorScopeState.Init) override fun select(block: SelectorFunction): Flow { - println("call select with block: $block") - stateRef.loop { state -> val updated = when (state) { SelectorScopeState.Closed -> { @@ -258,20 +246,12 @@ private class DefaultSelectorScope( if (stateRef.compareAndSet(expect = state, update = updated)) { // CAS success - println( - "onCompleteSelectedFlow: completedCount = ${(updated as? SelectorScopeState.Frozen)?.completedCount}, " + - "size = ${state.blocks.size}, " + - "(index = $index)", - ) - // Once state reaches DefaultSelectorScopeState.Closed, we can clear unused lazy if (updated is SelectorScopeState.Closed) { state.selectedFlowsAndChannels.run { getOrNull()?.first?.forEach { it.clear() } clear() } - - println("onCompleteSelectedFlow: cancel the publish scope") } return @@ -331,8 +311,6 @@ private class DefaultSelectorScope( } suspend fun send(value: T) { - println("send: $value") - val state = stateRef.value as? SelectorScopeState.Frozen ?: return val channels = state .selectedFlowsAndChannels @@ -394,12 +372,10 @@ private class DefaultSelectorScope( } fun close(e: Throwable?) { - println("close: $e") transitionToClosed { it.close(e) } } fun cancel(e: CancellationException) { - println("cancel: $e") transitionToClosed { it.cancel(e) } } } @@ -435,59 +411,3 @@ public fun Flow.publish(selector: suspend SelectorScope.() -> Flow< } } } - -@OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) -public suspend fun main() { - flow { - println("Collect...") - delay(100) - emit(1) - delay(100) - emit(2) - delay(100) - emit(3) - delay(100) - emit("4") - }.onEach { println(">>> onEach: $it") } - .publish { - delay(100) - - merge( - select { flow -> - delay(1) - val sharedFlow = flow.shareIn() - - interval(0, 100) - .onEach { println(">>> interval: $it") } - .flatMapMerge { value -> - timer(value, 50) - .withLatestFrom(sharedFlow) - .map { it to "shared" } - }.takeUntil(sharedFlow.filter { it == 3 }) - }, - select { flow -> - flow - .filterIsInstance() - .filter { it % 2 == 0 } - .map { it to "even" } - .take(1) - }, - select { flow -> - flow - .filterIsInstance() - .filter { it % 2 != 0 } - .map { it to "odd" } - .take(1) - }, - select { flow -> - flow - .filterIsInstance() - .map { it to "string" } - .take(1) - }, - ) - } - .toList() - .also { println(it) } - .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } -} diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/PublishWithSelectorTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/PublishWithSelectorTest.kt new file mode 100644 index 00000000..c3c45c9d --- /dev/null +++ b/src/commonTest/kotlin/com/hoc081098/flowext/PublishWithSelectorTest.kt @@ -0,0 +1,101 @@ +/* + * MIT License + * + * Copyright (c) 2021-2024 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.utils.BaseTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList + +@OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) +class PublishWithSelectorTest : BaseTest() { + @Test + fun publishDispatchesSelectedFlows() = runTest { + val result = + flow { + delay(100) + emit(1) + delay(100) + emit(2) + delay(100) + emit(3) + delay(100) + emit("4") + } + .publish { + delay(100) + + merge( + select { flow -> + delay(1) + val sharedFlow = flow.shareIn() + + interval(0L, 100L) + .flatMapMerge { value -> + timer(value, 50L) + .withLatestFrom(sharedFlow) + .map { it to "shared" } + } + .takeUntil(sharedFlow.filter { it == 3 }) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 == 0 } + .map { it to "even" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .filter { it % 2 != 0 } + .map { it to "odd" } + .take(1) + }, + select { flow -> + flow + .filterIsInstance() + .map { it to "string" } + .take(1) + }, + ) + } + .toList() + + assertEquals( + listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string")), + result, + ) + } +}