val observable = Observable(...)
.publish
val subscription = observable.connect
observable.doOnsubscribe(() => doSomething)
.subscribe()
doSomething
永远不会被调用。RxJava 的完全相同的代码可以正常工作。无论出于何种原因,它似乎从未传播到底层的 Java Observable
更新:所以我的解决方法是
observable.asJavaObservable
.doOnSubscribe(new Action0 {
override def call(): Unit = {
doSomething
}
}}.asScala
.subscribe()