58

我正在尝试设置我的第一个 RabbitMQ 死信交换,以下是我通过 Web 管理界面使用的步骤:

  1. 使用名称“dead.letter.test”创建新的 DIRECT 交换
  2. 创建新队列“dead.letter.queue”
  3. 将“dead.letter.queue”绑定到“dead.letter.test”
  4. 创建新队列“test1”,死信交换设置为“dead.letter.test”
  5. 向“test1”发送消息
  6. Nack (with requeue = false) "test1" 中的消息

我期望这些步骤应该通过“dead.letter.test”交换将记录放入“dead.letter.queue”。这没有发生。

我可以手动将消息放入“dead.letter.test”交换中,它会显示在“dead.letter.queue”中,所以我知道这很好。

当我查看管理 UI 时,它显示 DLX 参数设置在队列“test1”上。

我哪里错了?

4

8 回答 8

83

Gentilissimo Signore 很友善地在 Twitter 上回答了我的问题。问题是,如果您的死信交换设置为 DIRECT,您必须指定死信路由键。如果您只想将所有 NACKed 消息放入死信桶以供以后调查(就像我一样),那么您的死信交换应该设置为 FANOUT。

以下是有效的更新步骤:

  1. 使用名称“dead.letter.test”创建新的FANOUT交换
  2. 创建新队列“dead.letter.queue”
  3. 将“dead.letter.queue”绑定到“dead.letter.test”
  4. 创建新队列“test1”,死信交换设置为“dead.letter.test”
  5. 向“test1”发送消息
  6. Nack (with requeue = false) "test1" 中的消息
于 2014-02-18T20:04:24.373 回答
17

无路由密钥和直接交换的死信交换


按照这些步骤肯定会起作用:-
1. 创建一个名为“ dead_queue ”的新队列。
2. 创建一个名为“ dead_exchange ”的交换,交换类型应为“直接”。
3. 绑定‘ dead_queue ’和‘ dead_exchange ’,不用路由键。
4.创建一个名为' test_queue '的新队列,并将其' x-dead-letter-exchange '名称设置为' dead_exchange '
5.创建一个名为' test_exchange '的交换器,交换器类型应为'direct'
6.绑定' test_exchange ' 和 '

最后我们会检查它。为此,在“ test_exchange ”上发布一些内容,参数“ expiration ”设置为 10000。此后,当在“ test_exchange ”上发布消息时,它将转到“ test_queue ”,当消息在队列中过期时,它将查找 DLX Parameter(Dead Letter Exchange name) 那里的消息找到名称' dead_exchange '然后该消息将到达' dead_exchange '将其传递到 ' dead queue ' ..如果您对此仍有任何问题并且如果我错过了理解您的问题.. . 写下你的问题,我一定会看的...谢谢..

注意:必须在 ' test_exchange ' 上发布消息,因为 test_queue 和 test_exchange 绑定没有路由密钥,它会正常工作,但是如果您在 ' test_queue ' 上发布消息,将使用默认交换和路由密钥。然后在消息队列到期后尝试使用一些默认路由键将死消息传递到 dead_exchange,并且消息不会进入该队列。

于 2015-10-31T05:46:14.997 回答
7

如果你想在死信交换上使用自定义路由键,你必须x-dead-letter-routing-key在声明工作队列时设置(在你的情况下是test1),否则将使用默认路由键。在您的情况下,RabbitMQ 代理检测到循环并简单地丢弃被拒绝的消息。

您需要的是在队列中设置x-dead-letter-exchange=dead.letter.testx-dead-letter-routing-key=dead.letter.queue参数。test1

于 2014-02-13T07:30:23.527 回答
4

如果您希望所有队列都具有相同的死信交换,则更容易设置一般策略:

sudo rabbitmqctl -p /my/vhost/path set_policy DLX ".*" '{"dead-letter-exchange":"MyExchange.DEAD"}' --apply-to queues
于 2016-03-29T14:12:02.310 回答
3

如果不是强制性的,则不需要创建 FANOUT 交换。

您可以使用已用于其他交换的相同路由密钥创建 DIRECT 交换。而且也不需要为新的交换创建一个新的队列。您可以将现有队列与新交换一起使用。您只需要将新的交换与队列绑定。

这是我的 receive.js 文件:

