3

我想同时处理一组消息,但除非我将虚拟机设置为请求-响应,否则我无法使它们成为事务性的……在这种情况下,处理不是并发的。

Mule 文档指出“Mule 事务是在同步端点上配置的”,但我不太了解这个限制。
很明显,在您希望成为事务性的流中,不应产生异步流,但(对我而言)不清楚为什么不能(从非 tx 主流)启动任意数量的异步流交易性的。

换句话说,为什么这可以正常工作:

在此处输入图像描述

但是如果我将虚拟机更改为“单向”,它会失败:

org.mule.transaction.IllegalTransactionStateException: Can only bind "javax.sql.DataSource/java.sql.Connection" type resources

有没有解决的办法?

流的 XML:

<?xml version="1.0" encoding="UTF-8"?>
<mule>
    <spring:beans>
        <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
            <spring:property name="driverName" value="org.h2.Driver" />
            <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
            <spring:property name="user" value="sa" />
            <spring:property name="password">
                <spring:value></spring:value>
            </spring:property>
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="triggerFlow" doc:name="triggerFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow" />
    </flow>

    <flow name="flow" doc:name="flow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>

</mule>

提前致谢。

4

3 回答 3

3

Mule 文档指出“Mule 事务是在同步端点上配置的”,但我不太了解这个限制。

这种限制是因为在 Mule 中和 Spring 中一样,更一般地在 Java 中,事务是线程绑定的。使用异步流,涉及多个线程,因此无法维护事务-线程关联。

所以不,您不能拆分/分叉/并行化/异步处理消息,并且在 Mule 中也有事务。

org.mule.transaction.IllegalTransactionStateException: 只能绑定“javax.sql.DataSource/java.sql.Connection”类型的资源

但这与第一个问题 IMO 无关:这是因为您通过 a 强制custom-transaction尝试在 JDBC 事务中注册 VM 端点。这是行不通的。如果要注册异构资源,请使用 XA 事务。

编辑:根据您在评论中所说,您不想在事务中注册 VM 端点,因此只需在此处注册 JDBC 端点:

<transactional action="ALWAYS_BEGIN">
    <jdbc:outbound-endpoint exchange-pattern="request-response"
        queryKey="insert" queryTimeout="-1" connector-ref="dbConnector"
        doc:name="insert into test values (1, 'Test 1')">
        <jdbc:transaction action="ALWAYS_JOIN" />
        <jdbc:query key="insert"
            value="insert into test values (1, 'Test 1')" />
    </jdbc:outbound-endpoint>
    <jdbc:outbound-endpoint exchange-pattern="request-response"
        queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector"
        doc:name="insert into test values (2, 'Test 2')">
        <jdbc:transaction action="ALWAYS_JOIN" />
        <jdbc:query key="insert2"
            value="insert into test values (2, 'Test 2')" />
    </jdbc:outbound-endpoint>
</transactional>

这适用于one-way入站端点。

于 2013-01-11T01:47:06.723 回答
1

我设法使它工作。我必须添加一个中间异步流来调用同步/发送流:

在此处输入图像描述

我认为这是丑陋和不必要的,将它作为原始帖子调用它是完全可以的,但由于我无法理解的原因,Mule 让你为此跳过箍。

以下是流的 XML:

<?xml version="1.0" encoding="UTF-8"?>
    <mule>
    <spring:beans>
        <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
            <spring:property name="driverName" value="org.h2.Driver" />
            <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
            <spring:property name="user" value="sa" />
            <spring:property name="password">
                <spring:value></spring:value>
            </spring:property>
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="triggerFlow" doc:name="triggerFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="async" doc:name="async" />
    </flow>
    <flow name="simpletransactionFlow1" doc:name="simpletransactionFlow1">
        <vm:inbound-endpoint exchange-pattern="one-way" path="async" doc:name="async"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow"/>
    </flow>

    <flow name="flow" doc:name="flow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <logger message="#[groovy:Thread.currentThread().getName()], payload=#[payload]" level="INFO" doc:name="Logger"/>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>

</mule>
于 2013-01-11T17:07:02.547 回答
0

这在Mule 文档中进行了讨论。它可能需要稍微麻烦一点,但还不错,实际上只需要一个 VM 传输,而不是两个:

在此处输入图像描述

我更改它是因为我想使用 TCP 而不是 HTTP,但它基本上是相同的示例。

第一个流接收未处理的 TCP,用它做一些事情(这里只是一个字节数组到字符串,但可能你有验证或其他东西),将输入推送到 VM 连接器,然后将输入回显到 TCP 流。在 VM 连接器保证它可以释放它的消息的 TCP 源之后返回,如果它正在做一些低级别的保证传递。

VM 连接器是单向的,并且有一个连接到它的持久存储,因此它不会丢失消息。这在 Mule Studio 3.5 中显示了一个错误,但它工作正常。

然后中间流接管,并包含一个事务,因此除非业务逻辑子流成功完成,否则 VM 连接器不会释放消息。

最后子流程运行;目前只是 Thread.sleep()ing 5 秒钟,所以我可以看到它一切正常。Telnet 进入会立即回显到 Telnet 控制台,然后在五秒钟后再次回显。

希望有帮助!我只花了几个小时和骡子在一起,但这似乎是正确的。

于 2013-09-23T09:44:40.930 回答