1

我可以使用 Rabbit Java API 编写一个 java 程序,执行以下操作:

  1. 客户端通过具有相关 ID 的 Rabbit MQ 交换/队列发送消息(例如 UUID -“348a07f5-8342-45ed-b40b-d44bfd9c4dde”)。

  2. 服务器收到消息。

  3. 服务器通过 Rabbit MQ 交换/队列发送具有相同相关 ID -“348a07f5-8342-45ed-b40b-d44bfd9c4dde”的响应消息。

  4. 客户端仅在与 1 相同的线程中收到相关消息。

下面是使用 Rabbit API 的 Send.java 和 Recv.java。我需要帮助来将此示例转换为使用 Spring AMQP 集成,尤其是第 4 步中的接收部分。我正在寻找类似接收方法的东西,它可以使用相关 ID 过滤消息。

发送.java:

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Send {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        String message = "Hello World!";
        String cslTransactionId = UUID.randomUUID().toString();
        BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(cslTransactionId)
            .replyTo(RESPONSE_QUEUE).build();

        channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes());

        System.out.println("Client Sent '" + message + "'");


        Channel responseChannel = connection.createChannel();
        responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String responseMessage = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
            if (correlationId.equals(cslTransactionId)) {
                    System.out.println("Client Received '" + responseMessage + "'");
                responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                break;
            }
        }

        channel.close();
        connection.close();
    }
}

Recv.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(REQUEST_QUEUE, true, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
            System.out.println("Server Received '" + message + "'");
            if (correlationId != null)
                break;
            }

            String responseMsg = "Response Message";
            Channel responseChannel = connection.createChannel();
            responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
            BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(correlationId).build();

            channel.basicPublish("", RESPONSE_QUEUE, properties,responseMsg.getBytes());

            System.out.println("Server Sent '" + responseMsg + "'");

            channel.close();
            connection.close();
       }
}

运行 gary 提供的 Java 配置后,我试图将配置移动到 XML 格式以供服务器端添加侦听器。下面是 XML 配置:

服务器.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean 
        id="serviceListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="queues" ref="requestQueue"/>
            <property name="messageListener" ref="messageListenerAdaptor"/>
    </bean>

    <bean id="messageListenerAdaptor"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="pojoListener" />
    </bean>

    <bean 
        id="pojoListener"
        class="PojoListener"/>

    <bean
        id="replyListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queues" ref="replyQueue"/>
        <property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
    </bean>

    <!-- Infrastructure -->
    <rabbit:connection-factory 
        id="connectionFactory" 
        host="localhost" 
        username="guest" 
        password="guest" 
        cache-mode="CHANNEL" 
        channel-cache-size="5"/>

    <rabbit:template 
        id="fixedReplyQRabbitTemplate" 
        connection-factory="connectionFactory"
        exchange="fdms.exchange"
        routing-key="response.key"
        reply-queue="RESPONSE.QUEUE">
        <rabbit:reply-listener/>
    </rabbit:template>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
    <rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />

    <rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

SpringReceive.java

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;


public class SpringReceive {

/**
 * @param args
 */
public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
    SimpleMessageListenerContainer serviceListenerContainer =     context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
    serviceListenerContainer.start();
    }
}
4

1 回答 1

2

您可以将RabbitTemplate.sendAndReceive()(或convertSendAndReceive())与回复侦听器容器一起使用(此处的文档);该模板将为您处理相关性。

如果您使用 Spring Integration,请使用带有适当配置的 rabbit 模板的出站网关。

于 2014-05-05T12:38:59.130 回答