2

我有一个从很多来源获取项目的 observable:

Source { List<Item> data }

源和项目之间的关系是多对多的,并且在不同的源中,项目可能会自我复制。项目是应该上传到服务器的实体,服务器不接受重复项。为了实现这一点,我合并源并通过它们的 id 区分它们的项目,然后将唯一项目上传到服务器。如下所示:

Observable.merge(source1(), source2(), source3())
            .flatMapIterable(sources -> sources)
            .flatMapIterable(source::getItems)
            .distinct(item -> item.getId())
            .flatMapCompletabale(item -> uploadItem(item))

项目上传可能会发出几个错误,其中一些我应该稍后重试上传项目并继续另一个项目,而“失败”的项目正在等待重试。

我如何推迟重试上传“失败”的项目并继续其他项目,而这个项目正在等待它的尝试?

提前致谢!

4

3 回答 3

4

要仅处理一次上传失败,您可以在最后一步添加一个运算符:

  .flatMapCompletable(item->uploadItem(item))

应该成为

  .flatMapCompletable(item->uploadItem(item)
                              .retryWhen(throwable -> 
                                  throwable.delay(5, TimeUnit.SECONDS)))

retryWhen()编辑:我从这个 Dan Lew 博客条目中学到了很多关于操作员的知识。您会发现几个示例,包括使用zip()运算符 withObservable.range(3)来限制重试次数。

于 2017-09-11T18:53:27.913 回答
0

我不得不修改上面的例子来创建一个 Flowable 来 retryWhen a Single 在我的 RxJava2 项目中:

import io.reactivex.Flowable; import io.reactivex.functions.Function;

import java.util.concurrent.TimeUnit;

public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {

    private final int maxRetryCount;
    private final int retryDelay;
    private int retryCount;
    private TimeUnit timeUnit;

    public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
        this.maxRetryCount = maxRetryCount;
        this.retryDelay = retryDelay;
        this.timeUnit = timeUnit;
        this.retryCount = 0;
    }

    @Override
    public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {

        return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {

            if (++retryCount < maxRetryCount) {
                return Flowable.timer(retryDelay, timeUnit);
            }

            return Flowable.error(throwable);
        });
    } }

并将其应用于我的单曲:

.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))
于 2018-09-30T19:05:43.410 回答
0

我将此函数放入 retryWhen 方法并使其工作。

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {

private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;

public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
    this.maxRetryCount = maxRetryCount;
    this.retryDelay = retryDelay;
    this.timeUnit = timeUnit;
    this.retryCount = 0;
}

@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
    return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {

        if (++retryCount < maxRetryCount) {
            return Observable.timer(retryDelay, timeUnit);
        }

        return Observable.error(throwable);
    });
}
}
于 2017-09-20T07:06:44.493 回答