1

我有许多线程将使用来自代理的消息并处理它们。每条消息都是 XML,除其他元素外,还包含一个字母数字<itemId>WI354DE48</itemId>元素,该元素用作要“处理”的项目的唯一 ID。由于我无法控制或更改的标准,项目/消息可能会在这些线程正在从中消费的代理队列上复制。所以同一个项目(ID 为 WI354DE48)可能只被发送到队列一次,或者它可能被发送 100 次。无论如何,我只能允许该项目被处理一次;所以我需要一种方法来防止线程 A 处理线程 B 已经处理的重复项。

我正在寻找一个简单的线程安全列表,它可以由所有线程(工作人员)共享,以充当缓存机制。每个线程都将被赋予相同的 a 实例List<String>。当每个工作线程消费一条消息时,它会检查itemId(a String) 是否存在于列表中。如果没有,则没有其他工作人员处理过该项目。在这种情况下,将itemID添加到列表中(锁定/缓存它),然后处理该项目。如果itemId列表中已经存在,那么另一个工作人员已经处理了该项目,所以我们可以忽略它。简单,但有效。

显然,拥有一个线程安全的列表实现是至关重要的。请注意,我们将在此列表中调用的唯一两种方法是:

  • List#contains(String)- 遍历/搜索列表
  • List#add(String)- 改变列表

...重要的是要注意,我们将以大致相同的频率调用这两种方法。很少会contains()返回true并阻止我们需要addID。

我最初认为这CopyOnWriteArrayList是我最好的选择,但在阅读了 Javadocs 之后,似乎每个工作人员最终都会得到列表的自己的线程本地副本,这不是我想要的。然后我调查了Collections.synchronizedList(new ArrayList<String>),这似乎是一个不错的选择:

List<String> processingCache = Collection.synchronizedList(new ArrayList<String>());
List<Worker> workers = getWorkers(processingCache); // Inject the same list into all workers.
for(Worker worker : workers)
    executor.submit(worker);

// Inside each Worker's run method:
@Override
public void run() {
    String itemXML = consumeItemFromBroker();
    Item item = toItem(itemXML);

    if(processingCache.contains(item.getId())
        return;
    else
        processingCache.add(item.getId());

    ... continue processing.
}

我在轨道上Collections.synchronizedList(new ArrayList<String>),还是离基地很远?给定我的用例,是否有更有效的线程安全Listimpl,如果有,为什么?

4

1 回答 1

1

Collections.synchronizedList非常基本,它只是将所有方法标记为synchronized.

这将起作用,但仅在某些特定假设下有效,即您永远不会对 进行多次访问List,即

if(!list.contains(x))
    list.add(x);

不是线程安全的,因为监视器在两个调用之间被释放。

如果您有很多读取和少量写入,它也可能会有点慢,因为所有线程都获得了一个独占锁。

您可以查看java.util.concurrent包中的实现,有几个选项。

我建议使用ConcurrentHashMap带有虚拟值的 a 。

推荐的原因是ConcurrentHashMap已经同步了密钥组,所以如果你有一个好的散列算法(并且String确实),你实际上可以获得大量的并发吞吐量。

我更喜欢这个,ConcurrentSkipListSet因为它不能保证订购,因此你会失去那个开销。

当然,对于线程来说,瓶颈在哪里从来都不是很明显,所以我建议两者都尝试一下,看看哪个能给你更好的性能。

于 2013-11-08T17:31:14.077 回答