When I use Reactive Streams (Reactor, https://github.com/reactor/reactor-core) with a custom Publisher in combination with publishOn, I always get an NPE. What is wrong with my code? Am I using Publisher the wrong way?
Flux.from(MyPublisher())
    .publishOn(Schedulers.single())
    .subscribe { println("<-- $it received") }

class MyPublisher : Publisher<Int> {
    override fun subscribe(sub: Subscriber<in Int>) {
        while (true) {
            Thread.sleep(300)
            sub.onNext(1)
        }
    }
}
The exception is:
Exception in thread "main" java.lang.NullPointerException
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212)
    at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18)
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52)
    at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6447)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6440)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6404)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6347)
    at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11)
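
One likely cause, judging from the trace: MyPublisher calls sub.onNext without ever calling sub.onSubscribe, which the Reactive Streams specification requires before any other signal (rule 1.9). publishOn appears to set up its internal queue in onSubscribe, so the first onNext would hit an uninitialized field. Below is a minimal sketch of a compliant variant, not a definitive implementation. The demand bookkeeping (the requested counter and the cancelled flag) is my own assumption about how to keep the original blocking loop, and take(3) and blockLast() are added only so the example terminates.

```kotlin
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.atomic.AtomicLong

// Sketch of a spec-compliant MyPublisher: it hands a Subscription to the
// subscriber first and only emits as many items as have been requested.
class MyPublisher : Publisher<Int> {
    override fun subscribe(sub: Subscriber<in Int>) {
        val subscription = object : Subscription {
            private val requested = AtomicLong()   // outstanding demand
            @Volatile private var cancelled = false

            override fun request(n: Long) {
                if (n <= 0) {
                    // Rule 3.9: non-positive demand must be signalled as an error.
                    cancelled = true
                    sub.onError(IllegalArgumentException("request must be positive"))
                    return
                }
                // Add the new demand, capping at Long.MAX_VALUE on overflow.
                val previous = requested.getAndAccumulate(n) { a, b ->
                    if (a + b < 0) Long.MAX_VALUE else a + b
                }
                // If demand was already pending, another call is emitting.
                if (previous != 0L) return
                do {
                    if (cancelled) return
                    Thread.sleep(300)              // same pacing as the original
                    sub.onNext(1)
                } while (requested.decrementAndGet() > 0)
            }

            override fun cancel() {
                cancelled = true
            }
        }
        // Rule 1.9: onSubscribe must come before any onNext.
        sub.onSubscribe(subscription)
    }
}

fun main() {
    Flux.from(MyPublisher())
        .publishOn(Schedulers.single())
        .take(3)                                   // added so the demo ends
        .doOnNext { println("<-- $it received") }
        .blockLast()
}
```

Note that this sketch still blocks the thread that calls request, just as the original blocked in subscribe. If a hand-written Publisher is not a hard requirement, Reactor's own factories such as Flux.generate or Flux.interval handle the subscription protocol for you and are usually the safer choice.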