3

我正在尝试从 Couchbase 响应式获取大型数据集。我使用ReactiveCouchbaseRepositoryspring数据提供的。

public interface ReactiveFooRepository extends ReactiveCouchbaseRepository<Foo, String> {

    @Query("#{#n1ql.selectEntity} WHERE ... ORDER BY ...")
    Flux<Foo> findAll();
}

在我的服务中,我订阅如下

repository.findAll()
          .subscribe(this::process,
                     t -> LOGGER.error("Failed to process.", t));

在我的测试数据集上这工作正常,但是在生产中,查询返回一个大数据集并运行大约 20-25 秒。

我的理解是,这正是响应式存储库的用途:消耗大量结果而不需要显式分页等。

然而,我得到的是IllegalStateException

Failed to process.
java.lang.IllegalStateException: The content of this Observable (queryRow.13de03e2-9271-47e2-9d56-df01038011f9) is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.
...

提高autoreleaseAfter超时似乎不是一个可靠的解决方案。在发布第一个结果元素之前,似乎整个结果都在 Couchbase 中缓冲。

我有什么问题吗?关于问题或解决方案可能是什么的任何想法?

编辑

我发现的一个问题是订购。我假设 Couchbase 必须先获取整个结果集,然后才能对其进行排序。删除ORDER BY子句后,我可以流式传输结果,但仅限于直接使用 java SDK 时。

以下代码工作并立即开始流式传输数据:

public void applyToAllFooAsync(Consumer<Optional<Foo>> consumer) {
    String queryString = String.format("SELECT meta().id, _class, field1, field2, ... "
                                           + "FROM %s "
                                           + "WHERE ... ",
                                       getQuotedBucketName());

    N1qlParams params = N1qlParams.build().consistency(ScanConsistency.STATEMENT_PLUS).pretty(false);
    N1qlQuery query = N1qlQuery.simple(queryString, params);

    asyncBucket
        .query(query)
        .flatMap(AsyncN1qlQueryResult::rows)
        .map(this::getFoo)
        .forEach(consumer::accept);
}

protected Optional<Foo> getFoo(AsyncN1qlQueryRow row) {
    try {
        return Optional.of(objectMapper.readValue(row.byteValue(), clazz));
    } catch (IOException e) {
        LOGGER.warn("Could not map Foo to object.", e);
        return Optional.empty();
    }
}

applyToAllOperationsAsync(System.out::println));

但是,如果我在 a 中使用完全相同的查询ReactiveCouchbaseRepository,则需要几秒钟,然后抛出上面显示的异常。

有人知道不同的行为可能来自哪里吗?有人可以指出在 spring 数据代码中使用实际 java SDK 的类或方法吗?

4

0 回答 0