0

我想在实现轮询时使用带有事务传播的 SourcePollingChannelAdapter,以便在发生错误时回滚所有操作。方法 setTransactionSynchronizationFactory 没有评论...非常感谢您的帮助!

在 XML 中,我可以这样做:

<int:poller fixed-rate="5000">
  <int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>

我想通过 SourcePollingChannelAdapter 和 PeriodicTrigger 以编程方式使用这样的事务,但我不知道如何。

我有这个 :

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();

当bean源被调用时,数据库中的一个元素被删除,一个消息被创建并在outputchannel中发送;但是,如果在输出通道之后我的流程出现错误,我希望数据库恢复并且元素回来......实际上是一个简单的事务与传播。我不明白怎么做。

输出通道是:

<int:channel id="channel" >
    <int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
    url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
    reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String"  />

当 URL 没有响应时,会引发异常但不执行回滚,尽管我已经添加了 DefaultTransactionSynchronizationFactory 和 TransactionInterceptor 就像你说的那样:(

4

1 回答 1

0

如果我理解正确,您需要使用这个:DefaultTransactionSynchronizationFactory

这是如何配置它的快照:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(mock(BeanFactory.class));
    PollableChannel queueChannel = new QueueChannel();
    syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
    syncProcessor.setBeforeCommitChannel(queueChannel);
    syncProcessor.setAfterCommitChannel(queueChannel);
    syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));

    DefaultTransactionSynchronizationFactory syncFactory =
            new DefaultTransactionSynchronizationFactory(syncProcessor);

    adapter.setTransactionSynchronizationFactory(syncFactory);

事务边界包含在 中SourcePollingChannelAdapter#adviceChain,因此应该像这样配置:

    TransactionInterceptor txAdvice = 
    new TransactionInterceptor(transactionManager, 
         new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
    adapter.setAdviceChain(Collections.singletonList(txAdvice));

所以,现在每个“民意调查”都将被交易包裹起来,你syncFactory会做这些事情。

于 2013-10-17T13:01:32.277 回答