1

我最近发现了允许您延迟消息的 RabbitMQ 功能,并且效果很好,尽管我找不到与我需要的类似的示例:

假设有 3 种类型的消息:A、B 和 C。我们有 2 个具有 1 小时和 2 小时'x-message-ttl值的延迟队列。还有 3 种类型的destination_queues - 每种用于特定的消息类型。

我想要实现的是,在 delay_queues 之一中的消息达到其 TTL 后,它将根据其类型被路由到 destination_queues 之一。像这样的东西:

在此处输入图像描述

这甚至可以使用 RabbitMQ 消息属性吗?有任何想法吗?我的代码将消息发送到延迟队列(到期后它们被发送到 hello 队列):

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

delay_channel = connection.channel()
delay_channel.confirm_delivery()

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 3600000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})

while 1 :
        delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))
        print "Sent to delay queue"
4

3 回答 3

2

当消息因为达到规定的 TTL 而过期时,它们可以被重定向到死信交换,这就是我认为您在消息过期后隐式使用的将消息移动到另一个队列的方法。

您可以使用消息的原始路由键或最终的“CC”和“BCC”消息头来选择不同的目标队列。

于 2015-05-28T08:27:06.290 回答
1

好的,所以我设法找到了解决方案。不确定它是否是最好的,但它有效。

  1. 我创建了两个交换:DELAY_EXCHANGE 和 ROUTER_EXCHANGE
  2. 我将 DELAY_EXCHANGE 绑定到 delay_queue(我使用的所有 routing_keys)
  3. 延迟队列设置为x-dead-letter-exchange: ROUTER_EXCHANGEx-message-ttl: 14000
  4. 我使用相应的 routing_keys 将 ROUTER_EXCHANGE 绑定到所有队列(A、B、C)。

这样在发送(推送)消息时我没有指定队列,只是交换和路由键:

    delay_channel.basic_publish(exchange='delay_exchange',
                  routing_key='helloC',
                  body="test",
                  properties=pika.BasicProperties(delivery_mode=2))

消息被推送到 DELAY_EXCHANGE,该 DELAY_EXCHANGE 将其定向到 delay_queue,在那里等待其 TTL。当消息过期时,它被重定向到 ROUTER_EXCHANGE,后者分析其 routing_key 并将其重定向到目标队列之一。惊人的。

于 2015-05-28T10:28:15.267 回答
0

您可以使用新的 RabbitMQ 延迟消息交换https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/

于 2015-06-08T11:47:25.503 回答