我正在为我们的一个应用程序开发消息接口。该应用程序是一种服务,旨在接受“作业”,进行一些处理并返回结果(实际上以文件的形式)。
这个想法是使用 RabbitMQ 作为消息传递基础设施和 Spring AMQP 来处理协议特定的细节。
我不想将我的代码与 Spring AMQP 紧密耦合,所以我想使用 Spring Integration 来隐藏消息传递 API。所以基本上我想要这个:
消息发送到 RabbitMQ ====> Spring AMQP ====> Spring Integration ====> MyService ====> 一路回复 RabbitMQ
我正在尝试制定将其连接在一起所需的 XML 配置,但我遇到了多层次抽象和不同术语的问题。事实证明,在 Spring AMQP/RabbitMQ 之上找到一个演示 Spring Integration 的工作示例非常困难,尽管这种设置对我来说感觉非常“最佳实践”。
1)所以..那里有一些聪明的灵魂可以快速看看这个,也许会把我推向正确的方向?我需要什么,不需要什么?:-)
2) 理想情况下,队列应该是多线程的,这意味着 taskExecutor 应该将多条消息传递给我的 jobService 以进行并行处理。需要什么配置?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
">
<context:component-scan base-package="com.myprogram.etc" />
<!-- Messaging infrastructure: RabbitMQ -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="${ei.messaging.amqp.servername}" />
<property name="username" value="${ei.messaging.amqp.username}" />
<property name="password" value="${ei.messaging.amqp.password}" />
</bean>
<rabbit:connection-factory id="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- From RabbitMQ -->
<int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/>
<!-- Spring Integration configuration -->
<int:channel id="fromAMQP">
<!-- Is this necessary?? -->
<int:queue/>
</int:channel>
<!-- JobService is a @Service with a @ServiceActivator annotation -->
<int:service-activator input-channel="fromAMQP" ref="jobService"/>
</beans>