6

我们的应用程序是使用 Spring Integration Framework 设计的。完整的消息操作流从侦听队列开始,其中使用了 JMS 消息驱动的适配器,之后定义了基于通道的队列端点,并且每个端点都由服务激活器处理。

我们目前处于性能阶段,我们正在产生 200 条消息请求。最初我们观察到消息不是并行执行的,经过阅读后发现通过向 JMS 消息驱动的侦听器适配器添加 concurrent-consumer 和 max-concurrent-consumer 属性将有助于启用多线程模式。确实,这有所帮助,但仍处于进程之间,我仍然看到单线程效果。这是由于端点的定义方式造成的吗?为每个端点添加队列容量有什么好处?您是否认为通过向每个 Channel 端点定义添加队列容量将再次有助于在多线程模式下运行。

根据要求的设计快照:

动作流

4

3 回答 3

0

查看您的流程图,看起来流程有很多单线程元素,并且可以优化为更多并发,并有望获得更高的吞吐量。

从消息驱动的通道适配器开始(您还没有显示配置),可以配置为具有超过 1 个默认使用者,并且可以使每个消耗周期消耗合理数量的消息

经过消息驱动的通道适配器,将消息放入直接通道 1 的线程将不幸地运行其余的流程,因为其他任何地方都没有缓冲,所以当您的消息被放入“直接通道 1”时,它将立即在同一个线程中调用路由器,然后在同一个线程中调用服务激活器和邮件适配器或 JMS 出站通道适配器。此处的更改可能是引入队列通道而不是直接通道 1,这样使用消息的线程只需将消息放置在队列通道中即可完成。

除了直接通道 1(更改为队列通道 1),我认为它可以是单线程的,具体取决于您的流量有多快或多慢,如果说邮件适配器很慢,那么直接通道 4 也可以作为队列通道,与直接频道 5

您能否看看这些以粗体突出显示的更改是否有助于改善流程

于 2012-06-27T01:00:01.560 回答
0

查看频道的确切定义会很有帮助。

默认情况下,Spring 通道在发送者的线程中使用它的消息。换句话说,它是同步的。如果您希望通道异步使用消息,则必须指定一个 TaskExecutor。见http://static.springsource.org/spring/docs/3.0.5.RELEASE/reference/scheduling.html

于 2012-06-26T20:28:24.090 回答
0

为了提高性能,我建议使用带有任务执行器的 executorchannel 来控制线程池大小的数量。这样,当消息到达 jms 队列时,消费者会收到消息并在单独的线程中处理流。请记住,在这种配置中,多线程工作由 taskexecutor 通道执行,taht 将在单独的线程中执行消息的接收,因此您已经很好地考虑了您想要的多线程等级。

对于队列消息通道,确实需要一个轮询器来轮询通道以执行接收,队列容量是幕后原子队列的容量。

你可以在xml中这样配置am executor channel

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:tx="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration 
        http://www.springframework.org/schema/integration/spring-integration.xsd 
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

       <tx:executor id="taskExecutor" pool-size="10" queue-capacity="10"/>
       <si:channel id="ch" >
              <si:dispatcher task-executor="taskExecutor"/>       
       </si:channel>
</beans>

或以这种方式在 java-dsl

 @Bean
    public IntegrationFlow storeBookPageByPage(ConnectionFactory connectionFactory,
                                               @Qualifier("createBookQueue") Destination createBookQueue,
                                               @Qualifier("createBookResultQueue") Destination createBookResultQueue,
                                               PdfBookMasterRepository pdfBookMasterRepository,
                                               BookRepository bookRepository){
        String tempFilePathBaseDir = environment.getProperty("bookService.storeBookPageByPage.tempFilePathBaseDir");

        return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory)
                .destination(createBookQueue)
                .errorChannel(storeBookPageByPageErrorChannel()))
                .channel(channels -> channels.executor(Executors.newScheduledThreadPool(5)))
                .....

        }
于 2016-03-11T22:32:45.723 回答