2

我有一个配置的 spring 集成管道,其中 xml 文件被解析为各种对象。这些对象正在通过几个通道端点,它们在这些端点上略有修改——没什么特别的,只是添加了一些属性。

管道的最后一个端点是持久化器,对象在数据库中持久化。可能存在重复,因此在此端点中还会检查对象是已持久化还是新对象。我使用消息驱动的架构,带有简单的直接渠道。

<int:channel id="parsedObjects1" />
<int:channel id="parsedObjects2" />
<int:channel id="processedObjects" />
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" />
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" />
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" />

目前只有一个数据源,我从中获取 xml 文件,一切顺利。当我需要附加第二个数据源时,问题就开始了。这些文件同时出现,所以我希望它们并行处理。所以,我放置了两个解析器实例,每个解析器都通过管道发送消息。我有直接通道的配置会产生并发问题,所以我尝试修改它。我已经尝试了 spring 集成文档中的几种配置,但到目前为止都没有成功。

我已经尝试过将最大池大小配置为 1 的调度程序 - 每个通道端点中的每条消息一个线程。

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="parsedObjects2" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="processedObjects" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>

我也尝试了 queue-poller 配置:

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="parsedObjects2" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="processedObjects" >
    <int:rendezvous-queue/>
</int:channel>

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>

基本上,我想摆脱通道端点中的任何竞争条件——在我的情况下是持久化。持久化通道端点应该为每条消息阻塞,因为如果它并行运行,我会在数据库中保留许多重复项。

编辑:

在我完成了一些调试之后,问题似乎出在端点逻辑而不是配置上。通过管道发送到持久化器的一些对象也存储在本地缓存中,直到文件解析完成 - 它们稍后也通过管道发送,以将一些连接表作为其他域的一部分持久化实体。碰巧使用上述配置,一些对象在管道中第二次发送时尚未持久化,所以最后我在数据库中得到了重复项。我在春季集成方面相当新,所以可能在这一点上我会问更一般的问题。在具有多个数据源的设置中 - 意味着解析器的多个实例等:

  1. 有没有一种通用的方法(最好的方法)来配置管道以启用并行化?
  2. 如果有需要,有没有办法序列化消息处理?

欢迎任何建议。提前致谢。

4

3 回答 3

1

首先,您能描述一下“并发问题”是什么吗?理想情况下,您不需要序列化消息处理,因此这是一个很好的起点。

其次,您配置的线程池不会完全序列化。您将在池中使用 1 个线程,但您选择的拒绝策略会导致调用者线程在队列满载的情况下运行任务本身(基本上是节流)。这意味着您将同时从池中获得一个调用者运行的线程。

于 2012-05-22T13:09:19.597 回答
0

我设法让管道正常工作。我不确定我是否会保留当前配置,或者进行更多实验,但现在,这是我最终得到的配置:

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:queue capacity="1000" />
</int:channel>
<int:channel id="parsedObjects2" >
<int:queue capacity="1000" />
</int:channel>
<int:channel id="processedObjects" >
<int:queue capacity="1000" />
</int:channel>

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
于 2012-05-24T10:53:26.533 回答
0

对于您的方案,我能想到的最佳方法是:

使您的 parsedObject1 和 parsedObject2 成为正常的队列通道,队列的容量可以适当设置(随时说 25):

<int:channel id="parsedObjects1" >
    <int:queue />
</int:channel>

现在,您的 xml 处理器针对 2 个通道 - parsedObjects1 和 parsedObjects2,将处理 xml 并应输出到processedObjects通道。您可以使用与您的配置类似的配置,除了我明确指定了处理对象通道 - :

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" output-channel="processedObjects">
    <int:poller task-executor="channelTaskExecutor"/>
</int:service-activator>

第三步是我将偏离您的配置的地方,此时您说要序列化持久性,最好的方法是通过池大小为 1 的不同任务执行器来完成,这样只有 1 个实例您的持久化器在任何时间点都在运行:

<task:executor id="persisterpool" pool-size="1"/>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="persisterpool" fixed-delay="2"/>
</int:service-activator>
于 2012-05-23T08:10:35.290 回答