更新:
Kotlin Coroutines 1.4.0 现已发布,并提供了 MutableSharedFlow,它取代了对 Channel 的需求。
MutableSharedFlow 内置了清理机制,因此与 Channel 不同,你不需要手动打开和关闭它。
如果您需要类似 Subject 的 Flow API,请使用 MutableSharedFlow。
原始答案
由于您的问题带有 android 标签,我将补充一个 Android 实现,让您可以轻松创建能够自行管理生命周期的 BehaviorSubject 或 PublishSubject。
这在 Android 中很重要,因为您不希望因忘记关闭通道而导致内存泄漏。此实现将响应式流与 Fragment/Activity 的创建和销毁绑定在一起,从而无需显式地“释放(dispose)”它,这与 LiveData 的做法类似。
/**
 * Read side of an event relay.
 *
 * @param Message type of the events exposed by [eventFlow].
 */
interface EventReceiver<Message> {
    /** Stream of events that consumers can collect. */
    val eventFlow: Flow<Message>
}
/**
 * Write side of an event relay.
 *
 * @param Message type of the events accepted by [postEvent].
 */
interface EventSender<Message> {
    /** Optional first message associated with this sender; `null` when there is none. */
    val initialMessage: Message?

    /** Publishes [message] to whatever this sender is connected to. */
    fun postEvent(message: Message)
}
/**
 * [EventSender] that binds a [BroadcastChannel] to an Android [Lifecycle].
 *
 * The instance registers itself as an observer of the given lifecycle on construction.
 * On `ON_CREATE` it posts [initialMessage] (when non-null); on `ON_DESTROY` it closes the channel.
 *
 * @param lifecycle lifecycle to observe.
 * @param coroutineScope scope in which every send is launched.
 * @param channel channel that events are sent to and that is closed on destroy.
 * @param initialMessage message posted on `ON_CREATE`, or `null` for none.
 */
class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    /**
     * Sends [message] to the channel from a coroutine launched in the supplied scope.
     *
     * If the channel is already closed, or gets closed before the launched send completes,
     * the message is dropped and an error is logged instead of throwing.
     */
    override fun postEvent(message: Message) {
        if (channel.isClosedForSend) {
            logChannelClosed(message)
            return
        }
        coroutineScope.launch {
            try {
                channel.send(message)
            } catch (e: kotlinx.coroutines.channels.ClosedSendChannelException) {
                // The channel was closed between the check above and the actual send;
                // treat it like the already-closed case rather than failing the scope.
                logChannelClosed(message)
            }
        }
    }

    /** Posts [initialMessage], if any, when the observed lifecycle is created. */
    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        // Deliberately no channel.openSubscription() here: a subscription that is never
        // consumed or cancelled would leak and can stall senders on a buffered channel.
        initialMessage?.let { postEvent(it) }
    }

    /** Closes the channel when the observed lifecycle is destroyed. */
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }

    /** Logs that [message] could not be delivered because the channel is closed. */
    private fun logChannelClosed(message: Message) {
        Log.e("LifecycleEventSender", "Channel is closed. Cannot send message: $message")
    }
}
/**
 * [EventReceiver] whose [eventFlow] is the given broadcast channel viewed as a [Flow].
 *
 * @param channel channel converted with `asFlow()` when this receiver is constructed.
 */
class ChannelEventReceiver<Message>(
    channel: BroadcastChannel<Message>
) : EventReceiver<Message> {

    override val eventFlow: Flow<Message> = channel.asFlow()
}
/**
 * Combines an [EventReceiver] and an [EventSender] that share one [BroadcastChannel].
 *
 * Receiving is delegated to a [ChannelEventReceiver] and sending to a [LifecycleEventSender],
 * both constructed from the same channel.
 *
 * @param lifecycle lifecycle handed to the [LifecycleEventSender].
 * @param coroutineScope scope handed to the [LifecycleEventSender].
 * @param channel channel shared by the sending and receiving delegates.
 * @param initialMessage optional initial message handed to the sender; defaults to `null`.
 */
abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver(channel),
    EventSender<Message> by LifecycleEventSender(
        lifecycle = lifecycle,
        coroutineScope = coroutineScope,
        channel = channel,
        initialMessage = initialMessage
    )
通过使用 Android 的 Lifecycle 库,我现在可以创建一个在 Activity/Fragment 被销毁后自行清理的 BehaviorSubject:
/**
 * String [EventRelay] backed by a [ConflatedBroadcastChannel].
 *
 * Declared without a type parameter: a `<String>` type parameter would shadow `kotlin.String`
 * instead of fixing the message type to it.
 *
 * @param lifecycle lifecycle forwarded to [EventRelay].
 * @param coroutineScope scope forwarded to [EventRelay].
 * @param initialMessage initial message forwarded to [EventRelay]; pass `null` for none.
 */
class BehaviorSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String? = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)
或者,我可以使用带缓冲的 BroadcastChannel 创建一个 PublishSubject:
/**
 * String [EventRelay] backed by a buffered [BroadcastChannel].
 *
 * Declared without a type parameter: a `<String>` type parameter would shadow `kotlin.String`
 * instead of fixing the message type to it.
 *
 * @param lifecycle lifecycle forwarded to [EventRelay].
 * @param coroutineScope scope forwarded to [EventRelay].
 * @param initialMessage initial message forwarded to [EventRelay]; pass `null` for none.
 */
class PublishSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String? = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)
现在我可以做这样的事情
/**
 * Example activity that logs every message relayed by a [BehaviorSubject].
 *
 * Extends AndroidX `ComponentActivity` because the `lifecycle` and `lifecycleScope` members
 * used below are provided by AndroidX lifecycle owners, not by the framework `Activity`.
 */
class MyActivity : androidx.activity.ComponentActivity() {

    /** Relay bound to this activity's lifecycle and lifecycle scope. */
    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // NOTE(review): collection only starts when there is no saved state, so an instance
        // recreated with a non-null savedInstanceState starts no collector — confirm intended.
        if (savedInstanceState == null) {
            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)
        }
    }

    override fun onResume() {
        super.onResume()
        behaviorSubject.postEvent("Next Message")
    }
}