我正在使用 RxJava2、Micronaut 和 Cassandra 处理响应式流。我是 rxjava 的新手,不确定以最佳异步方式返回 List Person 的正确方法是什么?
数据来自 Cassandra Dao 接口
public interface PersonDAO {
@Query("SELECT * FROM cass_drop.person;")
CompletionStage<MappedAsyncPagingIterable<Person>> getAll();
}
注入到 micronaut 控制器中
return Single.just(personDAO.getAll().toCompletableFuture().get().currentPage()) .subscribeOn(Schedulers.io()) .map(people -> HttpResponse.ok(people));
或者
return Single.just(HttpResponse.ok()) .subscribeOn(Schedulers.io()) .map(it -> it.body(personDAO.getAll().toCompletableFuture().get().currentPage()));
或切换到 RxJava3
return Single.fromCompletionStage(personDAO.getAll()) .map(page -> HttpResponse.ok(page.currentPage())) .onErrorReturn(throwable -> HttpResponse.ok(Collections.emptyList()));