我刚刚开始使用 php-amqplib 和 RabbitMQ,并且想要一种方法来处理无论出于何种原因无法处理且无法处理的消息。我认为人们处理这个问题的一种方法是使用死信队列。我正在尝试设置它,但到目前为止还没有任何运气,希望有人可以提供一些建议。
我的队列启动看起来有点像:
class BaseAbstract
{
/** @var AMQPStreamConnection */
protected $connection;
/** @var AMQPChannel */
protected $channel;
/** @var array */
protected $deadLetter = [
'exchange' => 'dead_letter',
'type' => 'direct',
'queue' => 'delay_queue',
'ttl' => 10000 // in milliseconds
];
protected function initConnection(array $config)
{
try {
$this->connection = AMQPStreamConnection::create_connection($config);
$this->channel = $this->connection->channel();
// Setup dead letter exchange and queue
$this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
$this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => $this->deadLetter['exchange'],
'x-dead-letter-routing-key' => $this->deadLetter['queue'],
'x-message-ttl' => $this->deadLetter['ttl']
]));
$this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);
// Set up regular exchange and queue
$this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
$this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
'x-dead-letter-exchange' => $this->deadLetter['exchange'],
'x-dead-letter-routing-key' => $this->deadLetter['queue']
]));
if (method_exists($this, 'getRouteKey')) {
$this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
} else {
$this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
}
} catch (\Exception $e) {
throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
}
return $this;
}
// ...
}
我认为应该设置我的死信交换和队列,然后还设置我的常规交换和队列(使用扩展类提供的 getRouteKey、getQueueName 和 getExchangeName/Type 方法)
当我尝试处理如下消息时:
public function process(AMQPMessage $message)
{
$msg = json_decode($message->body);
if (empty($msg->payload) || empty($msg->payload->run)) {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
return;
}
// removed for post brevity, but compose $cmd variable
exec($cmd, $output, $returned);
if ($returned !== 0) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} else {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}
}
但我得到了错误Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''
这是我应该设置死字的方式吗?我看到的不同示例似乎都显示了一些不同的处理方式,但似乎都不适合我。所以我显然在这里误解了一些东西,我很感激任何建议。:)