您的解决方案确实是正确的,但是您需要考虑不要将消息转换为一个,Exectuor因为这样您就会跳出事务边界。
您在同一个线程中处理了 10 条消息这一事实正是一个实现细节,它看起来像这样:
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
int count = 0;
while (AbstractPollingEndpoint.this.initialized
&& (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
|| count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
try {
if (!Poller.this.pollingTask.call()) {
break;
}
count++;
}
因此,我们轮询消息,直到maxMessagesPerPoll在同一个线程中。
为了使它真正更加并行并且仍然保持事务不要丢失您需要考虑使用的消息fixedRate:
/**
* Specify whether the periodic interval should be measured between the
* scheduled start times rather than between actual completion times.
* The latter, "fixed delay" behavior, is the default.
*/
public void setFixedRate(boolean fixedRate)
并增加用于TaskScheduler轮询的线程数量。您可以通过声明一个ThreadPoolTaskScheduler名称为 as 的 beanIntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME来覆盖池中的默认值 as 10。或者使用全局属性来覆盖默认的池大小TaskScheduler:https ://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html#global-properties