1

I am using Reactor and am am creating a flux to which I am publishing some events. My issue is that the subscribers that I create with filters fail after a while unless I add a non filter subscriber on the flux.

import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {

companion object {
    @JvmStatic
    fun main(args: Array<String>) {
        val publisher = EmitterProcessor.create<String>().connect()
        writeAndGet(publisher)
        writeAndGet(publisher)
        writeAndGet(publisher)

    }

    fun writeAndGet(publisher: EmitterProcessor<String>) {
        val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()

        val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()


        Thread.sleep(1000)

        publisher.onNext("unu")
        publisher.onNext("end")

        try {

            println("X=" + result.blockMillis(3000))
            println("Y=" + result2.blockMillis(3000))

        } catch (e: Exception) {
            e.printStackTrace()
        }
        println(result.isTerminated)
        println(result2.isTerminated)
        println("---")

    }

}

}

The code works fine if I an extra subscriber.

...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe()  //this solves the issue
writeAndGet(publisher)
...

Any ideas about what I am doing wrong?

Best regards

4

1 回答 1

1

EmitterProcessor.create()创建autoCancel标志设置为的处理器true。这意味着一旦所有订阅者取消订阅,它就会自动取消。

使您的订阅者退订的不是filter运营takeUntil商。

添加额外的永久订阅者可防止处理器自动取消,但这似乎不是一个好的解决方案。

为了使您的测试用例正常工作,您必须使用EmitterProcessor.create(false). 这设置autoCancelfalse,因此您可以一次又一次地重新订阅。

于 2017-01-09T13:57:24.090 回答