好的,无需详细介绍我设置的整个系统,
我遇到的问题是,当消费者取消(AMQPChannel->basic_cancel)侦听队列时,它会留下一条该工作人员未确认的附加消息。它也不会触发正常的回调来处理此消息。
一些细节
- 队列阻塞(使用等待)
while(count($channel->callbacks)) { $channel->wait( ... ) ... }
- 预取是 1,你可以拥有的最小
- 消费者可以动态收听 (
AMQPChannel->basic_consume
) - 消费者可以动态忘记 (
AMQPChannel->basic_cancel
)
我不会详细说明我告诉给定消费者消费或取消给定队列的确切方式。但这一切都完美地开始消费或取消就好了。但是,当我取消一个仍然有消息的队列时,他们只是忘记了最后一条消息,并且我相信取消会删除该队列的回调,因此除了杀死不希望的消费者之外,没有办法恢复该消息。
我做了一些调试(只是一个例子,不是我实际做的)
Debug::dump($this->getAmqpChannel()->getMethodQueue());
$tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
$this->getAmqpChannel()->basic_cancel( $tag );
Debug::dump($this->getAmqpChannel()->getMethodQueue());
这个的输出大致是
array()
RunCommand: basic_cancel //this works fine consumer forgets queue except ->
array(1){
[0] => array(3){
[0] => string(5) "60,60",
[1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}", //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
[2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
["DELIVERY_MODE_PERSISTENT":constant] => int(2),
["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
["body_size":public] => int(135864),
["is_truncated":public] => bool(false),
["content_encoding":public] => null,
["propertyDefinitions":protected static] => array(14){ ... }
["delivery_info":public] => array(0){},
["prop_types":protected] => array(14){ ... }
}
}
一旦工人死亡(或者我宁愿杀死它),消息就会被放回队列中,我可以在获取消息下的 RabbitMq 管理事物(插件)中将其拉出。它就在那里,
Properties
correlation_id: 32:38
delivery_mode: 2
content_encoding: text/plain
content_type: application/json
"correlation_id":32,"max_correlation_id":38
对应于,因为correlation_id: 32:38
我需要跟踪消息部分。所以我知道这是同样的信息。
那么为什么在我取消之后我会收到最后一条卡在僵尸领域的消息,并且无论如何都要将它踢回队列而不杀死消费者。
此外,这不是一次性的,每次我取消其中仍有消息的队列时都会发生这种情况。所以它与给定的消息无关。就好像它得到了最后一条预取消息,然后因为它被取消了,所以最后一条消息没有回调来运行,它只是陷入了困境。请记住 0 预取是获取所有消息,1 是您可以设置的最低值。
任何可以提供的帮助都会很棒。
更新
我可能会打电话给我一个解决方案
$this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)
无论是之前还是之后basic_cancel
这会拒绝该消息,我什至可以$this->getAmqpChannel()->getMethodQueue()
如上所示进行测试,以查看$queue
我正在取消的消息是否被封存(尚未实现)。
我试图避免使用recover
,但我认为这应该没问题,因为消费者使用单个通道并且正在阻塞,最坏的情况是它只会拒绝有效消息 1 次,虽然不理想应该是可以接受的。
然而,在某些情况下,我从兔子那里得到了一个额外的例外,
PRECONDITION_FAILED - unknown delivery tag {n}
如果有人对这个额外的错误有任何详细信息,那就太好了。此外,所有队列都需要 Ack,它们都不是自动的。
更新1
我在堆栈跟踪中注意到,queue_unbind
所以我所做的是在内部跟踪绑定,这样我可以确保取消绑定只完成一次。明天做更多测试后,我将发布一些代码,但我在实施后的初始测试不再产生错误。
所有这些听起来可能有点“奇怪”,我可以解释为什么以及我在做什么,但这可能超出了问题的范围。我会说我已经在生产中使用了这个系统超过 2 年(我设计了它),我们每分钟可以进行 18 万次搜索(如果考虑系统的所有部分,大约 100 次)。我们还使用它进行了超过 2800 亿次搜索,感觉是我构建的。我们现在也是我们行业中的领先公司,要么淘汰了我们的竞争对手,要么他们把他们的东西寄给我们,不再在内部做。这在很大程度上是因为我们的快速周转以及我们的数据质量。所以这个系统确实有效,而且效果很好。
但在最近的审计中,我注意到 Daily 消费者只需处理大约 1000 万行(大约 100 分钟的工作),而 Nightly 消费者只需处理大约 1 亿行(或大约 20 小时的工作)。Daily 消费者可以在夜间工作,但只能在工作时间之外(因为它减少了白天的响应时间),因此有大约 10 小时的窗口,夜间工作只能在更小、功能更差的服务器上运行。这给我们的解决方案是,如果没有每日作业(客户提交的作业),它们可以动态地动态交换到夜间工作(数据仓库)。这应该保持大部分响应能力,同时在没有提交作业时不会浪费资源。我们可以在搜索中随心所欲地横向扩展,但我们确实为我们的主服务器付出了很多,
我可能会写一本小书来说明这一切是如何工作的,但希望这能给我一些关于我在做什么的基本概念。我还受到合同中一些保密和非竞争条款的约束,因此我可以真正了解具体细节。