2

这是使用广播通道的事件总线实现。(不允许使用 RxJava2 :( )

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

class EventBus(override val coroutineContext: CoroutineContext
               = Dispatchers.Default) :CoroutineScope {

    @ExperimentalCoroutinesApi
    private val channel = BroadcastChannel<Any>(1)

    @ExperimentalCoroutinesApi
    suspend fun send(event: Any) {
            channel.send(event)
    }

    @ExperimentalCoroutinesApi
    fun subscribe(): ReceiveChannel<Any> =
        channel.openSubscription()

    @ExperimentalCoroutinesApi
    inline fun <reified T> subscribeToEvent() =
        subscribe().let {
            produce<T>(coroutineContext) {
                for (t in it){
                    if(t is T)
                        send(t as T)
                }
            }

        }
}

这是我的测试代码。

@Test
fun testEventBus(){
    val bus = EventBus()
    var i = 1;
    var isFinish = false
    CoroutineScope(Dispatchers.IO).launch{
        println("launching_subsc")
        bus.subscribe().consumeEach {
            println("received $it")
            assert(it == i++)
            isFinish = (it == 5)
        }
        withContext(bus.coroutineContext){

        }
    }
    bus.launch {
        delay(500)
        for (j in 1..5) {
            println("sending $j")
            bus.send(j)
            sleep(500)
        }
    }

    while (!isFinish)
        sleep(50)
}

该测试效果很好,但我想删除延迟(500),但希望测试能够正常工作。如果我现在删除延迟(500),输出是

launching_subsc
sending 1
sending 2
received 2
...

或者

sending 1
launching_subsc
sending 2
received 2 
...

我的实际需求是,在实际项目场景中,我希望在订阅后发布数据,同样,如果没有订阅者,则需要删除事件。因此,如果在发布调用之前有订阅调用(与调度程序无关),订阅者必须接收所有事件。

我尝试使用相同的范围/调度程序,但没有任何效果。

4

0 回答 0