2

我有以下场景,我需要在 RxJava 中对其进行转换:

  1. 从本地数据库获取 30 个项目
  2. 使用这 30 个条目调用 api(一个请求)
  3. 从本地数据库中删除项目(这需要访问项目列表)
  4. 循环(?)我想重复 1-3 直到 db 为空

到目前为止我所做的看起来像这样:

fetchAllFromDbObservable()
    .flatMap( (string) -> {
        return Model
    })
    .buffer(30)
    .publish( (List<Model>, Response) -> {
        return callApiObservable(List<Model>);  
    })
    .publish( (Response) -> {
        // how do I get access here to List<Model> ?
        ArrayList<Long> ids = getIdsFromList(List<Model>)
        return removeFromDbObservable(ids);  
    })

问题:

  1. 发布正确的调用方式还是我应该在何时/和/然后使用?
  2. 如何将模型列表从一个可观察对象传递到另一个对象?

谢谢

4

1 回答 1

0

您的要求的可能实现如下:

import rx.subjects.PublishSubject;
import java.util.List;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;

public class Looper {

    // initialize db with 100 items
    private final List<Integer> db = IntStream.rangeClosed(1, 100)
            .boxed().collect(toList());

    private List<Integer> read30ItemsFromDb() {
        return db.stream().limit(30).collect(toList());
    }

    private void makeApiCall(List<Integer> items) {
        System.out.println(items);
    }

    private void removeFromDb(List<Integer> itemsToBeRemoved) {
        db.removeAll(itemsToBeRemoved);
    }

    private enum Command { NEXT }

    public static void main(String[] args) {
        Looper looper = new Looper();

        PublishSubject<Command> itemsProducer = PublishSubject.create();
        PublishSubject<List<Integer>> itemsConsumer = PublishSubject.create();

        itemsProducer.subscribe(command -> {
            switch (command) {
                case NEXT:
                    List<Integer> items = looper.read30ItemsFromDb();
                    if (!items.isEmpty()) itemsConsumer.onNext(items);
                    else itemsConsumer.onCompleted();
                default:
                    itemsConsumer.onCompleted();
            }
        });

        itemsConsumer
                .doOnCompleted(itemsProducer::onCompleted)
                .subscribe(items -> {
                    looper.makeApiCall(items);
                    looper.removeFromDb(items);
                    itemsProducer.onNext(Command.NEXT);
                });

        // start the loop
        itemsProducer.onNext(Command.NEXT);
    }
}
于 2016-01-14T01:58:21.823 回答