1

我想在 manuel 中使用 execute.Async 调用对 cassandra db 进行异步调用我找到了这段代码,但我不明白如何将所有行收集到任何列表中。像 Select * from table 这样的基本调用,我想存储所有结果。

https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();

// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));

// Apply a synchronous computation:
CompletionStage<String> resultStage =
    responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));

// Perform an action once a stage is complete:
resultStage.whenComplete(
    (version, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        System.out.printf("Server version: %s%n", version);
      }
      sessionStage.thenAccept(CqlSession::closeAsync);
    });
4

2 回答 2

0

您需要参考关于异步分页的部分- 您需要提供一个回调,它将数据收集到作为外部对象提供的列表中。文档有以下示例:

CompletionStage<AsyncResultSet> futureRs =
    session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);

void processRows(AsyncResultSet rs, Throwable error) {
  if (error != null) {
    // The query failed, process the error
  } else {
    for (Row row : rs.currentPage()) {
      // Process the row...
    }
    if (rs.hasMorePages()) {
      rs.fetchNextPage().whenComplete(this::processRows);
    }
  }
}

在这种情况下processRows,可以将数据存储在作为当前对象一部分的列表中,如下所示:

class Abc {
  List<Row> rows = new ArrayList<>();

  // call to executeAsync

  void processRows(AsyncResultSet rs, Throwable error) {
....
    for (Row row : rs.currentPage()) {
      rows.add(row);
    }
....

  }
}

但是您需要非常小心,select * from table因为它可能会返回很多结果,而且如果您有太多数据,它可能会超时 - 在这种情况下,最好执行令牌范围扫描(我有一个驱动程序 3.x 的示例,但 4.x 还没有)。

于 2020-02-09T11:41:04.740 回答
0

这是 4.x 的示例(您还可以从 4.4 BTW 找到响应式代码示例)

https://github.com/datastax/cassandra-reactive-demo/blob/master/2_async/src/main/java/com/datastax/demo/async/repository/AsyncStockRepository.java

于 2020-02-10T12:06:41.427 回答