这是使用广播通道的事件总线实现。(不允许使用 RxJava2 :( )
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext
class EventBus(override val coroutineContext: CoroutineContext
= Dispatchers.Default) :CoroutineScope {
@ExperimentalCoroutinesApi
private val channel = BroadcastChannel<Any>(1)
@ExperimentalCoroutinesApi
suspend fun send(event: Any) {
channel.send(event)
}
@ExperimentalCoroutinesApi
fun subscribe(): ReceiveChannel<Any> =
channel.openSubscription()
@ExperimentalCoroutinesApi
inline fun <reified T> subscribeToEvent() =
subscribe().let {
produce<T>(coroutineContext) {
for (t in it){
if(t is T)
send(t as T)
}
}
}
}
这是我的测试代码。
@Test
fun testEventBus(){
val bus = EventBus()
var i = 1;
var isFinish = false
CoroutineScope(Dispatchers.IO).launch{
println("launching_subsc")
bus.subscribe().consumeEach {
println("received $it")
assert(it == i++)
isFinish = (it == 5)
}
withContext(bus.coroutineContext){
}
}
bus.launch {
delay(500)
for (j in 1..5) {
println("sending $j")
bus.send(j)
sleep(500)
}
}
while (!isFinish)
sleep(50)
}
该测试效果很好,但我想删除延迟(500),但希望测试能够正常工作。如果我现在删除延迟(500),输出是
launching_subsc
sending 1
sending 2
received 2
...
或者
sending 1
launching_subsc
sending 2
received 2
...
我的实际需求是,在实际项目场景中,我希望在订阅后发布数据,同样,如果没有订阅者,则需要删除事件。因此,如果在发布调用之前有订阅调用(与调度程序无关),订阅者必须接收所有事件。
我尝试使用相同的范围/调度程序,但没有任何效果。