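I'm trying to use ActiveMQ with FlatBuffers. Everything on the producer side seems to work fine, but the consumer's memory usage keeps growing the longer the process runs.
The producer marks its messages as NON_PERSISTENT and sends about 30 of them per second. Each message is a BytesMessage of roughly 3000 bytes.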
Producer.cpp
void Producer::send_message(uint8_t* pointer, size_t size) {
    // Wrap the serialized buffer in a BytesMessage; unique_ptr frees it after the send.
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());
}

void Producer::run() {
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(
            new activemq::core::ActiveMQConnectionFactory(brokerURI));
        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}
Consumer.cpp
void Consumer::onMessage(const cms::Message* message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);
        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        // Check that the payload is a well-formed FlatBuffers buffer before using it.
        flatbuffers::Verifier verifier((uint8_t*)(data), size);
        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}
void Consumer::run()
{
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(
            new activemq::core::ActiveMQConnectionFactory(brokerURI));
        connection.reset(connectionFactory->createConnection());
        // Register for transport events when the connection is an ActiveMQConnection.
        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection =
            std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) {
            amqConnection->addTransportListener(this);
        }
        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        // Messages are delivered asynchronously to onMessage().
        consumer->setMessageListener(this);
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
}
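One detail in the `onMessage` listing above may be worth checking. The CMS `BytesMessage` API documentation describes `getBodyBytes()` as returning a newly allocated copy of the body that becomes the property of the caller. That is separate from the `cms::Message*` passed to the listener, which the caller of `onMessage` keeps ownership of. The sketch below shows the ownership pattern with `std::unique_ptr<unsigned char[]>`. `FakeBytesMessage` and `handleMessage` are hypothetical stand-ins, introduced only so the example compiles without ActiveMQ-CPP; they are not part of the library, and the behaviour should be verified against the installed version.

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for cms::BytesMessage (NOT the real class). It only
// mimics the documented contract of getBodyBytes(): every call hands back a
// freshly allocated copy that the caller is expected to delete[].
class FakeBytesMessage {
public:
    explicit FakeBytesMessage(std::vector<unsigned char> body) : body_(std::move(body)) {}

    unsigned char* getBodyBytes() const {
        unsigned char* copy = new unsigned char[body_.size()];
        std::copy(body_.begin(), body_.end(), copy);
        return copy;  // ownership passes to the caller
    }

    int getBodyLength() const { return static_cast<int>(body_.size()); }

private:
    std::vector<unsigned char> body_;
};

// Reads the payload the way a listener might, but lets unique_ptr release
// the copied array. Returns a simple byte sum so the result can be checked.
std::size_t handleMessage(const FakeBytesMessage& msg) {
    const std::unique_ptr<unsigned char[]> data(msg.getBodyBytes());
    const int size = msg.getBodyLength();

    std::size_t sum = 0;
    for (int i = 0; i < size; ++i) {
        sum += data[i];
    }
    return sum;  // the copy is delete[]d here when `data` goes out of scope
}
```

With the real API, the same idea would be to hold the result of `msg->getBodyBytes()` in a `std::unique_ptr<unsigned char[]>` and pass `data.get()` to `flatbuffers::Verifier`, while still leaving the `cms::Message*` itself alone.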
Then I run the Consumer:
int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();
    while (1) {}
    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}
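A side note on the `main` above: `while (1) {}` keeps one CPU core spinning, and the `consumer.close()` and `shutdownLibrary()` calls after it are never reached. A minimal sketch of one alternative, assuming some other thread (for example a listener callback or a signal-handling thread) decides when to stop, is to block on a condition variable. `ShutdownLatch` is a hypothetical helper written for this example, not an ActiveMQ-CPP class.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not part of ActiveMQ-CPP): lets main() sleep until
// another thread asks the process to stop.
class ShutdownLatch {
public:
    // Blocks the calling thread until trigger() has been called.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return triggered_; });
    }

    // Marks the latch as triggered and wakes every waiting thread.
    void trigger() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            triggered_ = true;
        }
        cv_.notify_all();
    }

    // Reports whether trigger() has been called.
    bool triggered() {
        std::lock_guard<std::mutex> lock(mutex_);
        return triggered_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool triggered_ = false;
};
```

In this arrangement `main` would call `wait()` where the empty loop is now, so the cleanup calls that follow it run once `trigger()` is invoked.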
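The Consumer is able to receive and process messages. However, the memory used by the Consumer keeps rising: after running for 10 minutes it is at about 200MB. The CMS overview mentions that the pointer passed to onMessage is owned by the caller, so I should not try to delete it. However, the caller never seems to delete the message, which makes the memory keep growing.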
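As a rough sanity check on these figures, the arithmetic below estimates how much memory would accumulate if one payload-sized buffer per message were never freed. It is only an order-of-magnitude sketch: `retainedBytes` is a hypothetical helper, the inputs are the approximate numbers from this post, and the reported 200MB presumably also includes the process baseline and allocator overhead.

```cpp
#include <cstdint>

// Bytes accumulated if `copiesPerMessage` payload-sized buffers per message
// are never released (hypothetical helper for a back-of-the-envelope check).
constexpr std::uint64_t retainedBytes(std::uint64_t messagesPerSecond,
                                      std::uint64_t bytesPerMessage,
                                      std::uint64_t seconds,
                                      std::uint64_t copiesPerMessage = 1) {
    return messagesPerSecond * bytesPerMessage * seconds * copiesPerMessage;
}

// Figures from the post: ~30 messages/s, ~3000 bytes each, 10 minutes.
// 30 * 3000 * 600 = 54,000,000 bytes, i.e. about 54 MB for one copy per message.
static_assert(retainedBytes(30, 3000, 600) == 54000000ULL,
              "one retained payload copy per message over 10 minutes");
```

So a single leaked payload copy per message would account for growth of the same order of magnitude as what is observed, which fits a per-message leak rather than a one-off allocation.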
Is there any way to free the message's memory after each onMessage call?
Thank you very much for your time and help.