0

我有两个 IntegrationFlows 都从 Apache Kafka 接收消息

第一个 IntegrationFlow - 在输入通道中,Consumer1(concurrency=4) 读取topic_1

第二个 IntegrationFlow - 在输入通道中,Consumer2(concurrency=4) 读取topic_2

但是这两个 IntegrationFlows将消息发送到输出通道,其中指定了一个公共类 MyMessageHandler

像这样:

@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
                    )
            .handle(message)
            .get();
}


@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
                    )
            .handle(message)
            .get();
}

MyMessageHandler类有方法send(message),此方法将消息进一步传递给另一个服务

class MyMessageHandler {
            
    protected void handleMessageInternal(Message<?> message)
    {
        String postResponse = myService.send(message); // remote service calling
        msgsStatisticsService.sendMessage(message, postResponse);
        // *******
    }
}

在每个 IntegrationFlow 中,有 4 个 Consumer-threads 在工作(总共 8 个线程),它们都进入一个 MyMessageHandler 类,进入一个方法send()

会出现什么问题? 两个IntegrationFlow,当他们将消息传递给一个公共类时,他们看到对方了吗???我需要在 MyMessageHandler 类中提供线程安全吗???我需要在 send() 方法前面加上同步这个词吗???

但是如果我们做第三个IntegrationFlow呢?

以便只有一个 IntegrationFlow 可以通过自身将消息传递给 MyMessageHandler 类?那么它是线程安全的吗?例子:

@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                .configureListenerContainer(configureListenerContainer_priority1)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                .configureListenerContainer(configureListenerContainer_priority2)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public MessageChannel **SOME_CHANNEL**() {

    DirectChannel channel = new DirectChannel();
    return channel;
 }

@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {

return IntegrationFlows
        .from(**SOME_CHANNEL**())
        .handle(message)
        .get();
}
4

1 回答 1

0

您需要使您的处理程序代码线程安全。

使用synchronized整个方法,您将有效地禁用并发性。

最好使用线程安全技术——没有可变字段或使用有限的同步块,只围绕关键代码。

于 2021-02-10T15:21:20.643 回答