0

编辑:这只发生在 MSSQL 和 jtds 1.2.6 仍在调查中...
**重复的:Mule 3.3.0 Jdbc 事务不需要的提交
**Mule 文档很烂。

我想回滚具有多个数据库端点的流中的所有内容。
我有一个单一的 JDBC 数据源资源(即不需要花哨的 XA、2PC 等)。
我已经设法将 Mule 配置为,至少不会抱怨没有配置事务管理器等......但是:它不起作用;即发生异常时它不会回滚事务。
由于我是独立运行 Mule,我没有花哨的 weblogic、jboss 等事务管理器,所以我想我可以使用 Spring 的 DataSourceTransactionManager。对此我还有什么选择?

这是我的流程(flow1 仅用于触发 flow2,这是我想要交易的流程):

流动

和 XML:

<?xml version="1.0" encoding="UTF-8"?>
<mule version="CE-3.3.0">
    <spring:beans>
        <spring:bean id="transactionManager"
            class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <spring:property name="dataSource" ref="dataSource" />
        </spring:bean>

        <spring:bean id="transactionFactory"
            class="org.mule.module.spring.transaction.SpringTransactionFactory">
            <spring:property name="manager" ref="transactionManager" />
        </spring:bean>

        <spring:bean id="dataSource" name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
            <spring:property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <spring:property name="url" value="jdbc:mysql://localhost/mydb"/>
            <spring:property name="username" value="sa"/>
            <spring:property name="password" value=""/>
        </spring:bean>    
    </spring:beans>

    <jdbc:connector name="jdbcConnector" dataSource-ref="dataSource"
        transactionPerMessage="true" queryTimeout="20000" pollingFrequency="10000"
        doc:name="Database" validateConnections="false"></jdbc:connector>
    <flow name="flow1" doc:name="flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="toFlow2" doc:name="VM"/>
    </flow>
    <flow name="flow2" doc:name="flow2">
        <vm:inbound-endpoint exchange-pattern="request-response" path="toFlow2" doc:name="VM">
        <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10"/>
    </vm:inbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="query1" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database">
            <jdbc:query key="query1" value="insert into Foo (field1) values ('bar')"/>
            <jdbc:transaction action="ALWAYS_JOIN"/>
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="query2" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database">
            <jdbc:query key="query2" value="insert into Bar (field1) values ('foo')"/>
            <jdbc:transaction action="ALWAYS_JOIN"/>
        </jdbc:outbound-endpoint>
    </flow>

</mule>

这里没有显示,我还有一个默认的异常捕获策略,它只是将错误的有效负载写入文件。我不知道我是否需要明确地回滚,但我没有找到方法。

任何帮助将非常感激。

4

2 回答 2

3

经过两天的头撞骡子后,我终于成功地完成了这项工作。
结论:

  • 您需要使用jTDS自己的 DataSource (net.sourceforge.jtds.jdbcx.JtdsDataSource)
  • 你需要使用mule的TransactionFactory (org.mule.transport.jdbc.JdbcTransactionFactory)
  • 您需要使用未记录(或至少记录不足)的标签来开始交易
  • jTDS ando/或 Mule 很烂(我不知道谁对这个特定问题负责)。
  • Mule 文档很糟糕:这个这个
  • MuleStudio 很浪费时间(Sloooow,有缺陷,无法通过 GUI 连接事务,断言异常,如果您拖放脚本,流程会变得混乱,等等)。

最终工作流程:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:jdbc="http://www.mulesoft.org/schema/mule/jdbc" xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans"
    version="CE-3.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/jdbc http://www.mulesoft.org/schema/mule/jdbc/current/mule-jdbc.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd ">

    <spring:beans>
        <spring:bean id="jtdsDataSource" name="jtdsDataSource" class="net.sourceforge.jtds.jdbcx.JtdsDataSource">
            <spring:property name="user" value="your_user" />
            <spring:property name="password" value="your_password" />
            <spring:property name="databaseName" value="your_database" />
            <spring:property name="serverName" value="your_host" />
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="jtdsDataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="TriggerTxFlow" doc:name="TriggerTxFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <vm:outbound-endpoint exchange-pattern="request-response" path="toTxFlow" doc:name="VM" />
    </flow>
    <flow name="TxFlow" doc:name="TxFlow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="toTxFlow" doc:name="VM">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Database">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (1, 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Database">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (2, 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>
</mule>
于 2012-12-05T02:39:54.173 回答
2

在 Mule 中,您可以访问事务管理器。例如,您可以通过将其添加到配置中来配置 jboss 事务管理器:

<jbossts:transaction-manager/>

使用 XA 不需要自定义事务。您只需要在每个端点内使用 xa-transaction 元素。

在这里,您已更新配置以使用 Mule 中的 xa 事务。

<?xml version="1.0" encoding="UTF-8"?>
<mule version="CE-3.3.0">

    <jdbc:connector name="jdbcConnector" dataSource-ref="dataSource"
        transactionPerMessage="true" queryTimeout="20000" pollingFrequency="10000"
        doc:name="Database" validateConnections="false">
    </jdbc:connector>

    <flow name="flow1" doc:name="flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost"      port="8081" doc:name="HTTP"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="toFlow2" doc:name="VM"/>
    </flow>

    <flow name="flow2" doc:name="flow2">
        <vm:inbound-endpoint exchange-pattern="request-response" path="toFlow2" doc:name="VM">
            <xa-transaction action="ALWAYS_BEGIN" timeout="10"/>
        </vm:inbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="query1" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database">
            <jdbc:query key="query1" value="insert into Foo (field1) values ('bar')"/>
            <xa-transaction action="ALWAYS_JOIN"/>
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="query2" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database">
            <jdbc:query key="query2" value="insert into Bar (field1) values ('foo')"/>
            <xa-transaction action="ALWAYS_JOIN"/>
         </jdbc:outbound-endpoint>
     </flow>

我同意文档不如应有的那么好,但它正在变得更好。事实上,你提到的第一页是从头开始重写的。

HTH,巴勃罗。

于 2013-04-05T18:29:09.817 回答