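I have an Observable<List<Event>> and I want this Observable to be shared by multiple subscribers. Each subscriber will filter the events and process the ones that match.
The Observable<List<Event>> is created like this: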
@Override
public List<Event> findNewEvents() {
    List<Event> results = new ArrayList<>();
    while (!fetchedEvents.isEmpty()) {
        results.add(fetchedEvents.poll());
    }
    return results;
}

@Override
public Observable<List<Event>> findNewObservableEvents() {
    return Observable.just(findNewEvents());
}
Here is the subscribing code:
Observable<List<Event>> newEvents = reader.findNewObservableEvents();

Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
        .flatMap(Observable::just)
        .filter(risk::isForRiskApproval)
        .subscribe(risk::approveRisk);

Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
        .flatMap(Observable::just)
        .filter(funding::isForFundingFundabilityCheck)
        .subscribe(funding::checkFundability);

Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
        .flatMap(Observable::just)
        .filter(funding::isForFundingFund)
        .subscribe(funding::fund);
I tried newEvents.share() and also newEvents.publish().
When I tried newEvents.create(), I found that I need to provide an ObservableOnSubscribe object, but I don't understand how to obtain one.
What's the trick?
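For reference, here is a minimal sketch of one direction that might work, under several assumptions: RxJava 3 package names (RxJava 2 uses io.reactivex instead), Java 16+ for the record, and a hypothetical Event record with a type field standing in for the real Event class and the risk/funding predicates. An ObservableOnSubscribe is just a callback that receives an emitter, so a lambda passed to the static Observable.create(...) is enough. Unlike Observable.just(findNewEvents()), which drains the queue at the moment the method is called, the lambda runs only on subscription. publish() then returns a ConnectableObservable that stays idle until connect() is called, so every subscriber can be attached before the queue is drained once.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SharedEventsSketch {

    // Hypothetical stand-in for the Event type in the question.
    record Event(String type) {}

    static final Queue<Event> fetchedEvents = new ConcurrentLinkedQueue<>();
    static final List<Event> riskSeen = new ArrayList<>();
    static final List<Event> fundSeen = new ArrayList<>();

    // Same draining logic as findNewEvents() in the question.
    static List<Event> findNewEvents() {
        List<Event> results = new ArrayList<>();
        while (!fetchedEvents.isEmpty()) {
            results.add(fetchedEvents.poll());
        }
        return results;
    }

    // The lambda is the ObservableOnSubscribe. It runs on each subscription,
    // so the queue is drained when someone subscribes, not when this method is called.
    static Observable<List<Event>> findNewObservableEvents() {
        return Observable.create(emitter -> {
            emitter.onNext(findNewEvents());
            emitter.onComplete();
        });
    }

    static void runDemo() {
        riskSeen.clear();
        fundSeen.clear();
        fetchedEvents.add(new Event("RISK_APPROVAL"));
        fetchedEvents.add(new Event("FUND"));
        fetchedEvents.add(new Event("RISK_APPROVAL"));

        // publish() multicasts a single upstream subscription to all subscribers.
        ConnectableObservable<Event> shared = findNewObservableEvents()
                .flatMapIterable(events -> events)
                .publish();

        // Attach every subscriber first; nothing is emitted yet.
        shared.filter(e -> e.type().equals("RISK_APPROVAL")).subscribe(riskSeen::add);
        shared.filter(e -> e.type().equals("FUND")).subscribe(fundSeen::add);

        // connect() subscribes to the source once and fans the events out.
        shared.connect();
    }

    public static void main(String[] args) {
        runDemo();
        // prints "risk=2 fund=1"
        System.out.println("risk=" + riskSeen.size() + " fund=" + fundSeen.size());
    }
}
```

This may also explain why share() did not appear to help: share() is publish().refCount(), which connects as soon as the first subscriber arrives, so with a synchronous source the first subscriber can receive everything before the others have subscribed. Calling connect() explicitly after all subscriptions avoids that ordering problem.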