系统布局
我们有三个系统:
- API 端点(发布者和消费者)
- RabbitMQ 服务器
- 主要应用程序/处理器(发布者和消费者)
系统 1 和 3 都使用 Laravel,并使用 PHPAMQPLIB 与 RabbitMQ 交互。
消息的路径
系统 1(API 端点)向 RabbitMQ 服务器发送序列化作业,供系统 3 处理。然后它立即声明一个新的随机命名的队列,将一个交换绑定到具有相关 ID 的该队列 - 并开始侦听消息。
同时,系统 3 完成了该工作,一旦完成,就将该工作的详细信息返回给交易所的 RabbitMQ,并带有相关 ID。
问题和我尝试过的
我经常发现这个过程失败了。作业被发送和接收,响应被发送 - 但系统 1 从未读取此响应,我没有看到它在 RabbitMQ 中发布。
我已经对此进行了一些广泛的调试,但没有找到根本原因。我目前的理论是,系统 3 返回响应的速度非常快,以至于系统 1 甚至还没有声明新的队列和交换绑定。这意味着系统 3 的响应无处可去,结果消失了. 该理论主要基于这样一个事实,即如果我在 System 3 上将作业设置为以较低频率处理,则系统变得更加可靠。作业处理越快,它就越不可靠。
问题是:我怎样才能防止这种情况发生?还是我还缺少其他东西?我当然希望在不破坏请求/响应模式的情况下快速有效地处理这些工作。
我已经记录了两个系统的输出 - 两者都使用相同的相关 ID,并且系统 3 在发布时获得 ACK - 而系统 1 有一个声明的队列,没有最终超时的消息。
代码示例 1:发布消息
/**
* Helper method to publish a message to RabbitMQ
*
* @param $exchange
* @param $message
* @param $correlation_id
* @return bool
*/
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
try {
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->set_ack_handler(function (AMQPMessage $message) {
Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
});
$channel->set_nack_handler(function (AMQPMessage $message) {
Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
});
$channel->confirm_select();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
$msg = new AMQPMessage($message);
$channel->basic_publish($msg, $exchange, $correlation_id);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
return true;
} catch (Exception $e) {
return false;
}
}
代码示例 2:等待消息响应
/**
* Helper method to fetch messages from RabbitMQ.
*
* @param $exchange
* @param $correlation_id
* @return mixed
*/
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
list($queue_name, ,) = $channel->queue_declare(
'',
false,
false,
true,
false
);
$channel->queue_bind($queue_name, $exchange, $correlation_id);
$callback = function ($msg) {
return self::$rfcResponse = $msg->body;
};
$channel->basic_consume(
$queue_name,
'',
false,
true,
false,
false,
$callback
);
if (!count($channel->callbacks)) {
Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
}
while (self::$rfcResponse === null && count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
return self::$rfcResponse;
}
感谢您提供的任何建议!