我编写了一个 C (rabbitmq-c) worker 应用程序,它使用 Python 脚本 (pika) 发布的队列。
我有以下似乎无法解决的奇怪行为:
- 在消息发布到队列之前启动所有工作人员按预期工作
- 队列发布后启动 1 个工作人员按预期工作
- 但是:在工作人员开始从队列中消费之后启动额外的工作人员意味着这些工作人员看不到队列上的任何消息(消息计数 = 0),因此只需等待(即使队列上仍有许多消息)。杀死第一个工作人员会突然开始消息流向所有其他(等待的)消费者。
任何想法可能会发生什么?
我已经尝试确保每个消费者都有自己的频道(这是必要的吗?)但仍然是相同的行为......
这是消费者(工人)的代码:
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
"/",
0,
131072,
0,
AMQP_SASL_METHOD_PLAIN,
"guest",
"guest");
if (amqp_channel_open(conn, chan) == NULL)
LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
chan,
amqp_cstring_bytes("ranges"),
0,
0,
0,
0,
amqp_empty_table)) == NULL)
LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while(1) {
amqp_maybe_release_buffers(conn);
amqp_consume_message(conn, &e, NULL, 0);
{
int n;
amqp_frame_t f;
unsigned char buf[8];
unsigned char *pbuf = buf;
amqp_simple_wait_frame(conn, &f); // METHOD frame
amqp_simple_wait_frame(conn, &f); // HEADER frame
n = f.payload.properties.body_size;
if (n != sizeof(range_buf))
LOG_ERR(" [!] Invalid message size!");
while (n) {
amqp_simple_wait_frame(conn, &f); // BODY frame
memcpy(pbuf,
f.payload.body_fragment.bytes,
f.payload.body_fragment.len);
n -= f.payload.body_fragment.len;
pbuf += f.payload.body_fragment.len;
}
// do something with buf
LOG_INFO(" [x] Message recevied from queue\n");
}
amqp_destroy_envelope(&e);
amqp_maybe_release_buffers(conn);
}