0

我有一个Observable<List<Event>>并且我希望这个 Observable 被多个订阅者共享。每个订阅者将过滤每个事件并对其进行处理。

Observable<List<Event>>已经以这种方式创建:

    @Override
    public List<Event> findNewEvents() {
        List<Event> results = new ArrayList<>();
        while(! fetchedEvents.isEmpty()) {
            results.add(fetchedEvents.poll());
        }
        return results;
    }

    @Override
    public Observable<List<Event>> findNewObservableEvents() {
        return Observable.just(findNewEvents());
    }

这是代码:

            Observable<List<Event>> newEvents = reader.findNewObservableEvents();

            Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(risk::isForRiskApproval)
                    .subscribe(risk::approveRisk);

            Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFundabilityCheck)
                    .subscribe(funding::checkFundability);

            Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFund)
                    .subscribe(funding::fund);

我努力了 :

newEvents.share()还有newEvents.publish()

尝试时:newEvents.create()我需要提供一个ObservableOnSubscribe对象,但我不明白如何获取它。

诀窍是什么?

4

1 回答 1

2

如果您不想findNewObservableEvents多次消费,请使用publish,一旦订阅者订阅,请connect调用ConnectableObservable

ConnectableObservable<List<Event>> newEvents = reader.findNewObservableEvents().publish();

Disposable riskApproveRiskEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(risk::isForRiskApproval)
                .subscribe(risk::approveRisk);

 Disposable fundingCheckFundabilityEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFundabilityCheck)
                .subscribe(funding::checkFundability);

Disposable fundingFundEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFund)
                .subscribe(funding::fund);

Disposable connection = newEvents.connect();
于 2020-11-26T14:15:28.520 回答