3

我开始学习响应式流是因为我对使用 RxJava 替代更传统的事件总线的新趋势感到好奇。 这篇博客文章是对如何完成此操作的典型描述。如果我理解正确的话,RxJava 1.x 并不是严格意义上的 Reactive Streams 的实现,但它非常相似。2.0 版包含一些兼容的类,或者至少通过了 TCK,因此此代码的更新版本可能看起来有些不同。

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}

在 Reactive Streams 术语中,我认为subject是 a Processor,它既是 aPublisher又是 a Subscriber

问题是调用未订阅任何内容onNext的 aSubscriber似乎违反了 Reactive Streams 规范,尤其是规则 1.9

这仅仅是一个实现细节吗? 我认为您通常不能依赖于与兼容的 Reactive Streams 实现一起工作,我是否正确?

4

1 回答 1

7

Subject标准 RxJava 2 的s 和Processors 是宽松的,因此您不必onSubscribe在调用其他方法之前调用它们。这部分是由于传统性,因为 1.x 主题没有 a onSubscribe,部分是由于 RxJava 2 处理器不会选择协调Subscriber边和Publisher边之间的请求,因此没有使用 a Subscription

如果您订阅 RxJavaProcessor任何符合 RS的,它们似乎会尽可能多Publisher地请求和中继信号。Long.MAX_VALUE如果您订阅了一个符合SubscriberRxJavaProcessor的 RS,它们将尊重这些Subscribers 的背压并且永远不会溢出它们,但是,缺少请求可能会导致个体MissingBackpressureException被发出并被Subscriber“扔掉”。扩展库Publisher中有一个自定义可以协调请求。

我认为您通常不能依赖于使用兼容的 Reactive Streams 实现,我是否正确。

规范中没有任何内容,因此没有在 TCK 中测试Processor如果没有收到onSubscribe调用但它需要它会发生什么,因此,我认为这已成为一个实现细节。

这里有两个更大的问题:

  1. 发明主题是为了将命令式世界与反应式世界联系起来,并在 GUI 案例和非背压案例中作为事件的多播器很好地工作。在反应式-反应式多播中,它们是更好、更直接的替代方案,例如publish(Function).
  2. 在事件总线中思考是一种倒退,因为您通过在单个“轨道”上铲入和排出事件来创建单个阻塞点。相比之下,响应式设计更倾向于单个且通常独立的流,其中每个流可以根据需要在线程之间跳转,并且可能直到最后一刻才避开主线程。
于 2017-05-04T08:41:27.950 回答