我开始学习响应式流是因为我对使用 RxJava 替代更传统的事件总线的新趋势感到好奇。 这篇博客文章是对如何完成此操作的典型描述。如果我理解正确的话,RxJava 1.x 并不是严格意义上的 Reactive Streams 的实现,但它非常相似。2.0 版包含一些兼容的类,或者至少通过了 TCK,因此此代码的更新版本可能看起来有些不同。
public class UserLocationModel {
private PublishSubject<LatLng> subject = PublishSubject.create();
public void setLocation(LatLng latLng) {
subject.onNext(latLng);
}
public Observable<LatLng> getUserLocation() {
return subject;
}
}
在 Reactive Streams 术语中,我认为subject
是 a Processor
,它既是 aPublisher
又是 a Subscriber
。
问题是调用未订阅任何内容onNext
的 aSubscriber
似乎违反了 Reactive Streams 规范,尤其是规则 1.9。
这仅仅是一个实现细节吗? 我认为您通常不能依赖于与兼容的 Reactive Streams 实现一起工作,我是否正确?