我正在尝试从 Couchbase 响应式获取大型数据集。我使用ReactiveCouchbaseRepository
spring数据提供的。
public interface ReactiveFooRepository extends ReactiveCouchbaseRepository<Foo, String> {
@Query("#{#n1ql.selectEntity} WHERE ... ORDER BY ...")
Flux<Foo> findAll();
}
在我的服务中,我订阅如下
repository.findAll()
.subscribe(this::process,
t -> LOGGER.error("Failed to process.", t));
在我的测试数据集上这工作正常,但是在生产中,查询返回一个大数据集并运行大约 20-25 秒。
我的理解是,这正是响应式存储库的用途:消耗大量结果而不需要显式分页等。
然而,我得到的是IllegalStateException
Failed to process.
java.lang.IllegalStateException: The content of this Observable (queryRow.13de03e2-9271-47e2-9d56-df01038011f9) is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.
...
提高autoreleaseAfter
超时似乎不是一个可靠的解决方案。在发布第一个结果元素之前,似乎整个结果都在 Couchbase 中缓冲。
我有什么问题吗?关于问题或解决方案可能是什么的任何想法?
编辑
我发现的一个问题是订购。我假设 Couchbase 必须先获取整个结果集,然后才能对其进行排序。删除ORDER BY
子句后,我可以流式传输结果,但仅限于直接使用 java SDK 时。
以下代码工作并立即开始流式传输数据:
public void applyToAllFooAsync(Consumer<Optional<Foo>> consumer) {
String queryString = String.format("SELECT meta().id, _class, field1, field2, ... "
+ "FROM %s "
+ "WHERE ... ",
getQuotedBucketName());
N1qlParams params = N1qlParams.build().consistency(ScanConsistency.STATEMENT_PLUS).pretty(false);
N1qlQuery query = N1qlQuery.simple(queryString, params);
asyncBucket
.query(query)
.flatMap(AsyncN1qlQueryResult::rows)
.map(this::getFoo)
.forEach(consumer::accept);
}
protected Optional<Foo> getFoo(AsyncN1qlQueryRow row) {
try {
return Optional.of(objectMapper.readValue(row.byteValue(), clazz));
} catch (IOException e) {
LOGGER.warn("Could not map Foo to object.", e);
return Optional.empty();
}
}
applyToAllOperationsAsync(System.out::println));
但是,如果我在 a 中使用完全相同的查询ReactiveCouchbaseRepository
,则需要几秒钟,然后抛出上面显示的异常。
有人知道不同的行为可能来自哪里吗?有人可以指出在 spring 数据代码中使用实际 java SDK 的类或方法吗?