我有许多线程将使用来自代理的消息并处理它们。每条消息都是 XML,除其他元素外,还包含一个字母数字<itemId>WI354DE48</itemId>
元素,该元素用作要“处理”的项目的唯一 ID。由于我无法控制或更改的标准,项目/消息可能会在这些线程正在从中消费的代理队列上复制。所以同一个项目(ID 为 WI354DE48)可能只被发送到队列一次,或者它可能被发送 100 次。无论如何,我只能允许该项目被处理一次;所以我需要一种方法来防止线程 A 处理线程 B 已经处理的重复项。
我正在寻找一个简单的线程安全列表,它可以由所有线程(工作人员)共享,以充当缓存机制。每个线程都将被赋予相同的 a 实例List<String>
。当每个工作线程消费一条消息时,它会检查itemId
(a String) 是否存在于列表中。如果没有,则没有其他工作人员处理过该项目。在这种情况下,将itemID
添加到列表中(锁定/缓存它),然后处理该项目。如果itemId
列表中已经存在,那么另一个工作人员已经处理了该项目,所以我们可以忽略它。简单,但有效。
显然,拥有一个线程安全的列表实现是至关重要的。请注意,我们将在此列表中调用的唯一两种方法是:
List#contains(String)
- 遍历/搜索列表List#add(String)
- 改变列表
...重要的是要注意,我们将以大致相同的频率调用这两种方法。很少会contains()
返回true
并阻止我们需要add
ID。
我最初认为这CopyOnWriteArrayList
是我最好的选择,但在阅读了 Javadocs 之后,似乎每个工作人员最终都会得到列表的自己的线程本地副本,这不是我想要的。然后我调查了Collections.synchronizedList(new ArrayList<String>)
,这似乎是一个不错的选择:
List<String> processingCache = Collection.synchronizedList(new ArrayList<String>());
List<Worker> workers = getWorkers(processingCache); // Inject the same list into all workers.
for(Worker worker : workers)
executor.submit(worker);
// Inside each Worker's run method:
@Override
public void run() {
String itemXML = consumeItemFromBroker();
Item item = toItem(itemXML);
if(processingCache.contains(item.getId())
return;
else
processingCache.add(item.getId());
... continue processing.
}
我在轨道上Collections.synchronizedList(new ArrayList<String>)
,还是离基地很远?给定我的用例,是否有更有效的线程安全List
impl,如果有,为什么?