0

我们已经用 Rabbit 设置了一些工作流环境。

它解决了我们的需求,但我想知道像我们为计划任务那样做是否也是一种好习惯。

调度意味着没有任务关键的 100% 调整时间。因此,如果应在 60 秒后重试作业,这确实意味着 60 多秒,这取决于处理队列的时间。

我创建了一个 Q_WAIT 并制作了一些标题来传输设置。

让我们这样做:

Worker 正在运行订阅 Q_ACTION

如果操作错过(例如 smtp 服务器不可访问)

->(重新)将消息发布到 Q_WAIT 并设置 properties.headers["scheduled"] = time + 60seconds


另一个进程每 15 秒循环一次通过方法 pop() 而不是订阅的 Q_WAIT 中的所有消息

q_WAIT.pop(:ack => true) do |delivery_info,properties,body|...

  if (properties.headers["scheduled"] has reached its time)

     -> (Re-)Publish the message back to Q_ACTION
        ack(message)

在每个循环之后,连接关闭,因此 NOT (Re-)Published 留在 Q_WAIT 中,因为它们没有被确认。


有人可以确认这是一种有效的(良好)做法。

4

2 回答 2

3

当然,您可以使用原始问题中描述的循环过程。

此外,您可以使用Time-To-Live ExtensionDead Letter Exchanges 扩展

首先,指定x-dead-letter-exchange Q_WAIT队列参数等于当前交换并且x-dead-letter-routing-key等于Q_ACTION绑定的路由键。

如果您需要自定义每条消息 ttl,则在发布期间设置x-message-ttl队列参数集或设置消息过期属性(虽然有一些众所周知的警告,但这不是最佳实践,但它也有效)。

Q_WAIT在这种情况下,您的消息将在 ttl 过期后从右到右进行死信,Q_ACTION而无需任何额外的消费者,这更加可靠和稳定。

请注意,如果您需要高级重新发布逻辑(更改消息正文、属性),则需要额外的队列(例如Q_PRE_ACTION)来使用消息、更改它们然后发布到目标队列(例如Q_ACTION)。

于 2014-06-15T09:25:28.913 回答
0

正如评论中提到的,我尝试了该功能,x-dead-letter-exchange它适用于大多数要求。一个问题/误解是 TTL-PER-MESSAGE 选项。

请看这里的例子。据我了解:

  1. DLQ 的超时时间为 10 秒
  2. 所以第一条消息将在发布后 10 秒在订阅者上可用。
  3. 第二条消息在第一条消息之后 1 秒发布,消息 ttl(过期)为 3 秒

我希望第二条消息应该在发布 3 秒后和第一条消息之前宣布。

但它并没有那样工作,两者都在 10 秒后可用。

问:消息过期不应该推翻DLQ ttl吗?

#!/usr/bin/env ruby
# encoding: utf-8

require 'bunny'

B = Bunny.new ENV['CLOUDAMQP_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name
  q = ch.queue(DELAYED_QUEUE, :durable => true, arguments: {
    # set the dead-letter exchange to the default queue
    'x-dead-letter-exchange' => '',
    # when the message expires, set change the routing key into the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 10000,
  })
  # publish to the default exchange with the the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.basic_publish('message content 1 ' + Time.now.strftime("%H-%M-%S"), "", DELAYED_QUEUE, :persistent => true)
  puts "#{Time.now}: Published the message 1"

  # wait moment before next publish
  sleep 1.0

  # puts this with a shorter ttl
  ch.basic_publish('message content 2 ' + Time.now.strftime("%H-%M-%S"), "", DELAYED_QUEUE, :persistent => true, :expiration => "3000")
  puts "#{Time.now}: Published the message 2"

  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue
  q = ch.queue DESTINATION_QUEUE, durable: true 
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message: #{body}"
  end
end

subscribe()
publish()

sleep
于 2014-06-15T12:48:55.343 回答