0

我正在使用 RxJava2、Micronaut 和 Cassandra 处理响应式流。我是 rxjava 的新手,不确定以最佳异步方式返回 List Person 的正确方法是什么?

数据来自 Cassandra Dao 接口

public interface PersonDAO {    
    @Query("SELECT * FROM cass_drop.person;")
    CompletionStage<MappedAsyncPagingIterable<Person>> getAll(); 
}

注入到 micronaut 控制器中

   return Single.just(personDAO.getAll().toCompletableFuture().get().currentPage())
            .subscribeOn(Schedulers.io())
            .map(people -> HttpResponse.ok(people));

或者

return Single.just(HttpResponse.ok())
        .subscribeOn(Schedulers.io())
        .map(it -> it.body(personDAO.getAll().toCompletableFuture().get().currentPage()));

或切换到 RxJava3

    return Single.fromCompletionStage(personDAO.getAll())
            .map(page -> HttpResponse.ok(page.currentPage()))
            .onErrorReturn(throwable -> HttpResponse.ok(Collections.emptyList()));
4

2 回答 2

2

不是 RxJava 和 Cassandra 的专家:

在您的第一个和第二个示例中,您正在阻塞执行CompletionStagewith的线程get,即使您在 IO 线程中执行此操作,我也不建议您这样做。

您还使用了一个Single可以发出的,只有一个值或一个错误。既然你想返回一个List,我建议你至少去一个Observable

第三点,Cassandra的结果是分页的,我不知道是不是故意的,但你只列出了第一页,而错过了其他页面。

我会尝试像下面这样的解决方案,我一直使用 IO 线程(该操作在 IO 中可能代价高昂)并遍历 Cassandra fetch 页面:


    /* the main method of your controller */
    @Get()
    public Observable<Person> listPersons() {
        return next(personDAO.getAll()).subscribeOn(Schedulers.io());
    }

    private Observable<Person> next(CompletionStage<MappedAsyncPagingIterable<Person>> pageStage) {
        return Single.fromFuture(pageStage.toCompletableFuture())
                .flatMapObservable(personsPage -> {
                    var o = Observable.fromIterable(personsPage.currentPage());
                    if (!personsPage.hasMorePages()) {
                        return o;
                    }
                    return o.concatWith(next(personsPage.fetchNextPage()));
                });
    }
于 2020-09-02T15:40:25.337 回答
1

如果你打算使用reactor而不是RxJava,那么你可以试试cassandra-java-driver-reactive-mapper

语法相当简单,仅在编译时工作。

于 2021-02-04T17:41:59.203 回答