0

编辑:看到这个更清晰和准确的问题: RxJava flatMap and backpressure 奇怪的行为

我目前正在使用 RxJava 编写数据同步作业,而且我对反应式编程和特别是 RxJava 库非常陌生。

我的工作很简单,我有一个元素 ID 列表,我调用一个 Web 服务按 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库。

我使用 1 个 io 线程从 WS 加载数据,并使用多个 io 线程将数据推送到数据库。但是我总是以 OutOfMemory 错误告终。我首先认为从 WS 加载数据比将它们存储在数据库中要快。

但是作为 WS 调用和 DB 调用的同步调用,它们是否应该相互施加背压?

谢谢您的帮助。

我的代码几乎是这样的:

@Test
public void test() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    List<Integer> ids = IntStream.range(0, 10000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .toBlocking().forEach(s -> System.out.println("Value " + s));

    System.out.println("Finished");
}

private Observable<Integer> produce(final int value) {
    return Observable.<Integer>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(500); //Here I call WS to retrieve data
                s.onNext(value);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(Integer value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(10000); //Here I call DB to store data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}
4

1 回答 1

1

看起来你的 WS 是基于轮询的,所以如果你使用fromCallable而不是自定义 Observable,你会得到适当的背压:

return Observable.<Integer>fromCallabe(s -> {
    Thread.sleep(500); //Here I call WS to retrieve data
    return value;
}).subscribeOn(Schedulers.io());

否则,如果您有阻塞的 WS 和阻塞的数据库,您可以使用它们相互背压:

ids.map(id -> db.store(ws.get(id)).subscribeOn(Schedulers.io())
.toBlocking().subscribe(...)

并且可能会放弃 subscribeOn 和 toBlocking 。

于 2016-02-10T14:37:33.723 回答