5

我有一个可供客户使用的 API,可以简化为:

public class API {
  public void sendEvent(Event e);
}

Event每当客户端调用 API(技术上通过 Binder 到Service派生类)时,实例就会进入我的系统,然后对这些 API 进行处理、过滤并分派到其他内部组件。我不关心过去的事件,只关心订阅者订阅时可用的事件。它似乎很适合 Rx 范式,我刚刚开始接触它。

我需要一个创建一次的 Observable,允许多个订阅者,并且可以提供实例Event,然后通过反应管道发送给观察者。ASubject似乎适合我想要做的事情(特别是,这个问题的答案引起了我的共鸣)。

其他 RxJava 用户推荐什么?

4

2 回答 2

11

例如,按照我的简短评论:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}

那么你可能在某处有一个实例:API api = ...

你的 Observable 是这样获得的:Observable.create(api);然后你可以用 Observable 做任何正常的事情。

未订阅的 s 的过滤Subscriber留给读者作为练习。

编辑

更多研究表明这PublishSubject应该会有所帮助:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}

这样,你就可以订阅这个 Observable,并且每次发送一个事件到API,它都会发布给所有的订阅者。

像这样使用:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
于 2015-07-14T19:19:32.093 回答
1

试试observable.share()哪个在幕后调用.publish().refCount()。它将仅使用一个基础订阅并为您提供您指定的多个订阅行为。

于 2015-07-15T00:02:14.260 回答