1

我有一个持久的事务队列,其中包含我需要通过异步协议发送的消息。每条消息都需要在自己的事务中发送,但是在给定时间正在传输的消息数量排除了使用thread-per-message,而吞吐量要求排除了持久的中间状态。

查看 的代码JmsTransactionManager,我看到它正在使用TransactionSynchronizationManager,它将事务资源存储在ThreadLocal. 所以看来我需要实现一个PlatformTransactionManager以某种方式在一个线程中引导多个事务。这似乎有点极端……

是否有一些 Spring Integration 单元的安排可以避免这种复杂性?我是否应该查看 JTA/XA 信息?

4

1 回答 1

1

Spring Integration 中所有这些基于队列的通道默认情况下仅将消息存储在内存中。当需要持久性时,您可以message-store在 ' ' 元素中提供一个 ' ' 属性queue来引用持久性 MessageStore 实现,或者您可以将本地通道替换为由持久性代理支持的通道,例如 JMS 支持的通道或通道适配器。后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性。

您可以通过添加 message-store 属性来配置任何QueueChannel消息存储,如下所示。

Spring Integration 通过以下方式为消息存储模式提供支持:a) 定义org.springframework.integration.store.MessageStore策略接口,b) 提供此接口的多个实现,以及 c) 在所有能够缓冲消息的组件上公开消息存储属性,以便您可以注入任何实现 MessageStore 接口的实例。

我的示例使用JDBC Message Store,但还有其他几个可用的选项。

<bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${my.jdbc.driver}"/>
    <property name="url" value="${my.jdbc.url}"/>
    <property name="username" value="${my.jdbc.username}"/>
    <property name="password" value="${my.jdbc.password}"/>
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <property name="connectionProperties" value="SetBigStringTryClob=true;"/>
</bean>

<!-- The poller is needed by any of the QueueChannels -->
<integration:poller id="myPoller" fixed-rate="5000" default="true"/>

<!-- The MessageStore is needed to persist messages used by any of the QueueChannels -->
<int-jdbc:message-store id="myMessageStore" data-source="myDataSource" table-prefix="MY_INT_"/>

<!-- Main entry point into the process -->
<integration:gateway id="myGateway"
                     service-interface="com.mycompany.myproject.integration.gateways.myGateway"
                     default-request-channel="myGatewayChannel"
        />

<!-- Map the initial input channel to the first step, MyFirstService -->
<integration:channel id="myGatewayChannel">
    <integration:queue capacity="1000"/>
    <integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>

<!-- Step 1: My First Service -->
<integration:service-activator
        id="myFirstServiceActivator"
        input-channel="myGatewayChannel"
        output-channel="myNextChannel"
        ref="myFirstService"
        method="process"/>    

<!-- LONG running process. Setup asynchronous queue channel. -->
<integration:channel id="myNextChannel">
    <integration:queue capacity="1000"/>
    <integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>    
于 2012-03-08T17:03:39.977 回答