编辑:看到这个更清晰和准确的问题: RxJava flatMap and backpressure 奇怪的行为
我目前正在使用 RxJava 编写数据同步作业,而且我对反应式编程和特别是 RxJava 库非常陌生。
我的工作很简单,我有一个元素 ID 列表,我调用一个 Web 服务按 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库。
我使用 1 个 io 线程从 WS 加载数据,并使用多个 io 线程将数据推送到数据库。但是我总是以 OutOfMemory 错误告终。我首先认为从 WS 加载数据比将它们存储在数据库中要快。
但是作为 WS 调用和 DB 调用的同步调用,它们是否应该相互施加背压?
谢谢您的帮助。
我的代码几乎是这样的:
@Test
public void test() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
List<Integer> ids = IntStream.range(0, 10000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.toBlocking().forEach(s -> System.out.println("Value " + s));
System.out.println("Finished");
}
private Observable<Integer> produce(final int value) {
return Observable.<Integer>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(500); //Here I call WS to retrieve data
s.onNext(value);
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(Integer value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(10000); //Here I call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}