我目前正在使用 spring AMQP API 连接到 RabbitMQ。基本上在我的消费者代码中,我正在异步读取消息并在弹性搜索中进行批量插入。当我执行 ack = AUTO 时,我的速度适中,为 400-500 msg/sec(从队列中读取)。当我做 ack = NONE 时,阅读速度的提高是巨大的,即达到 5000-6000 msg/sec。
配置如下:
- 具有 32GB RAM 的 Linux 机器
- JVM参数:
-server
-Xms1g -Xmx1g -Xss384k PermSize=256m MaxPermSize=256m
现在的问题是,当我在执行 ack = NONE 时,虽然我的速度很快,但 JVM 在一段时间后会出现 OutOfMemory,我可以看到在这种情况下发生了很多 GC。
我计划使用 Spring 集成中的 QueueChannel,我可以将通道的大小限制为它可以包含多少消息。
如何使用 RabbitMQ 实现这一点还有其他方法可以在不崩溃 JVM 的情况下实现 4000-5000 msg/sec 这样的良好读取速度吗?