2

我正在进行异步调用,10 秒后 1 分钟,这意味着将进行大约 6 次调用,但问题在于我希望它适用delay于特定condition

Observable
.just(listOfSomethings_Locally)
.take(1, TimeUnit.MINUTES)
.serialize()
.delaySubscription( // this is confusing part 
() ->
    Observable.just(listOfItems_Network).take(10,TimeUnit.SECONDS)
) 

我想要的是将网络呼叫延迟 10 秒,除了第一次呼叫,并在 10 秒后取消网络呼叫,所以我应该在 1 分钟内有准确的 6 个呼叫。

编辑

由于场景混乱,这里重新定义了场景:

我拥有的是本地驱动程序的大量列表,我想每 10 秒后向每个人发送请求,并听取另一个订阅者检查驱动程序是否在 10 秒内没有取消它,这个过程将持续大约 1 分钟,如果一位司机取消,我应该立即向下一位司机发送请求

到目前为止编写的代码:

Observable.from(driversGot)
                .take(1,TimeUnit.MINUTES)
                .serialize()
                .map(this::requestRydeObservable) // requesting for single driver from driversGot (it's a network call)
                .flatMap(dif ->
                        Observable.amb(
                                kh.getFCM().driverCanceledRyde(), // listen for if driver cancel request returns integer
                                kh.getFCM().userRydeAccepted()) // listen for driver accept returns RydeAccepted object
                                .map(o -> {
                                    if (o instanceof Integer) {
                                        return new RydeAccepted();
                                    } else if (o instanceof RydeAccepted) {
                                        return (RydeAccepted) o;
                                    }
                                    return null;
                                }).delaySubscription(10,TimeUnit.SECONDS)
                )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(fua -> {
                    if (fua == null) {
                        UiHelpers.showToast(context, "Invalid Firebase response");
                    } else if (!fua.getStatus()) { // ryde is canceled because object is empty
                        UiHelpers.showToast(context, "User canceled ryde");
                    } else { // ryde is accepted
                        UiHelpers.showToast(context, "User accepted ryde");
                    }
                }, t -> {
                    t.printStackTrace();
                    UiHelpers.showToast(context,"Error sending driver requests");
                }, UiHelpers::stopLoading);
4

3 回答 3

2

对您的代码的反馈

您不需要take并且serialize已经just会立即+连续地发出东西。

delaySubscription似乎是一个奇怪的选择,因为在传递的 observable 生成事件之后,不会延迟进一步的事件(这与您的情况相矛盾) 延迟订阅

选项 #1,仅 rx

使用delay+ 计算其余事件的各个延迟(因此第 1 次延迟 0 秒,第 2 次延迟 1 秒,第 3 次延迟 3,...)

            AtomicLong counter = new AtomicLong(0);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .delay(item -> Observable.just(item).delay(counter.getAndIncrement(), TimeUnit.SECONDS))
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

选项 #2:速率限制

看来您的用例适合速率限制,因此我们可以使用来自 guava 的 RateLimiter:

            RateLimiter limiter = RateLimiter.create(1);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .map(r -> {
            limiter.acquire();
            return r;
        })
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

两者的工作方式相似:

Tue Nov 15 11:14:34 EET 2016
1 Tue Nov 15 11:14:34 EET 2016
2 Tue Nov 15 11:14:35 EET 2016
3 Tue Nov 15 11:14:36 EET 2016
4 Tue Nov 15 11:14:37 EET 2016
5 Tue Nov 15 11:14:38 EET 2016
6 Tue Nov 15 11:14:39 EET 2016

速率限制器在您的请求的情况下会更好地工作,例如处理 5 秒,然后它将允许下一个请求更快地补偿延迟并达到 1req/s 的目标 10 秒。

于 2016-11-15T09:16:26.270 回答
0

您好,您可以延迟重试,有一种方法可以在这里做到这一点

于 2016-11-15T00:29:38.293 回答
0

我想更新@Ivans 帖子,因为它缺少错误处理并且正在使用副作用。

这篇文章将只使用 RxJava-operators。observable 将每 10 秒提供一个值。请求将在 10 秒后超时。如果超时,将返回一个备用值。

第一个测试方法将在 60 秒内收到 10 个值。如果最后一个请求早于 10 秒完成,则 observable 可能会在 60 秒之前完成。

public class TimeOutTest {
    private static String DUMMY_VALUE = "ERROR";

    @Test
    public void handles_in_60_seconds() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> vales = Observable.fromIterable(actions)
                .take(6);

        Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable.flatMap(integer -> {
            return longNetworkLong(9_000)
                    .timeout(10, TimeUnit.SECONDS)
                    .onErrorReturnItem(DUMMY_VALUE);
        }).doOnNext(s -> System.out.println("VALUE"));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(6);
    }

    @Test
    public void last_two_values_timeOut() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> vales = Observable.fromIterable(actions)
                .take(6);

        Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable
                .map(integer -> integer * 2500)
                .flatMap(integer -> {
                    return longNetworkLong(integer)
                            .timeout(10, TimeUnit.SECONDS)
                            .doOnError(throwable -> System.out.print("Timeout hit?"))
                            .onErrorReturnItem(DUMMY_VALUE);
                })
                .doOnNext(s -> System.out.println("VALUE"))
                .filter(s -> !Objects.equals(s, DUMMY_VALUE));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(4);

    }

    private Observable<String> longNetworkLong(int delayTime) {
        return Observable.fromCallable(() -> {
            Thread.sleep(delayTime);
            return "result";
        });
    }
}
于 2016-11-15T10:13:49.987 回答