0

我的应用程序中有一个由改造服务支持的数据层。(到目前为止,只有网络上的持久性。当我进一步开发时,我将添加离线优先本地存储)

Observable<List<Item>>当我调用服务器时,Retrofit 会返回给我。这很好用。在我的订阅者中,我在订阅时会收到列表,然后可以使用Items.

我遇到的问题是:如果列表被修改(通过某种外部机制),我如何使可观察的重新查询改造服务并发出新的项目列表。我知道数据已过时,但我不确定如何启动重新查询。

这是我的精简版DataManager

class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observalble provided by retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}
4

2 回答 2

2

您在这里要解决的问题在于可观察到的冷之间的差异。有很多很棒的文章你可以用谷歌搜索出来,它们详细描述了这些差异,所以让我只描述基础知识。

Cold observable 为每个订阅者创建一个新的生产者这意味着,当两个单独的订阅者订阅同一个冷可观察对象时,他们每个人都会收到这些排放的不同实例。它们可能(!)是相等的,但它们永远是不同的对象。在此处应用于您的案例,每个订阅者都有自己的生产者,该生产者向服务器请求数据并将其传递到流中。每个订阅者都从自己的生产者那里获得数据。

Hot observable 与其所有观察者共享一个生产者。例如,如果生产者正在迭代对象集合,则在发射过程中加入第二个订阅者意味着它将仅获得之后发射的项目(如果它没有通过类似的操作符进行修改replay)。任何订阅者收到的每个对象在所有观察者中也是相同的实例,因为它来自单个生产者。

因此,从外观上看,您需要有一个热的 observable 来分发您的数据,这样当您知道它不再有效时,您只需通过这个热的 observable 发出一次,每个观察者都会对更新感到满意。

幸运的是,将可观察到的冷态转变为热态通常没什么大不了的。您可以创建自己的生产者来模仿这种行为,使用流行的操作符之一,share或者您可以转换流,使其表现得像一个。

我建议使用 PublishSubject 刷新数据并将其与原始的冷可观察数据合并,如下所示:

class DataManager {

    .....

    PublishSubject<Boolean> refreshSubject = PublishSubject.create();

    // The observable for retrieving always fresh data
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
        itemsObservable = itemsService.getItems()
                              .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()))
    }


    public Observable<List<Item>> getItems(){
        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            refreshSubject.onNext(true);
        });

        return call;
    }
}
于 2016-12-20T11:54:25.987 回答
1

我猜想itemsService.getItems()返回单个元素Observable,因此消费者无论如何都必须重新订阅才能获得新数据,并且他们会得到它,因为 Retrofit Observables 也是延迟/惰性的。

您可以在数据更改时触发一个单独Observable的“long” :PublishSubject

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();

public Observable<Object> itemsChanged() {
    return onItemsChanged;
}

public Completable addItem(Item item){
    Completable call = itemsService.addItem(item);

    // prevent triggering the addItem multiple times
    // Needs RxJava 2 Extensions library for now
    // as there is no Completable.cache() or equivalent in 2.0.3
    CompletableSubject cs = CompletableSubject.create();

    call.doOnComplete(() -> onItemsChanged.onNext("changed"))
    .subscribe(cs);

    return cs;
}
于 2016-12-20T11:55:47.567 回答