假设我有一个AtomicReference
对象列表:
AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
线程 A将元素添加到此列表中:batch.get().add(o);
稍后,线程 B获取列表,例如,将其存储在数据库中:insertBatch(batch.get());
在写入(线程 A)和读取(线程 B)时,我是否必须进行额外的同步以确保线程 B 以 A 离开它的方式看到列表,或者这是否由 AtomicReference 处理?
换句话说:如果我有一个可变对象的 AtomicReference,并且一个线程更改了该对象,其他线程是否会立即看到此更改?
编辑:
也许一些示例代码是有序的:
public void process(Reader in) throws IOException {
List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
ExecutorService exec = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; ++i) {
tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
@Override public AtomicReference<List<Object>> call() throws IOException {
final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
Processor.this.parser.parse(in, new Parser.Handler() {
@Override public void onNewObject(Object event) {
batch.get().add(event);
if (batch.get().size() >= batchSize) {
dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
}
}
});
return batch;
}
}));
}
List<Object> remainingBatches = new ArrayList<Object>();
for (Future<AtomicReference<List<Object>>> task : tasks) {
try {
AtomicReference<List<Object>> remainingBatch = task.get();
remainingBatches.addAll(remainingBatch.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw (RuntimeException)cause;
}
}
// these haven't been flushed yet by the worker threads
if (!remainingBatches.isEmpty()) {
dao.insertBatch(remainingBatches);
}
}
这里发生的是我创建了四个工作线程来解析一些文本(这是方法的Reader in
参数process()
)。每个工作人员将其已解析的行保存在一个批处理中,并在批处理已满时刷新该批处理 ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
)。
由于文本中的行数不是批次大小的倍数,因此最后一个对象最终会出现在未刷新的批次中,因为它未满。因此,这些剩余的批次由主线程插入。
我用AtomicReference.getAndSet()
一个空的替换整个批次。这个程序在线程方面是否正确?