3

我刚开始使用协程/流(以及一般的 kotlin),我正在努力将回调流转换为共享流。

我将下面的简单示例放在一起只是为了展示我尝试过的内容,但没有成功。我的代码更复杂,但我相信这个例子反映了我想要实现的问题。

fun main() = runBlocking {

    getMySharedFlow().collect{
        println("collector 1 value: $it")
    }

    getMySharedFlow().collect{
        println("collector 2 value: $it")
    }

}

val sharedFlow = MutableSharedFlow<Int>()

suspend fun getMySharedFlow(): SharedFlow<Int> {
    println("inside sharedflow")
    getMyCallbackFlow().collect{
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
    return sharedFlow
}

fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: ()->Unit) {
    println("fetching something...")
    myCallBack()
}

这个想法是fetchSomethingContinuously只调用一次,与 sharedFlow 的收集器数量无关。但正如您从输出中看到的那样,收集器永远不会获取值:

inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3

我查看了shareIn运算符,但不确定如何准确使用它。

我怎么能做到这样的事情?任何提示将不胜感激。

4

1 回答 1

4

因此,您在这里缺少的是调用collect,emit()并且awaitClose()正在暂停并且仅在完成相应操作后才会完成的事实。

该函数getMySharedFlow()甚至没有返回以对其应用 collect 因为它正在收集callbackFlow,callbackFlow被卡在调用上,而调用awaitClose()又没有完成,因为fetchSomethingContinuously没有用close()函数结束回调。

您需要意识到您必须在这里创建一些明确的并行性,而不是生活在暂停的世界中。您的示例代码的工作变体将是:

val sharedFlow = MutableSharedFlow<Int>()

suspend fun startSharedFlow() {
    println("Starting Shared Flow callback collection")

    getMyCallbackFlow().collect {
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
}

fun main() = runBlocking<Unit> {

    launch {
        startSharedFlow()
    }

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}

调用以launch允许异步执行发出和收集值。

此外,关于shareIn()运营商,它只是从指定的上游创建一个 SharedFlow,就像你想做的那样。此外,您可以使用参数指定何时开始共享started。更多关于这个here

这是您在示例中使用它的方式:

fun main() = runBlocking<Unit> {

    val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}
于 2020-11-12T15:08:52.530 回答