3

当你有一个大的 Observable 图(即 observable 多次使用merge,groupBy等组成join)并且抛出异常时,有时很难弄清楚异常的来源。我想知道是否有可能找出在源文件中调用 Observable 运算符的位置。一个例子应该更清楚地说明这一点。

例如,给定以下内容IllegalStateException: Only one subscriber allowed!和堆栈跟踪,我想知道是否有可能找出我的源文件中调用的 line numberoperatorMerge等。operatorFilteroperatorGroupBy是否可以通过使用调试器、打印语句或其他方式以某种方式做到这一点?

java.lang.IllegalStateException: Only one subscriber allowed!
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:124)
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:81)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:251)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:236)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.**OperatorMerge**$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80)

        at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63)
        at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservab
leList.java:93)
        at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44)
        at rx.internal.operators.**OperatorFilter**$1.onCompleted(OperatorFilter.java:42)
        at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(Operat
orTakeUntilPredicate.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
        at rx.internal.operators.**OperatorGroupBy**$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.**OperatorPublish**$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscr
ibeFromIterable.java:98)
        at rx.Subscriber.setProducer(Subscriber.java:177)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:50)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:33)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198)
...

之所以出现这个问题,主要是因为 Observable 的全部意义在于在执行时将 a) 代码与 b) 解耦。但是对于调试程序来说,这是一场噩梦。因此,重复我上面的问题,我想知道是否可以将每个组合追溯到源代码中的原始行。

4

2 回答 2

1

有一些关于额外调试信息的实验,但整个库的运行速度要慢 100 倍并被放弃。

问题可能出在您的 groupBy 之后的 flatMap 中,您在其中订阅了 GroupedObservable 并将其交还给现在无法订阅的 flatMap: GroupedObservable 只能使用一次。您需要使用publish()orreplay()运算符之一并相应地调整功能逻辑。

于 2015-11-03T12:14:07.007 回答
0

一年过去了,我仍然在为此苦苦挣扎,仍然没有找到追踪处决的好方法。我发现我依靠将打印语句放入代码中来查看发生了什么。只有这样我才能得到正在发生的事情的踪迹。

我发现唯一有用的是为此创建一个模式,所以我不必doOnNext(x => println(x))每次都写来显示发生了什么:

  implicit class ObservableTrace[T](o : rx.lang.scala.Observable[T]) {
    import java.time.LocalTime
    def trace(name : String) : rx.lang.scala.Observable[T] = {
      def print(s: String) = println(s"${LocalTime.now} : $name : $s")
      (o doOnNext (x => print("next:" + x))
        doOnSubscribe print("subscribed")
        doOnCompleted print("completed")
        doOnError (e => print("error: " + e))
        doOnUnsubscribe print("unsubscribed")
        )
    }

这使得编辑代码变得快速——只需在你的几个 observable 上编写myobservable.trace("My Observable"),这样就可以很容易地看到不同的生命周期事件何时发生。

于 2016-11-22T12:52:58.597 回答