13

我有一个多线程应用程序写入和读取 ConcurrentLinkedQueue,它在概念上用于支持列表/表中的条目。我最初为此使用了 ConcurrentHashMap,效果很好。一项新要求要求跟踪输入的订单条目,因此可以根据某些条件以最旧的第一顺序删除它们。ConcurrentLinkedQueue 似乎是一个不错的选择,而且它在功能上运行良好。

可配置数量的条目保存在内存中,当达到限制时提供新条目时,将按最早的优先顺序在队列中搜索可以删除的条目。某些条目不会被系统删除并等待客户端交互。

似乎正在发生的事情是我在发生的队列前面有一个条目,比如 100K 条目之前。队列似乎配置的条目数量有限(size() == 100),但在分析时,我发现内存中有 ~100K ConcurrentLinkedQueue$Node 对象。这似乎是设计使然,只需查看 ConcurrentLinkedQueue 的源代码,remove 只会删除对正在存储的对象的引用,但会将链表留在原处以进行迭代。

最后我的问题:有没有一种“更好”的懒惰方式来处理这种性质的集合?我喜欢 ConcurrentLinkedQueue 的速度,我无法承受在这种情况下似乎可能出现的无限泄漏。如果没有,似乎我必须创建第二个结构来跟踪订单并且可能有相同的问题,加上同步问题。

4

3 回答 3

11

这里实际发生的是 remove 方法准备一个轮询线程以使链接引用无效。

ConcurrentLinkedQueue 是一个非阻塞线程安全的队列实现。但是,当您尝试从队列中轮询节点时,它是一个有两个功能的过程。首先将值设为空,然后将引用设为空。CAS 操作是单个原子函数,不会为轮询提供即时解决方案。

当您轮询时会发生什么,第一个成功的线程将获取节点的值并将该值设为空,然后该线程将尝试将引用设为空。然后可能会有另一个线程进入并尝试从队列中轮询。为确保此队列具有非阻塞属性(即一个线程的失败不会导致另一个线程的失败),新的传入线程将查看该值是否为 null,如果为 null,则该线程将使引用为空并尝试再次轮询()。

因此,您在这里看到的是删除线程只是准备任何新的轮询线程以使引用为空。我认为尝试实现非阻塞删除功能几乎是不可能的,因为这需要三个原子功能。null 值的 null 引用所述节点,最后是从该节点的父节点到其后继节点的新引用。

回答你的最后一个问题。不幸的是,没有更好的方法来实现删除和维护队列的非阻塞状态。至少在这一点上是这样。一旦处理器开始使用 2 路和 3 路外壳,那么这是可能的。

于 2010-03-30T18:24:41.933 回答
1

查看 1.6.0_29 的源代码,似乎修改了 CLQ 的迭代器以尝试删除具有空项的节点。代替:

p = p.getNext();

现在的代码是:

Node<E> next = succ(p);
if (pred != null && next != null)
    pred.casNext(p, next);
p = next; 

这是作为错误修复的一部分添加的:http ://bugs.sun.com/view_bug.do?bug_id=6785442

事实上,当我尝试以下操作时,我得到了旧版本的 OOME,但没有新版本:

Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i=0; i<10000; i++)
{
    for (int j=0; j<100000; j++)
    {
        queue.add(j);
    }
    boolean start = true;
    for (Iterator<Integer> iter = queue.iterator(); iter.hasNext(); )
    {
        iter.next();
        if (!start)
            iter.remove();
        start = false;
    }
    System.out.println(i);
}
于 2012-01-09T10:46:03.977 回答
1

队列的主要语义是 add/poll。如果您在ConcurrentLinkedQueue上使用poll(),它将被清理干净。根据您的描述,poll()应该让您删除最旧的条目。为什么不使用它而不是remove()

于 2010-03-30T18:25:37.823 回答