3

我正在尝试找到实现以下 API 的好方法:

void add(Object o);
void processAndClear();

该类将存储对象,并在调用 processAndClear 时遍历当前存储的对象,以某种方式处理它们,然后清除存储。这个类应该是线程安全的。

显而易见的方法是使用锁定,但我想更加“并发”。这是我将使用的方法:

class Store{
    private AtomicReference<CopyOnWriteArrayList<Object>> store = new AtomicReference<>(new CopyOnWriteArrayList <>());

    void add(Object o){
        store.get().add(o);
    }

    void processAndClear(){
        CopyOnWriteArrayList<Object> objects = store.get();
        store.compareAndSet(objects, new CopyOnWriteArrayList<>());
        for (Object object : objects) {
            //do sth
        }
    }
}

这将允许尝试添加对象的线程几乎立即继续进行,而无需任何锁定/等待 xlearing 完成。这是或多或少正确的方法吗?

4

2 回答 2

4

您上面的代码不是线程安全的。想象一下:

  1. add()线程 A 在之后被搁置store.get()
  2. 线程 B 在 中processAndClear(),替换列表,处理旧列表的所有元素,然后返回。
  3. 线程 A 恢复并将新项目添加到现在已过时的列表中,该列表将永远不会被处理。

这里可能最简单的解决方案是使用LinkedBlockingQueue,这也可以大大简化任务:

class Store{
  final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

  void add(final Object o){
    queue.put(o); // blocks until there is free space in the optionally bounded queue
  }

  void processAndClear(){
    Object element;
    while ((element = queue.poll()) != null) { // does not block on empty list but returns null instead
      doSomething(element);
    }
  }
}

编辑:如何做到这一点synchronized

class Store{
  final LinkedList<Object> queue = new LinkedList<>(); // has to be final for synchronized to work

  void add(final Object o){
    synchronized(queue) { // on the queue as this is the shared object in question
      queue.add(o);
    }
  }

  void processAndClear() {
    final LinkedList<Object> elements = new LinkedList<>(); // temporary local list
    synchronized(queue) { // here as well, as every access needs to be properly synchronized
      elements.addAll(queue);
      queue.clear();
    }

    for (Object e : elements) { 
      doSomething(e); // this is thread-safe as only this thread can access these now local elements
    }
  }
}

为什么这不是一个好主意

尽管这是线程安全的,但与并发版本相比,它要慢得多。假设您有一个系统有 100 个经常调用的线程add,而一个线程调用processAndClear。那么就会出现以下性能瓶颈:

  • 如果一个线程调用add其他 99 个线程同时被搁置。
  • processAndClear所有 100 个线程的第一部分被搁置。

如果您假设这 100 个添加线程无事可做,您可以很容易地证明,应用程序的运行速度与单线程应用程序的运行速度相同,但减去了同步成本。这意味着:100 个线程的添加实际上会比 1 个线程慢。如果您使用第一个示例中的并发列表,则情况并非如此。

然而,处理线程的性能会略有提升,因为doSomething可以在添加新元素时在旧元素上运行。但是,并发示例可能会更快,因为您可以让多个线程同时进行处理。

Effectivelysynchronized也可以使用,但是你会自动引入性能瓶颈,可能导致应用程序作为单线程运行速度较慢,迫使你进行复杂的性能测试。此外,扩展功能总是存在引入线程问题的风险,因为锁定需要手动完成。
相比之下,并发列表无需额外代码即可解决所有这些问题,并且代码可以在以后轻松更改或扩展。

于 2014-03-05T15:28:16.063 回答
1

该类将存储对象,并在调用 processAndClear 时遍历当前存储的对象,以某种方式处理它们,然后清除存储。

这似乎您应该使用 aBlockingQueue来完成此任务。您的add(...)方法将添加到队列中,您的消费者将调用take()等待下一个项目的块。(是一个典型的实现)为你处理所有的同步和信令BlockingQueueArrayBlockingQueue

这意味着您不必拥有 aCopyOnWriteArrayList或 a AtomicReference。您将失去的是一个集合,并且您可以出于其他原因进行迭代,而不是您当前的帖子阐明。

于 2014-03-05T15:20:53.843 回答