0

系统布局

我们有三个系统:

  1. API 端点(发布者和消费者)
  2. RabbitMQ 服务器
  3. 主要应用程序/处理器(发布者和消费者)

系统 1 和 3 都使用 Laravel,并使用 PHPAMQPLIB 与 RabbitMQ 交互。

消息的路径

系统 1(API 端点)向 RabbitMQ 服务器发送序列化作业,供系统 3 处理。然后它立即声明一个新的随机命名的队列,将一个交换绑定到具有相关 ID 的该队列 - 并开始侦听消息。

同时,系统 3 完成了该工作,一旦完成,就将该工作的详细信息返回给交易所的 RabbitMQ,并带有相关 ID。

问题和我尝试过的

我经常发现这个过程失败了。作业被发送和接收,响应被发送 - 但系统 1 从未读取此响应,我没有看到它在 RabbitMQ 中发布。

我已经对此进行了一些广泛的调试,但没有找到根本原因。我目前的理论是,系统 3 返回响应的速度非常快,以至于系统 1 甚至还没有声明新的队列和交换绑定。这意味着系统 3 的响应无处可去,结果消失了. 该理论主要基于这样一个事实,即如果我在 System 3 上将作业设置为以较低频率处理,则系统变得更加可靠。作业处理越快,它就越不可靠。

问题是:我怎样才能防止这种情况发生?还是我还缺少其他东西?我当然希望在不破坏请求/响应模式的情况下快速有效地处理这些工作。

我已经记录了两个系统的输出 - 两者都使用相同的相关 ID,并且系统 3 在发布时获得 ACK - 而系统 1 有一个声明的队列,没有最终超时的消息。

代码示例 1:发布消息

/**
 * Helper method to publish a message to RabbitMQ
 *
 * @param $exchange
 * @param $message
 * @param $correlation_id
 * @return bool
 */
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
    try {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST'),
            env('RABBITMQ_PORT'),
            env('RABBITMQ_LOGIN'),
            env('RABBITMQ_PASSWORD'),
            env('RABBITMQ_VHOST')
        );
        $channel = $connection->channel();

        $channel->set_ack_handler(function (AMQPMessage $message) {
            Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
        });

        $channel->set_nack_handler(function (AMQPMessage $message) {
            Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
        });

        $channel->confirm_select();

        $channel->exchange_declare(
            $exchange,
            'direct',
            false,
            false,
            false
        );

        $msg = new AMQPMessage($message);
        $channel->basic_publish($msg, $exchange, $correlation_id);

        $channel->wait_for_pending_acks();

        $channel->close();
        $connection->close();

        return true;
    } catch (Exception $e) {
        return false;
    }
}

代码示例 2:等待消息响应

/**
 * Helper method to fetch messages from RabbitMQ.
 *
 * @param $exchange
 * @param $correlation_id
 * @return mixed
 */
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
    $connection = new AMQPStreamConnection(
        env('RABBITMQ_HOST'),
        env('RABBITMQ_PORT'),
        env('RABBITMQ_LOGIN'),
        env('RABBITMQ_PASSWORD'),
        env('RABBITMQ_VHOST')
    );
    $channel = $connection->channel();

    $channel->exchange_declare(
        $exchange,
        'direct',
        false,
        false,
        false
    );

    list($queue_name, ,) = $channel->queue_declare(
        '',
        false,
        false,
        true,
        false
    );

    $channel->queue_bind($queue_name, $exchange, $correlation_id);

    $callback = function ($msg) {
        return self::$rfcResponse = $msg->body;
    };

    $channel->basic_consume(
        $queue_name,
        '',
        false,
        true,
        false,
        false,
        $callback
    );

    if (!count($channel->callbacks)) {
        Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
    }

    while (self::$rfcResponse === null && count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();

    return self::$rfcResponse;
}

感谢您提供的任何建议!

4

1 回答 1

1

我可能遗漏了一些东西,但是当我读到这个时:

系统 1(API 端点)向 RabbitMQ 服务器发送序列化作业,供系统 3 处理。然后它立即声明一个新的随机命名的队列,将一个交换绑定到具有相关 ID 的该队列 - 并开始侦听消息。

我的第一个想法是“为什么要等到消息发送后才声明返回队列?”

事实上,我们在这里有一系列单独的步骤:

  1. 生成相关 ID
  2. 将包含该 ID 的消息发布到交换器以在其他地方处理
  3. 声明一个新队列来接收响应
  4. 使用相关 ID 将队列绑定到交换器
  5. 将回调绑定到新队列
  6. 等待回复

直到第 2 步之后才能得到响应,因此我们希望尽可能晚地这样做。唯一不能在这之前的步骤是第 6 步,但在代码中将第 5 步和第 6 步放在一起可能更方便。所以我会将代码重新排列为:

  1. 生成相关 ID
  2. 声明一个新队列来接收响应
  3. 使用相关 ID 将队列绑定到交换器
  4. 将包含相关 ID 的消息发布到交换以在其他地方处理
  5. 将回调绑定到新队列
  6. 等待回复

这样,无论响应发布的速度有多快,它都会被步骤 2 中声明的队列拾取,并且一旦您绑定回调并开始等待,您就会处理它。

请注意,没有什么是不readAMQPRouteMessage知道的publishAMQPRouteMessage,因此您可以在它们之间自由移动代码。当您想从响应队列中消费时,您只需要它的名称,您可以将其保存到变量中并传递,或者自己生成而不是让 RabbitMQ 命名它。例如,您可以在它正在侦听的相关 ID 之后为其命名,以便您始终可以通过简单的字符串操作来计算它是什么,例如"job_response.{$correlation_id}"

于 2018-04-03T21:04:30.917 回答