38

我想知道如何向 a 发送/发送项目Kotlin.Flow,所以我的用例是:

在 consumer/ViewModel/Presenter 中,我可以使用以下功能订阅collect

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}

但问题在于Repository,使用 RxJava,我们可以使用Behaviorsubject将其公开为 anObservable/Flowable并发出如下新项目:

behaviourSubject.onNext(true)

但是每当我建立一个新流程时:

flow {

}

我只能收集。如何将值发送到流?

4

3 回答 3

48

如果您想获得订阅/收藏的最新值,您应该使用ConflatedBroadcastChannel

private val channel = ConflatedBroadcastChannel<Boolean>()

这将复制BehaviourSubject,以将通道公开为流:

// Repository
fun observe() {
  return channel.asFlow()
}

现在将事件/值发送到暴露的Flow简单发送到此通道。

// Repository
fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

安慰:

错误的

如果您希望仅在开始收集接收值,则应使用 aBroadcastChannel代替。

说清楚:

表现得像一个 Rx 的PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(true)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  channel.send(false)
}

错误的

false在第一个事件之前 collect { }发送时打印。


表现得像一个 Rx 的BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(true)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  confChannel.send(false)
}

真的

错误的

两个事件都被打印出来,你总是得到最新的值(如果存在的话)。

另外,想提一下 Kotlin 的团队开发DataFlow(名称待定):

这似乎更适合这个用例(因为它将是一个冷流)。

于 2019-08-04T09:12:57.433 回答
18

查看MutableStateFlow文档,因为它ConflatedBroadcastChannel很快就会被弃用。

为了更好地了解上下文,请查看Github 上 Kotlin 存储库上关于原始问题的整个讨论

于 2020-06-22T02:22:19.673 回答
9

更新

Kotlin Coroutines1.4.0现在可用于MutableSharedFlow,它取代了对Channel. MutableSharedFlowcleanup 也是内置的,所以你不需要手动打开和关闭它,不像Channel. MutableSharedFlow如果您需要类似主题的 api,请使用Flow

原始答案

由于您的问题有android标签,我将添加一个 Android 实现,使您可以轻松创建一个BehaviorSubject或一个PublishSubject来处理其自己的生命周期。

这在 Android 中是相关的,因为您不想忘记关闭通道并泄漏内存。此实现通过将其与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处理”反应流的需要。如同LiveData

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

通过使用Lifecycle来自 Android 的库,我现在可以创建一个BehaviorSubject在活动/片段被销毁后自行清理的

class BehaviorSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)

或者我可以PublishSubject使用缓冲创建一个BroadcastChannel

class PublishSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)

现在我可以做这样的事情

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}
于 2020-05-13T21:18:06.950 回答