我正在开发一个 Spring 应用程序,它将每分钟接收大约 500 条 xml 消息。下面的 xml 配置只允许每分钟处理大约 60 条消息,其余消息存储在队列中(持久保存在 DB 中)并以每分钟 60 条消息的速度检索。
尝试从多个来源阅读文档,但仍不清楚 Poller 与任务执行者相结合的作用。我对目前每分钟处理 60 条消息的理解是因为轮询器配置中的“固定延迟”值设置为 10(因此它将在 1 分钟内轮询 6 次)和“max-messages-per-poll”设置为 10,因此每分钟处理 6x10=60 条消息。
如果我的理解不正确,请指教,并帮助修改xml配置以实现以更高的速率处理传入的消息。
任务执行者的作用也不清楚——是否意味着 pool-size="50" 将允许 50 个线程并行运行以处理轮询器轮询的消息?
我想要的全部是:
- JdbcChannelMessageStore 用于将传入的 xml 消息存储在数据库 (INT_CHANNEL_MESSAGE) 表中。这是必需的,因此在服务器重新启动的情况下,消息仍存储在表中并且不会丢失。
- 传入消息将并行执行,但数量受控/有限。根据系统处理这些消息的能力,我想限制系统应该并行处理多少条消息。
- 由于此配置将在集群中的多个服务器上使用,因此任何服务器都可以拾取任何消息,因此不会导致两个服务器处理同一消息的任何冲突。希望这是由 Spring Integration 处理的。
抱歉,如果这已在其他地方得到回答,但在阅读了许多帖子后,我仍然不明白这是如何工作的。
提前致谢。
<!-- Message Store configuration start -->
<!-- JDBC message store configuration -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />
<int:transaction-synchronization-factory
id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />
<int:poller id="messageStorePoller" fixed-delay="10"
receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
default="true" time-unit="SECONDS">
<int:transactional propagation="REQUIRED"
synchronization-factory="syncFactory" isolation="READ_COMMITTED"
transaction-manager="transactionManager" />
</int:poller>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- 1) Store the message in persistent message store -->
<int:channel id="incomingXmlProcessingChannel">
<int:queue message-store= "store" />
</int:channel>
<!-- 2) Check in, Enrich the headers, Check out -->
<!-- (This is the entry point for WebService requests) -->
<int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
<int:claim-check-in message-store="simpleMessageStore" />
<int:header-enricher >
<int:header name="CLAIM_CHECK_ID" expression="payload"/>
<int:header name="MESSAGE_ID" expression="headers.id" />
<int:header name="IMPORT_ID" value="XML_IMPORT"/>
</int:header-enricher>
<int:claim-check-out message-store="simpleMessageStore" />
</int:chain>
在 Artem 回复后添加:
谢谢阿尔乔姆。因此,在 10 秒的固定延迟后发生的每次轮询(根据上面的配置),任务执行器将检查任务队列,如果可能(并且需要)启动一个新任务?根据“maxMessagesPerPoll”配置,每个 pollingTask(线程)将从消息存储(队列)接收“10”条消息。
为了获得更高的传入消息处理时间,我是否应该减少 poller 上的 fixedDelay 以便任务执行器可以启动更多线程?如果我将 fixedDelay 设置为 2 秒,将启动一个新线程来执行 10 条消息,并且一分钟内将启动大约 30 个这样的线程,在一分钟内处理“大约”300 条传入消息。
很抱歉在一个问题上问得太多——只是想解释完整的问题。