Spring Integration 中所有这些基于队列的通道默认情况下仅将消息存储在内存中。当需要持久性时,您可以message-store
在 ' ' 元素中提供一个 ' ' 属性queue
来引用持久性 MessageStore 实现,或者您可以将本地通道替换为由持久性代理支持的通道,例如 JMS 支持的通道或通道适配器。后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性。
您可以通过添加 message-store 属性来配置任何QueueChannel
消息存储,如下所示。
Spring Integration 通过以下方式为消息存储模式提供支持:a) 定义org.springframework.integration.store.MessageStore
策略接口,b) 提供此接口的多个实现,以及 c) 在所有能够缓冲消息的组件上公开消息存储属性,以便您可以注入任何实现 MessageStore 接口的实例。
我的示例使用JDBC Message Store,但还有其他几个可用的选项。
<bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${my.jdbc.driver}"/>
<property name="url" value="${my.jdbc.url}"/>
<property name="username" value="${my.jdbc.username}"/>
<property name="password" value="${my.jdbc.password}"/>
<property name="minEvictableIdleTimeMillis" value="300000"/>
<property name="timeBetweenEvictionRunsMillis" value="60000"/>
<property name="connectionProperties" value="SetBigStringTryClob=true;"/>
</bean>
<!-- The poller is needed by any of the QueueChannels -->
<integration:poller id="myPoller" fixed-rate="5000" default="true"/>
<!-- The MessageStore is needed to persist messages used by any of the QueueChannels -->
<int-jdbc:message-store id="myMessageStore" data-source="myDataSource" table-prefix="MY_INT_"/>
<!-- Main entry point into the process -->
<integration:gateway id="myGateway"
service-interface="com.mycompany.myproject.integration.gateways.myGateway"
default-request-channel="myGatewayChannel"
/>
<!-- Map the initial input channel to the first step, MyFirstService -->
<integration:channel id="myGatewayChannel">
<integration:queue capacity="1000"/>
<integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>
<!-- Step 1: My First Service -->
<integration:service-activator
id="myFirstServiceActivator"
input-channel="myGatewayChannel"
output-channel="myNextChannel"
ref="myFirstService"
method="process"/>
<!-- LONG running process. Setup asynchronous queue channel. -->
<integration:channel id="myNextChannel">
<integration:queue capacity="1000"/>
<integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>