1

好的,无需详细介绍我设置的整个系统,

我遇到的问题是,当消费者取消(AMQPChannel->basic_cancel)侦听队列时,它会留下一条该工作人员未确认的附加消息。它也不会触发正常的回调来处理此消息。

一些细节

  • 队列阻塞(使用等待) while(count($channel->callbacks)) { $channel->wait( ... ) ... }
  • 预取是 1,你可以拥有的最小
  • 消费者可以动态收听 ( AMQPChannel->basic_consume)
  • 消费者可以动态忘记 ( AMQPChannel->basic_cancel)

我不会详细说明我告诉给定消费者消费或取消给定队列的确切方式。但这一切都完美地开始消费或取消就好了。但是,当我取消一个仍然有消息的队列时,他们只是忘记了最后一条消息,并且我相信取消会删除该队列的回调,因此除了杀死不希望的消费者之外,没有办法恢复该消息。

我做了一些调试(只是一个例子,不是我实际做的)

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
   $tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
   $this->getAmqpChannel()->basic_cancel( $tag );
   Debug::dump($this->getAmqpChannel()->getMethodQueue());

这个的输出大致是

  array()
  RunCommand: basic_cancel //this works fine consumer forgets queue except ->
  array(1){
    [0] => array(3){
        [0] => string(5) "60,60",
        [1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
       [2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
            ["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
            ["DELIVERY_MODE_PERSISTENT":constant] => int(2),
            ["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
            ["body_size":public] => int(135864),
            ["is_truncated":public] => bool(false),
            ["content_encoding":public] => null,
            ["propertyDefinitions":protected static] => array(14){ ... }
            ["delivery_info":public] => array(0){},
            ["prop_types":protected] => array(14){ ... }
      }
    }

一旦工人死亡(或者我宁愿杀死它),消息就会被放回队列中,我可以在获取消息下的 RabbitMq 管理事物(插件)中将其拉出。它就在那里,

  Properties
    correlation_id: 32:38
    delivery_mode:  2
    content_encoding:   text/plain
    content_type:   application/json

"correlation_id":32,"max_correlation_id":38对应于,因为correlation_id: 32:38我需要跟踪消息部分。所以我知道这是同样的信息。

那么为什么在我取消之后我会收到最后一条卡在僵尸领域的消息,并且无论如何都要将它踢回队列而不杀死消费者。

此外,这不是一次性的,每次我取消其中仍有消息的队列时都会发生这种情况。所以它与给定的消息无关。就好像它得到了最后一条预取消息,然后因为它被取消了,所以最后一条消息没有回调来运行,它只是陷入了困境。请记住 0 预取是获取所有消息,1 是您可以设置的最低值。

任何可以提供的帮助都会很棒。

更新

我可能会打电话给我一个解决方案

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

无论是之前还是之后basic_cancel

这会拒绝该消息,我什至可以$this->getAmqpChannel()->getMethodQueue()如上所示进行测试,以查看$queue我正在取消的消息是否被封存(尚未实现)。

我试图避免使用recover,但我认为这应该没问题,因为消费者使用单个通道并且正在阻塞,最坏的情况是它只会拒绝有效消息 1 次,虽然不理想应该是可以接受的。

然而,在某些情况下,我从兔子那里得到了一个额外的例外,

  PRECONDITION_FAILED - unknown delivery tag {n}

如果有人对这个额外的错误有任何详细信息,那就太好了。此外,所有队列都需要 Ack,它们都不是自动的。

更新1

我在堆栈跟踪中注意到,queue_unbind所以我所做的是在内部跟踪绑定,这样我可以确保取消绑定只完成一次。明天做更多测试后,我将发布一些代码,但我在实施后的初始测试不再产生错误。

所有这些听起来可能有点“奇怪”,我可以解释为什么以及我在做什么,但这可能超出了问题的范围。我会说我已经在生产中使用了这个系统超过 2 年(我设计了它),我们每分钟可以进行 18 万次搜索(如果考虑系统的所有部分,大约 100 次)。我们还使用它进行了超过 2800 亿次搜索,感觉是我构建的。我们现在也是我们行业中的领先公司,要么淘汰了我们的竞争对手,要么他们把他们的东西寄给我们,不再在内部做。这在很大程度上是因为我们的快速周转以及我们的数据质量。所以这个系统确实有效,而且效果很好。

但在最近的审计中,我注意到 Daily 消费者只需处理大约 1000 万行(大约 100 分钟的工作),而 Nightly 消费者只需处理大约 1 亿行(或大约 20 小时的工作)。Daily 消费者可以在夜间工作,但只能在工作时间之外(因为它减少了白天的响应时间),因此有大约 10 小时的窗口,夜间工作只能在更小、功能更差的服务器上运行。这给我们的解决方案是,如果没有每日作业(客户提交的作业),它们可以动态地动态交换到夜间工作(数据仓库)。这应该保持大部分响应能力,同时在没有提交作业时不会浪费资源。我们可以在搜索中随心所欲地横向扩展,但我们确实为我们的主服务器付出了很多,

我可能会写一本小书来说明这一切是如何工作的,但希望这能给我一些关于我在做什么的基本概念。我还受到合同中一些保密和非竞争条款的约束,因此我可以真正了解具体细节。

4

1 回答 1

0

Consumer在 RabbitMQ 用语中,表示队列中的订阅者。(有关通道、消费者和连接之间差异的详细信息,请参阅此答案)。

当您打开确认时,它们会为channel. 在该通道上传递的任何消息都将具有与其关联的传递标签。处理完消息后,您需要通过同一通道告诉服务器该消息已被处理。取消消费者对已传递消息的确认没有影响。事实上,接收消息,取消消费者,处理消息,然后发送确认将是一个完全有效的用例。

因此,您有两个选择。您可以不确认消息,在这种情况下,您所要做的就是关闭通道,它将在队列的头部重新排队。或者,您可以确认它(anackack),在这种情况下,消息将重新排队 ifnack或丢弃 if ack

如果我没记错的话,不指定预取计数(通过basic.qos)将导致预取为零,这意味着您必须在接收下一条消息之前确认上一条消息。我可能错了。当然,如果你使用 a basic.get,你可以完全避免这个问题,而对性能的影响很小。

于 2018-08-29T19:49:52.277 回答