0

我想在向反应式消息连接器发送消息的事务性 REST 端点之间传播 JTA 状态(= 事务)。

@Inject
@Channel("test")
Emitter<String> emitter;

@POST
@Transactional
public Response test() {
    emitter.send("test");
}

@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {

    @Inject
    TransactionManager tm;

    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
            .flatMapCompletionStage(message -> {
                tm.getTransaction(); // = null
                return message.ack();
            })
            .ignore();
    }
}

据我了解,上下文传播负责使事务可用(请参阅 参考资料io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext)。问题似乎是,它currentContext是在订阅时创建的,这在注入点 ( Emitter<String> emitter) 获取其实例时发生。要正确捕获交易还为时过早。

我错过了什么?

顺便说一句,我在使用@Incoming/@Outgoing而不是发射器时遇到了同样的问题。我决定给你这个例子是因为它很容易理解和重现。

4

1 回答 1

0

目前,您需要在消息元数据中传递当前事务。因此,它将传播到您不同的下游组件(以及连接器)。

请注意,Transaction 往往附加到请求范围,这意味着在您的连接器中,使用它可能已经太晚了。因此,请确保您的端点是异步的,并且仅在发出的消息被确认时才返回。

在这种情况下,上下文传播不会有帮助,因为底层流是在启动时构建的(在 Quarkus 中的构建时),因此没有捕获上下文。

于 2021-02-24T06:39:26.013 回答