我正在尝试使用 librabbitmq 编写一个简单的消费者。它正在工作,但是当我执行 amqp_basic_consume 时,它会消耗整个队列。我想要的是它得到一条消息,处理它并重复。
我尝试使用 basic_qos 一次让消费者预取 1,但这似乎根本没有效果。
基本设置和循环: // 一次设置 1 条消息的 qos if (!amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }
// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);
while (run) {
amqp_rpc_reply_t result;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
result = amqp_consume_message(conn, &envelope, &timeout, 0);
if (AMQP_RESPONSE_NORMAL == result.reply_type) {
strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
message[envelope.message.body.len] = '\0';
printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );
if ( strncmp(message, "DONE",4 ) == 0 )
{
printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
run = 0;
}
amqp_destroy_envelope(&envelope);
}else{
printf("Timeout.\n");
run = 0;
}
}
我希望有一个队列被填满,我可以开始处理,如果我点击 ^C,剩余的消息仍在队列中。相反,即使我只处理了一条消息,整个队列也会被清空。