4

我编写了一个 C (rabbitmq-c) worker 应用程序,它使用 Python 脚本 (pika) 发布的队列。

我有以下似乎无法解决的奇怪行为:

  1. 在消息发布到队列之前启动所有工作人员按预期工作
  2. 队列发布后启动 1 个工作人员按预期工作
  3. 但是:在工作人员开始从队列中消费之后启动额外的工作人员意味着这些工作人员看不到队列上的任何消息(消息计数 = 0),因此只需等待(即使队列上仍有许多消息)。杀死第一个工作人员会突然开始消息流向所有其他(等待的)消费者。

任何想法可能会发生什么?

我已经尝试确保每个消费者都有自己的频道(这是必要的吗?)但仍然是相同的行为......

这是消费者(工人)的代码:

conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message recevied from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}
4

2 回答 2

4

这里的问题很可能是您的消费者在启动时预取了所有消息。这是 RabbitMQ 的默认行为,但您可以减少消费者预取的消息数量,以便更好地将工作负载分散到多个工作人员之间。

这仅仅意味着一个或多个消费者将接收所有消息,而不会为新消费者留下任何消息。

如果您将 qos 应用于您的消费者并将预取限制为 10 条消息。消费者只会将前 10 条消息排队,新的消费者可以填补空缺。

您正在寻找实现此功能的函数称为amqp_basic_qos ,此外,您可以在此处阅读有关消费者预取的更多信息。

于 2014-08-06T13:24:13.097 回答
0

这可能会帮助你

消息确认

完成一项任务可能需要几秒钟。您可能想知道如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给客户,它就会立即将其从内存中删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失所有发送给该特定工作人员但尚未处理的消息。

但是我们不想丢失任何任务。如果一个工人死亡,我们希望将任务交付给另一个工人。

为了确保消息永远不会丢失,RabbitMQ 支持消息确认。一个 ack(nowledgement) 从消费者发回,告诉 RabbitMQ 一个特定的消息已经被接收、处理并且 RabbitMQ 可以自由地删除它。

如果一个消费者在没有发送 ack 的情况下死亡,RabbitMQ 将理解一条消息没有被完全处理,并将其重新传递给另一个消费者。这样,即使工人偶尔死亡,您也可以确保不会丢失任何消息。

没有任何消息超时;RabbitMQ 只会在工作连接断开时重新传递消息。即使处理消息需要非常非常长的时间也没关系。

默认情况下启用消息确认。

于 2014-08-01T05:58:12.433 回答