1

我正在一个项目队列上启动一个长时间运行的进程,当一个项目被安排处理或正在处理时,我想禁止一些其他操作。我的代码基本上是这样的:

public class LongRunningProcess extends Thread {
    private final ConcurrentLinkedQueue<Item> pending = new ConcurrentLinkedQueue<>();
    private final Set<Item> active = Collections.newSetFromMap(new ConcurrentHashMap<Item, Boolean>());

    public LongRunningProcess() {
        // add items to pending; no more items will ever be added
    }

    @Override
    public void run() {
        while (! pending.isEmpty()) {
            // The peek/add/remove pattern here is important. The number
            // of items that are active or scheduled is always decreasing.
            // Because isScheduled checks pending before checking active,
            // this order of operations ensures that we never miss an item
            // as it is being switched from one collection to the other.
            Item nextItem = pending.peek();
            active.add(nextItem);    // <---Can any of these get reordered?
            pending.remove();        // <---+
            processItem(nextItem);   // <---+
            active.remove(nextItem); // <---+
        }
    }

    public boolean isScheduled(Item item) {
        return pending.contains(item) || active.contains(item);
    }
}

这会按我预期的方式工作,还是可以重新排序上面突出显示的代码块?你能指点我任何相关的规格吗?

编辑:

@Banthar 的有用评论让我找到了java.util.concurrent 包文档,它明确地回答了我的问题:

所有类java.util.concurrent及其子包中的方法都将这些保证扩展到更高级别的同步。尤其是:

  • 在将对象放入任何并发集合之前线程中的操作发生在另一个线程中从集合中访问或删除该元素之后的操作。
4

1 回答 1

1

这会按我预期的方式工作,还是可以对上面两个突出显示的项目中的任何一个进行重新排序?你能指点我任何相关的规格吗?

简短的回答是,因为这两个集合都是并发类,所以active.add(...)pending.remove().

  • pending.peek();pending.remove();访问volatile字段head。_

    private transient volatile Node<E> head = new Node<E>(null);
    
  • 访问内部active.add(nextItem);锁定volatile字段:

    compareAndSetState(0, acquires)) {
    

因为您的两个集合都是并发类,所以它们都具有内部锁或volatile变量,因此方法调用具有读/写内存屏障,可确保“发生在之前”的保证。这确保了操作不会因为Java 内存模型而被重新排序。

但是,这并不意味着您的逻辑是正确的,或者当您查看其他线程如何使用这两个集合时不存在竞争条件。此外,这些调用不是原子的,因此您可以有 3 个线程来执行以下操作:

  1. t1 -- 项目 nextItem = pending.peek();
  2. t2 -- 项目 nextItem = pending.peek();
  3. t1 -- active.add(nextItem);
  4. t3 - 从活动中删除下一个项目并处理它或其他东西
  5. t2 -- active.add(nextItem);
  6. t3 -- 从活动中删除 nextItem 并再次处理它
于 2013-08-22T20:17:22.013 回答