1

我刚刚开始学习 spring-integration 我想在队列上接收消息并并行执行 2 个步骤:步骤 1 -> 使用 bean 处理它步骤 2 -> 转换并将其发送到另一个队列。就像是 :

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
            .configureContainer(simpleMessageListenerContainerSpec -> {
                simpleMessageListenerContainerSpec.concurrentConsumers(3);
            }))
            .log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
            .handle(serviceBean, "process")
            .<String,String>transform(String::toLowerCase)
            .log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
            .handle(
                    Amqp.outboundAdapter(rabbitTemplate)
                            .exchangeName("exchange")
                            .routingKey("queue2.routing"))
            .get();

我错过了什么?第一个句柄之后的动作没有被执行。我想我没有正确理解这部分。另外我怎样才能并行执行这两个步骤?

4

1 回答 1

2

您应该从理论入手,了解 Spring Integration 中的许多概念和组件。

“并行 2 步” - 正是一种发布-订阅模式:https ://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html并且 Spring Integration 为其提供了一个实现:https ://docs.spring.io /spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel。正如您根据文档看到的那样,为了实现并行性,您需要使用TaskExecutor.

使用 Java DSL,我们为发布-订阅配置提供了高级 API:

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows

所以,为了让你的.handle(serviceBean, "process").<String,String>transform(String::toLowerCase)平行,你需要有这样的东西:

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
        .configureContainer(simpleMessageListenerContainerSpec -> {
            simpleMessageListenerContainerSpec.concurrentConsumers(3);
        }))
        .log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                .subscribe(f -> f
                         .handle(serviceBean, "process")))
        .<String,String>transform(String::toLowerCase)
        .log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
        .handle(
                Amqp.outboundAdapter(rabbitTemplate)
                        .exchangeName("exchange")
                        .routingKey("queue2.routing"))
        .get();
于 2020-06-30T17:24:53.560 回答