我有以下代码来声明一个队列:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);
以及以下内容以获取下一个 Delivery 对象并对其进行处理:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
}
}
反序列化代码。如您所见,我正在使用 Kryo:
public T deserialise(byte[] body) {
Kryo kryo= new Kryo();
Input input = new Input(body);
T deserialised = kryo.readObject(input, getQueueClass());
input.close();
return deserialised;
}
如果我使用包含大量对象的队列运行此程序,则在大约 270 万个对象之后,我会遇到内存不足异常。我最初是通过在夜间运行它来发现这一点的,数据从 JMeter 以大约 90/s 的速度输入,起初它毫无问题地消耗,但早上我注意到 RabbitMQ 中有大量数据并且内存不足异常消费者。我再次运行它并使用 Eclipse Memory Analyzer 来确定该内存的使用位置。从这里我可以看到 com.rabbitmq.client.QueueingConsumer 引用的 java.util.concurrent.LinkedBlockingQueue 正在增长,直到内存不足。
我需要做些什么来告诉 Rabbit 释放资源吗?
我可以增加堆大小,但我担心这只是一个短期修复,我的代码中可能有一些东西可能会在生产部署几个月后出现内存泄漏。