10

我有以下代码来声明一个队列:

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);

以及以下内容以获取下一个 Delivery 对象并对其进行处理:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        }
    }

反序列化代码。如您所见,我正在使用 Kryo:

public T deserialise(byte[] body) {
    Kryo kryo= new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}

如果我使用包含大量对象的队列运行此程序,则在大约 270 万个对象之后,我会遇到内存不足异常。我最初是通过在夜间运行它来发现这一点的,数据从 JMeter 以大约 90/s 的速度输入,起初它毫无问题地消耗,但早上我注意到 RabbitMQ 中有大量数据并且内存不足异常消费者。我再次运行它并使用 Eclipse Memory Analyzer 来确定该内存的使用位置。从这里我可以看到 com.rabbitmq.client.QueueingConsumer 引用的 java.util.concurrent.LinkedBlockingQueue 正在增长,直到内存不足。

我需要做些什么来告诉 Rabbit 释放资源吗?

我可以增加堆大小,但我担心这只是一个短期修复,我的代码中可能有一些东西可能会在生产部署几个月后出现内存泄漏。

4

4 回答 4

7

我的错误是我将频道设置为自动确认。这意味着来自 Rabbit 的每条消息都得到了确认(确认为已收到)。我已经通过将通道声明为不自动确认来修复(并测试)了这个问题: channel.basicConsume(getQueueName(), false,consumer);并且在我处理队列之后,然后我确认消息:consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

这就是我的队列声明现在的样子:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false,consumer);

以及处理队列的以下内容:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());
            process(queue);
            consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        } catch (IOException e) {
            logger.error("Could not ack message: {}",e);
            break;
        }
    }

我现在可以在 RabbitMQ 管理屏幕中看到消息正在以非常高的速率传递,但它们并没有以这种速率被确认。如果我然后杀死我的消费者,在大约 30 秒内,所有那些未确认的消息都将移回就绪队列。我将进行的改进之一是设置 basicQos 值: channel.basicQos(10);这样就不会传递太多消息但未确认。这是可取的,因为这意味着我可以在同一个队列中启动另一个消费者并开始处理队列,而不是全部结束在内存中未确认且对其他消费者不可用。

于 2012-10-08T12:04:17.167 回答
2

解决方法是设置 basicQos - channel.basicQos(2);。我的频道声明现在看起来像这样:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), true,consumer);
        channel.basicQos(2);

将 basicQos 设置为 2 意味着仅在内部存储器中保留 2 条消息。有关使用 CoDel 算法的更多信息和有趣的讨论,请参阅http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/

于 2012-10-02T10:59:34.600 回答
1

问题似乎是您的消费者无法跟上您的生产者,导致您的队列无限制地增长。您需要限制队列的大小并在达到限制时减慢生产者的速度。我还会考虑优化您的消费者,使其跟上。

于 2012-10-02T09:06:35.347 回答
1

这可能是对象被消耗后没有被销毁的问题。你能显示反序列化的代码吗?我怀疑您正在通过队列发送对象并使用某种对象输入流/字节数组输入流对它们进行反序列化。如果您没有正确关闭可能导致内存泄漏的流。

于 2012-10-02T09:53:59.830 回答