9

假设我有一个AtomicReference对象列表:

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());

线程 A将元素添加到此列表中:batch.get().add(o);

稍后,线程 B获取列表,例如,将其存储在数据库中:insertBatch(batch.get());

在写入(线程 A)和读取(线程 B)时,我是否必须进行额外的同步以确保线程 B 以 A 离开它的方式看到列表,或者这是否由 AtomicReference 处理?

换句话说:如果我有一个可变对象的 AtomicReference,并且一个线程更改了该对象,其他线程是否会立即看到此更改?

编辑:

也许一些示例代码是有序的:

public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

这里发生的是我创建了四个工作线程来解析一些文本(这是方法的Reader in参数process())。每个工作人员将其已解析的行保存在一个批处理中,并在批处理已满时刷新该批处理 ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));)。

由于文本中的行数不是批次大小的倍数,因此最后一个对象最终会出现在未刷新的批次中,因为它未满。因此,这些剩余的批次由主线程插入。

我用AtomicReference.getAndSet()一个空的替换整个批次。这个程序在线程方面是否正确?

4

4 回答 4

11

嗯……它真的不是这样工作的。AtomicReference保证引用本身在线程中是可见的,即如果您为其分配与原始引用不同的引用,则更新将可见。它不保证引用指向的对象的实际内容。

因此,对列表内容的读/写操作需要单独同步。

编辑:因此,从您更新的代码和您发布的评论来看,将本地引用设置volatile为足以确保可见性。

于 2012-02-21T13:22:14.683 回答
1

我认为,忘记这里的所有代码,你的确切问题是:

在写入(线程 A)和读取(线程 B)时,我是否必须进行额外的同步以确保线程 B 以 A 离开它的方式看到列表,或者这是否由 AtomicReference 处理?

因此,对此的确切回应是:的,原子处理可见性。这不是我的意见,而是JDK 文档之一

原子访问和更新的内存效应通常遵循 volatile 的规则,如 Java 语言规范第三版(17.4 内存模型)中所述。

我希望这有帮助。

于 2012-08-14T16:45:54.040 回答
0

AtomicReference只会帮助您参考列表,它不会对列表本身做任何事情。更具体地说,在您的场景中,当系统处于负载状态时,您几乎肯定会遇到问题,其中消费者已获取列表,而生产者正在向其添加项目。

在我看来,这听起来像是您应该使用BlockingQueue. 然后,如果您的生产者比您的消费者更快,您可以限制内存占用并让队列处理所有争用。

就像是:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);

// ... Producer
queue.put(o);

// ... Consumer
List<Object> queueContents = new ArrayList<Object> ();
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
queue.drainTo(queueContents);

添加

感谢@Tudor 指出您正在使用的架构。...我不得不承认这很奇怪。据我所知,你根本不需要AtomicReference。每个线程都拥有自己的线程ArrayList,直到它被传递到dao它被替换的位置,因此在任何地方都没有争用。

我有点担心您在单个Reader. 我希望你有办法确保每个解析器不会影响其他解析器。

正如我在上面的代码中描述的那样,我个人会使用某种形式的生产者-消费者模式。可能是这样的。

static final int PROCESSES = 4;
static final int batchSize = 10;

public void process(Reader in) throws IOException, InterruptedException {

  final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
  ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
  // Queue of objects.
  final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
  // The final object to post.
  final Object FINISHED = new Object();

  // Start the producers.
  for (int i = 0; i < PROCESSES; i++) {
    tasks.add(exec.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {

        Processor.this.parser.parse(in, new Parser.Handler() {
          @Override
          public void onNewObject(Object event) {
            queue.add(event);
          }
        });
        // Post a finished down the queue.
        queue.add(FINISHED);
        return null;
      }
    }));
  }

  // Start the consumer.
  tasks.add(exec.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException {
      List<Object> batch = new ArrayList<Object>(batchSize);
      int finishedCount = 0;
      // Until all threads finished.
      while ( finishedCount < PROCESSES ) {
        Object o = queue.take();
        if ( o != FINISHED ) {
          // Batch them up.
          batch.add(o);
          if ( batch.size() >= batchSize ) {
            dao.insertBatch(batch);
            // If insertBatch takes a copy we could merely clear it.
            batch = new ArrayList<Object>(batchSize);
          }
        } else {
          // Count the finishes.
          finishedCount += 1;
        }
      }
      // Finished! Post any incopmplete batch.
      if ( batch.size() > 0 ) {
        dao.insertBatch(batch);
      }
      return null;
    }
  }));

  // Wait for everything to finish.
  exec.shutdown();
  // Wait until all is done.
  boolean finished = false;
  do {
    try {
      // Wait up to 1 second for termination.
      finished = exec.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
    }
  } while (!finished);
}
于 2012-02-21T16:41:51.277 回答
0

添加到Tudor的答案:您必须使ArrayList自身成为线程安全的,或者 - 根据您的要求 - 甚至更大的代码块。

如果您可以摆脱线程安全ArrayList,您可以像这样“装饰”它:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>());

但请记住:即使是这样的“简单”构造也不是线程安全的:

Object o = batch.get(batch.size()-1);
于 2012-02-21T13:33:27.780 回答