1

我正在尝试使用 librabbitmq 编写一个简单的消费者。它正在工作,但是当我执行 amqp_basic_consume 时,它​​会消耗整个队列。我想要的是它得到一条消息,处理它并重复。

我尝试使用 basic_qos 一次让消费者预取 1,但这似乎根本没有效果。

基本设置和循环: // 一次设置 1 条消息的 qos if (!amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }

// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);

while (run) {
    amqp_rpc_reply_t result;
    amqp_envelope_t envelope;

    amqp_maybe_release_buffers(conn);
    result = amqp_consume_message(conn, &envelope, &timeout, 0);

    if (AMQP_RESPONSE_NORMAL == result.reply_type) {

        strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
        message[envelope.message.body.len] = '\0';

        printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );

        if ( strncmp(message, "DONE",4 ) == 0 )
        {
            printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
            run = 0;
        }
        amqp_destroy_envelope(&envelope);
    }else{
         printf("Timeout.\n");
         run = 0;
    }
}

我希望有一个队列被填满,我可以开始处理,如果我点击 ^C,剩余的消息仍在队列中。相反,即使我只处理了一条消息,整个队列也会被清空。

4

1 回答 1

0

noAck是 true 时的行为。将会发生的情况是,消息将在代理可以发送消息时尽快推送到连接的消费者,因为它假设消费者能够接受它们,因为它们在传递后立即得到确认。

在这种情况下,您可能希望更改noAck为 false,然后将ack每条消息显式返回给代理。

或者,您可以使用 a 一次basic.get从代理中提取一条消息,而不是使用基于推送的消费者(有些人不喜欢这个想法)。您的用例将决定什么是最合适的,但基于您似乎有一个完整的队列和相当密集的消息这一事实,我认为basic.get在这种情况下 a 会很好。那么问题将是决定当队列为空时轮询的频率。

于 2019-01-27T17:31:26.987 回答