我有一个配置的 spring 集成管道,其中 xml 文件被解析为各种对象。这些对象正在通过几个通道端点,它们在这些端点上略有修改——没什么特别的,只是添加了一些属性。
管道的最后一个端点是持久化器,对象在数据库中持久化。可能存在重复,因此在此端点中还会检查对象是已持久化还是新对象。我使用消息驱动的架构,带有简单的直接渠道。
<int:channel id="parsedObjects1" />
<int:channel id="parsedObjects2" />
<int:channel id="processedObjects" />
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" />
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" />
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" />
目前只有一个数据源,我从中获取 xml 文件,一切顺利。当我需要附加第二个数据源时,问题就开始了。这些文件同时出现,所以我希望它们并行处理。所以,我放置了两个解析器实例,每个解析器都通过管道发送消息。我有直接通道的配置会产生并发问题,所以我尝试修改它。我已经尝试了 spring 集成文档中的几种配置,但到目前为止都没有成功。
我已经尝试过将最大池大小配置为 1 的调度程序 - 每个通道端点中的每条消息一个线程。
<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="parsedObjects2" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="processedObjects" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
我也尝试了 queue-poller 配置:
<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:rendezvous-queue/>
</int:channel>
<int:channel id="parsedObjects2" >
<int:rendezvous-queue/>
</int:channel>
<int:channel id="processedObjects" >
<int:rendezvous-queue/>
</int:channel>
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
基本上,我想摆脱通道端点中的任何竞争条件——在我的情况下是持久化。持久化通道端点应该为每条消息阻塞,因为如果它并行运行,我会在数据库中保留许多重复项。
编辑:
在我完成了一些调试之后,问题似乎出在端点逻辑而不是配置上。通过管道发送到持久化器的一些对象也存储在本地缓存中,直到文件解析完成 - 它们稍后也通过管道发送,以将一些连接表作为其他域的一部分持久化实体。碰巧使用上述配置,一些对象在管道中第二次发送时尚未持久化,所以最后我在数据库中得到了重复项。我在春季集成方面相当新,所以可能在这一点上我会问更一般的问题。在具有多个数据源的设置中 - 意味着解析器的多个实例等:
- 有没有一种通用的方法(最好的方法)来配置管道以启用并行化?
- 如果有需要,有没有办法序列化消息处理?
欢迎任何建议。提前致谢。