3

在入站网关接收到字符串消息后,我使用发布-订阅通道将其并行发送到记录器以记录消息并发送到转换器以转换消息。我希望这两项活动同时进行。

我的问题很简单 - spring 集成中的发布订阅通道是否将消息并行发送到它的订阅者?

下面是来自 spring-integration-context.xml 的代码片段。

<int:gateway id="gateway" service-interface="com.test.Gateway">
    </int:gateway>
<int:publish-subscribe-channel id="publishsubscribechannel" />

<int:service-activator input-channel="publishsubscribechannel"
        method="transformEvent" ref="transformer" output-channel="transformerreplychannel">
</int:service-activator>
<int:service-activator input-channel="publishsubscribechannel"
        method="logMessage" ref="logger">
</int:service-activator>

这里的转换器和记录器是发布订阅频道的 2 个订阅者。在此设置中,默认情况下,从网关到记录器和转换器的消息流会异步发生吗??...或者我需要做一些其他配置来实现相同的目的。

4

3 回答 3

7

...或使用 JavaConfig

@Bean
public MessageChannel publishsubscribechannel() {

return new PublishSubscribeChannel(executor());
} 

@Bean
public ThreadPoolTaskExecutor executor() {

ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setMaxPoolSize(10);
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
于 2015-09-08T12:15:35.610 回答
6

默认情况下,它按顺序运行。所以在你的情况下,它将是transformer,然后是logger。如果要并行运行,需要指定task-executor

<int:publish-subscribe-channel id="publishsubscribechannel" task-executor="executor" />
...
<task:executor id="executor" pool-size="10" />

并且通过使用任务执行器,消息处理是异步执行的。

于 2014-01-09T05:15:50.267 回答
5

正如 Spring Integration 文档中所述

发布订阅频道

PublishSubscribeChannel 实现将发送给它的任何消息广播到其所有订阅的处理程序。这最常用于发送主要作用是通知的事件消息,而不是通常旨在由单个处理程序处理的文档消息。请注意,PublishSubscribeChannel 仅用于发送。由于它在调用其 send(Message) 方法时直接向其订阅者广播,因此消费者无法轮询消息(它没有实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身必须是一个MessageHandler,并且订阅者的handleMessage(Message) 方法将被依次调用。

在 3.0 版之前,在没有订阅者的 PublishSubscribeChannel 上调用 send 方法会返回 false。与 MessagingTemplate 结合使用时,会引发 MessageDeliveryException。从版本 3.0 开始,行为发生了变化,如果至少存在最少的订阅者(并成功处理消息),则始终认为发送成功。可以通过设置 minSubscribers 属性来修改此行为,该属性默认为 0。

[注意] 注意 如果使用 TaskExecutor,则仅使用正确数量的订阅者来进行此确定,因为消息的实际处理是异步执行的。

请注意注。它确实提到如果使用 TaskExecutor,消息处理将是异步的。

所以,是的,您必须添加一个 TaskExecutor 才能使其异步。

于 2014-01-09T05:30:39.847 回答