我有两个 IntegrationFlows 都从 Apache Kafka 接收消息
第一个 IntegrationFlow - 在输入通道中,Consumer1(concurrency=4) 读取topic_1
第二个 IntegrationFlow - 在输入通道中,Consumer2(concurrency=4) 读取topic_2
但是这两个 IntegrationFlows将消息发送到输出通道,其中指定了一个公共类 MyMessageHandler
像这样:
@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.handle(message)
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.handle(message)
.get();
}
MyMessageHandler类有方法send(message),此方法将消息进一步传递给另一个服务
class MyMessageHandler {
protected void handleMessageInternal(Message<?> message)
{
String postResponse = myService.send(message); // remote service calling
msgsStatisticsService.sendMessage(message, postResponse);
// *******
}
}
在每个 IntegrationFlow 中,有 4 个 Consumer-threads 在工作(总共 8 个线程),它们都进入一个 MyMessageHandler 类,进入一个方法send()
会出现什么问题? 两个IntegrationFlow,当他们将消息传递给一个公共类时,他们看到对方了吗???我需要在 MyMessageHandler 类中提供线程安全吗???我需要在 send() 方法前面加上同步这个词吗???
但是如果我们做第三个IntegrationFlow呢?
以便只有一个 IntegrationFlow 可以通过自身将消息传递给 MyMessageHandler 类?那么它是线程安全的吗?例子:
@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1, "topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2, "topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public MessageChannel **SOME_CHANNEL**() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {
return IntegrationFlows
.from(**SOME_CHANNEL**())
.handle(message)
.get();
}