1

我正在尝试实现一个 RabbitMQ 配置,它允许我使用固定的回复队列,而不是出现数百个临时队列。我发布的第一条消息通过回复队列得到立即响应,第二条、第三条甚至有时甚至是第五条消息,只是给了我一个堆栈跟踪信息Reply received after timeout。如果我稍等片刻,然后发送另一条消息,我会再次收到响应,任何立即连续的消息都会再次失败并出现相同的错误。

在发布者方面,我有以下配置:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener />
</rabbit:template>

<rabbit:queue id="reply" name="reply" />

在消费者方面,我有以下配置:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>

<!-- Register Queue Listener Beans -->
<rabbit:listener-container
        connection-factory="connectionFactory"
        channel-transacted="true"
        requeue-rejected="true"
        concurrency="${rabbit.consumers}">
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>

<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />

我正在使用 spring-amqp 1.4.4 以防万一:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.4.RELEASE</version>
    </dependency>

这就是我构建消息并发布它的方式:

MessageProperties properties = new MessageProperties();          
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);

模板只是 AmqpTemplate 的一个自动装配实例:

@Autowired
AmqpTemplate template;

第一条消息得到立即响应,第二条消息(以及第三条消息等等)在消费者端获得以下堆栈跟踪:

2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

...而发布者在回复队列上没有得到任何响应后只是超时。

这就是我在消费者端回复消息的方式:

   @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            ...
            System.out.println(message);
            // handle reply-to
            if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {

                Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
                getTemplate().send("", message.getMessageProperties().getReplyTo(), res);

            }
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: forward to exception queue here
        }    
    }

System.out.println(message);打印以下内容:

(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])

有任何想法吗?

4

2 回答 2

2

您有 2 个兔子模板,每个模板都使用相同的reply队列 - 因此消费者端模板正在“接收”“第二个”回复(因此日志消息是因为它在没有等待的未完成请求时收到“回复”回复 - 在生产者方面已经结束)。

请注意,从 rabbitmq 3.4 开始,通常最好使用新的 rabbit 内置的直接回复功能;它通常解决了我们必须实现固定回复队列机制的所有原因。在 Spring AMQP 1.4.1.RELEASE 中添加了对直接回复的支持。

于 2015-04-22T07:28:55.680 回答
0

经过几天的干预,我对 rabit 模板的 sendAndReceive() 唯一了解的就是永远不要干预绑定键并让框架将其设置为队列名称。从这个意义上说,它工作得很好,但如果我用我的大脑来设置它,很多事情都会出错。

现在我被困在获取相关 ID 并且我收到的和我发送的不一样。这怎么可能 ?

于 2015-11-09T05:32:51.940 回答