1

在使用 Spring Webflux 时,我试图在领域对象服务器中插入一些数据,该服务器通过 Rest API 与 Java 应用程序交互。所以基本上我有一组学生,他们有一组科目,我的目标是以非阻塞方式坚持这些科目。所以我使用了一个通过休息端点公开的微服务,它为我提供了一个学生卷号的通量,对于那个通量,我使用另一个通过休息端点公开的微服务,它可以让我得到科目的通量,对于这些科目中的每一个,我想通过另一个休息端点将它们保存在领域服务器中。我想让这一切都变得非常非阻塞,这就是为什么我希望我的代码看起来像这样。

void foo() {
studentService.getAllRollnumbers().flatMap(rollnumber -> {
    return subjectDirectory.getAllSubjects().map(subject -> {
        return dbService.addSubject(subject);
    })
});

}

但这由于某种原因不起作用。但是,一旦我在这些东西上调用了块,它们就会到位,就像这样。

Flux<Done> foo() {
    List<Integer> rollNumbers = studentService.getAllRollnumbers().collectList().block();

    rollNumbers.forEach(rollNumber -> {
        List<Subject> subjects = subjectDirectory.getAllSubjects().collectList().block();

    subjects.forEach(subject -> {dbService.addSubject(subject).block();});
    });

    return Flux.just(new NotUsed());
}

getAllRollnumbers() returns a flux of integers.
getAllSubjects() returns a flux of subject.
and addSubject() returns a Mono of DBResponse pojo.

我能理解的是,执行这个函数的线程在它的大部分被触发之前就已经过期了。请帮助我以异步非阻塞方式处理此代码。

4

1 回答 1

0

您根本没有订阅发布者,这就是它没有执行的原因。你可以这样做:

studentService.getAllRollnumbers().flatMap(rollnumber -> {
    return subjectDirectory.getAllSubjects().map(subject -> {
        return dbService.addSubject(subject);
    })
}).subscribe();

然而,让框架处理订阅通常会更好,但没有看到我无法建议的其余代码。

于 2019-03-28T19:49:14.370 回答