0

我有一个需要很长时间才能完成的异步操作管道,如果管道在截止日期之前没有生成项目,我想让 Uni 继续使用占位符项目,如下所示:

Uni<Item> u = processingPipeline();
return u.ifNoItem().after(Duration.of(10, SECONDS)).recoverWith(placeholderItem);

但是当使用这种方法时,处理流水线被取消并且操作链被中断。是否可以在截止日期到期而不取消处理管道的情况下使用占位符项目完成 Uni?即使错过了最后期限,我也希望管道能够持续到最后。如果有帮助,可以使用 Vert.x。

谢谢

编辑:

这是我迄今为止尝试过的:

Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
    vertx.executeBlocking(handler -> {
        u.subscribe().with(emitter::complete);
    }, resultHandler -> {});
    vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
        emitter.complete(placeholderItem);
    });
});

当处理在截止日期之前完成时,这可以正常工作,但是如果截止日期到期并且计时器被触发,它会javax.enterprise.context.ContextNotActiveException在发出占位符项目时崩溃,并且应用程序似乎陷入某种死锁。

编辑 2

原来我的大部分问题实际上是由于不正确使用 Hibernate 引起的。在对 Hibernate 事务的管理方式进行了一些重构之后,所有随机死锁和其他问题都消失了。

这似乎是原始问题最优雅的解决方案,即如何在不中断处理的情况下使用虚拟项目进行响应,除非它在截止日期之前完成:

Uni<Item> u = processingPipeline();
return Uni.createFrom().emitter(emitter -> {
    u.subscribe().with(emitter::complete);
    vertx.setTimer(TimeUnit.SECONDS.toMillis(10), timerId -> {
        emitter.complete(placeholderItem);
    });
});
4

1 回答 1

2

管道的取消是意料之中的,因为ifNoItem().after(duration)触发了TimeoutException. 所以这个异常作为失败传播,在这个特定点的上游被取消,这符合反应流语义。

recoverWith是一个故障恢复操作符,并且在订阅该操作符之后的任何内容。

您可能想查看recoverWithUni,您将在其中提供 aUni作为恢复,这Uni将捕获在此特定超时失败点之后可能重新订阅的管道的其余部分。

希望有帮助。

于 2021-04-02T07:38:02.783 回答