0

我在生产者-消费者场景中使用 ConcurrentLinkedQueue。我的生产者是我的应用程序中所有方法调用的单例:Producer.getInstance().add("foo"); 并且 add() 方法调用 ConcurrentLinkedQueue 提供方法。

public void add(String message) {
    myQueue.offer(message);
}

否则,我让我的消费者在另一个线程中运行,并简单地在生产者内部的 ConcurrentLinkedQueue 上调用 poll 方法。

编辑:

在 if ((buffer = myQueue.poll()) != null) { } 之间添加代码

CRActiveMQProducer 是一个 Singleton,它初始化与我的 ActiveMQ 服务器的连接并使用 send() 方法发送消息。

private StringBuffer stringBuffer = new StringBuffer();

public void run() {
    while(condition) {
        String buffer = null;
        if ((buffer = myQueue.poll()) != null) {
            stringBuffer.append(buffer);
            numberMessage++;
            if (numberMessage >= 10000) {
                CRActiveMQProducer.getInstance().send(stringBuffer.toString());
                stringBuffer = stringBuffer.delete(0, stringBuffer.length());
                numberMessage = 0L;
            }
        }
    }
} 

我调用了 Producer add() 方法 5000 万次(是的,它很大,但它只是应该完成的调用次数的 2.5%)

无论如何,我得到了 OutOfMemory 异常,我尝试使用 VisualVM 读取堆转储,我发现这个 OOM 是由大量 ConcurrentLinkedQueue$Node 实例(超过 3000 万个)引起的。我想我对每个 offer() 或 poll() 方法调用都有一个新节点,但不是 100% 确定(无法加载完整的堆转储......)。

您认为这是 ConcurrentLinkedQueue 的正常行为吗?或者只是我做错了什么?谢谢!

4

2 回答 2

0

情侣建议:

  1. 为您的应用程序添加更多内存。尝试类似的东西-Xmx2g
  2. 尝试添加额外的线程以从队列中读取
  3. 尽可能快地“用缓冲区做某事”
  4. 将“使用缓冲区做一些事情”分拆到它自己的线程中(可能使用线程池),以防止它阻塞从队列中读取更多内容

您是否对阅读器服务中的所有内容进行了计时,以查看需要多长时间?如果您确实将事物放入队列的速度比阅读它们的速度要快,那么您需要找到加快阅读器速度的方法。您的内存不足只是真正问题的副作用。问题是您没有足够快地处理排队的工作。

于 2012-07-11T00:22:54.743 回答
0

队列显然必须存储您放入队列中的所有元素,并且它使用彼此链接的节点来做到这一点。这就是链接队列的原理。您的生产者生产itels 太快,而消费者没有时间消费它们,因此最终您会遇到OOM。

您应该考虑使用绑定的 BlockingQueue:它会在队列包含太多元素时强制生产者线程阻塞,从而避免 OOM。

于 2012-07-10T22:01:53.857 回答