0

我正在尝试使用 Flatbuffers 的 ActiveMQ。生产者的一切似乎都运行良好,但消费者的内存随着进程运行的时间不断增加。

生产者将消息标记为NON_PERSISTENT每秒发送约 30 次。每条消息都是一个字节消息,大约 3000 字节。

Producer.cpp

void Producer::send_message(uint8_t* pointer, size_t size) {
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());
}

void Producer::run() {
    try {
        std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

Consumer.cpp

void Consumer::onMessage(const cms::Message * message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}

void Consumer::run()
{
    try {
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());

        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) {
            amqConnection->addTransportListener(this);
        }

        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        consumer->setMessageListener(this);
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
}

然后,我打电话给Consumer

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();

    while (1) {}

    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

Consumer能够接收和处理消息。然而,记忆却在Consumer不断上升。运行 10 分钟后内存约为 200MB。在 CMS 概述中,他们提到传递给 的指针onMessage是调用所拥有的,所以我不应该尝试删除它。但是,调用者似乎从未删除该消息,这使得内存不断增加。

有什么方法可以在每次onMessage通话后释放消息的内存?

非常感谢您的时间和帮助。

4

1 回答 1

0

我想通了。

getBodyBytes()返回一个指向我应该在调用后清理的数组的指针。所以我只需要将它包装在 astd::unique_ptr中即可正确清理。

onMessage()应该是这样的:

void Consumer::onMessage(const cms::Message * message)
{
    try
    {
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        std::unique_ptr<unsigned char> data(msg->getBodyBytes());

        auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) {
            // Do something with the buffer
        }
    }
    catch (cms::CMSException& e) {
        e.printStackTrace();
    }
}
于 2019-08-29T15:55:15.333 回答