var amqp = require("amqplib/callback_api");
var crontab = require('node-crontab');

amqp.connect("amqp://localhost", function (err, conn) {
conn.createChannel(function (err, ch) {
    var ex = 'direct_logs';
    var ex2 = 'dead-letter-test';
    var severity = 'enterprise-1-key';

    //assert "direct" exchange
    ch.assertExchange(ex, 'direct', { durable: true });
    //assert "dead-letter-test" exchange
    ch.assertExchange(ex2, 'direct', { durable: true });

    //if acknowledgement is nack() then message will be stored in second exchange i.e. ex2="dead-letter-test"
    ch.assertQueue('enterprise-11', { exclusive: false, deadLetterExchange: ex2 }, function (err, q) {
        var n = 0;
        console.log(' [*] Waiting for logs. To exit press CTRL+C');
        console.log(q);

        //Binding queue with "direct_logs" exchange
        ch.bindQueue(q.queue, ex, severity);
        //Binding the same queue with "dead-letter-test"
        ch.bindQueue(q.queue, ex2, severity);

        ch.consume(q.queue, function (msg) {
            // consume messages via "dead-letter-exchange" exchange at every second.
            if (msg.fields.exchange === ex2) {
                crontab.scheduleJob("* * * * * *", function () {
                    console.log("Received by latest exchange %s", msg.fields.routingKey, msg.content.toString());
                });
            } else {
                console.log("Received %s", msg.fields.routingKey, msg.content.toString());
            }

            if (n < 1) {
                // this will executes first time only. Here I'm sending nack() so message will be stored in "deadLetterExchange"
                ch.nack(msg, false, false);
                n += 1;
            } else {
                ch.ack(msg)
                n = 0
            }
        }, { noAck: false });
    });
  });
});
于 2018-08-30T13:49:50.063 回答
2

使用名称“dead.letter.test”创建新的 DIRECT 交换

正确的

创建新队列“dead.letter.queue”

正确的

将“dead.letter.queue”绑定到“dead.letter.test”

正确的

创建新队列“test1”,死信交换设置为“dead.letter.test”

我假设您正在创建 test1 队列并将其绑定到 dead.letter.test 交换

向“test1”发送消息

如果您希望 dead.letter.queue 接收您的消息,则必须在发送消息时提供路由键,并且使用 dead.letter.queue 的客户端也应该使用相同的路由键

如果您在没有路由密钥的情况下发布,那么只有订阅了 test1 的客户端才会收到消息。

如果您将消息发布到 direct.letter.test 交换,那么所有队列都会收到该消息。它将像扇出交换一样工作

因此,如果您希望 dead.letter.queue 接收消息,则必须在该队列中发布消息,否则您必须在发布和订阅以及发布消息以进行交换时使用相同的路由密钥

于 2014-02-12T23:50:01.107 回答
0

就我而言,问题是因为队列有

ackMode="MANUAL"

但我从未设置它(因为运行时异常),而是使用默认 ACK。

于 2020-04-07T16:12:38.067 回答
0

对于那些使用 Spring-AMQP 的人

在我的情况下,问题是不同的。我希望死信交换是直接类型的。我为队列设置了x-dead-letter-exchangex-dead-letter-routing-key。再加上我spring.rabbitmq.listener.simple.default-requeue-rejected=falseapplication.properties.

似乎一切都很好,但是在调试时我注意到我SimpleRabbitListenerContainerFactorydefaultRequeueRejected值为空。所以原因是当你SimpleRabbitListenerContainerFactory在你的 中声明时@Configuration,你创建了一个新的“非默认”bean。默认值是在您的属性中为您在幕后创建的。但是你SimpleRabbitListenerContainerFactory@Config里面,这些属性是不读的,必须自己读,在java代码里设置。

它发生在我身上,因为当我想配置并发时,我只是从Spring-AMQP 文档中复制粘贴了配置。但是你应该在一个地方做所有事情,无论是在属性中,比如

spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

或者完全在java中,比如

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

上面这2个是一样的。

我希望当我使用第二个(java)选项时,仍然会从 application.properties 中获取属性,然后我在 java 中进行自定义,但它不会像这样工作。是的,“复制粘贴”是邪恶的:)

于 2021-01-15T10:51:13.430 回答