2

我目前正在使用 ExecutorService 在 CopyOnWrite ArrayLists 中发送批量字符串以并行处理,其中处理这些列表的 Runnable 任务需要遍历列表并对每个字符串进行处理。

在遇到常规 ArrayLists 的并发问题后,我尝试使用 CopyOnWriteArrayLists,因为它们是线程安全的,但是我的结果现在不一致。也就是说,我每次运行程序都会得到不同的结果,这表明在每个 Runnable 任务可以完全迭代它之前,arraylist 的内容会以某种方式发生变化。

public static class BatchRunnable implements Runnable {

    private CopyOnWriteArrayList<String> batch;

    BatchRunnable(CopyOnWriteArrayList<String> batch){
        this.batch = batch;
    }

    @Override
    public void run(){
        //iterate over batch and work with String elements
        //make no modifications to batch
    }
}
  • 可运行任务不对数组列表进行任何修改,它只遍历列表并使用列表的字符串元素进行处理。

  • CopyOnWriteArrayList 更改的唯一位置是在每个新的 Runnable 任务的实例化处。

当我传入单个字符串而不是批处理时,我得到了一致且正确的结果,但是当我开始在 String ArrayLists 中使用批处理时,我得到了不一致的结果,这表明有些东西正在损害 CopyOnWriteArrayList 批处理的并发性,尽管它被认为是线程-安全的。

任何帮助表示赞赏,谢谢!

编辑:这是我的批次正在构建的地方:

        Runnable worker = null;
        while((line = br.readLine()) != null) {
            recordBatch.add(line);
            if(recordBatch.size() == 100){
                worker = new BatchRunnable(recordBatch);
                executor.execute(worker);
                recordBatch.clear();
            }

        }           
        executor.shutdown();
        executor.awaitTermination(60,TimeUnit.SECONDS);  
4

4 回答 4

2

查看您的while循环:

while((line = br.readLine()) != null) {
        recordBatch.add(line);
        if(recordBatch.size() == 100){
            worker = new BatchRunnable(recordBatch);
            executor.execute(worker);
            recordBatch.clear();
        }

    }  

list在所有BatchRunnable. 因此,只要您list在一处更改,它将反映在所有参考中。因此,一旦您使用 清除列表recordBatch.clear(),该列表对于所有引用都是空的,即使是您在 中的引用也是如此BatchRunnable。这就是为什么你得到不一致的结果。

copy你应该在你的recordBatch列表中传递一个BatchRunnable

worker = new BatchRunnable(new ArrayList<String>(recordBatch));
于 2013-07-23T07:19:18.057 回答
1

将批次传递给BatchRunnable.

 worker = new BatchRunnable(recordBatch);
 executor.execute(worker);
 recordBatch.clear(); // You clear all the list

因此,执行程序将处理列表中的任何内容,但如果到达 clear() 行(并且由于执行程序在不同的线程上运行,这可能会在BatchRunnable完成之前发生),那么列表将为空(或包含下一批!)并且工作批次将有一个不一致的列表。

当您将列表传递给工作人员时,您传递的是参考而不是副本!所以要么复制批次,要么为每个批次创建一个新批次:

 worker = new BatchRunnable(recordBatch);
 executor.execute(worker);
 recordBatch = new CopyOnWriteArrayList<String>();
于 2013-07-23T07:20:20.890 回答
0

我猜你使用迭代器来遍历元素。迭代器在构造迭代器时提供列表状态的快照。遍历迭代器时不需要同步。

因此,在您的情况下,您应该在构造函数中获取迭代器,或者复制CopyOnWriteArrayList.

于 2013-07-23T07:15:51.620 回答
0

如果您在使用纯 a 时遇到并发问题ArrayList,这表明在您BatchRunnable对其进行迭代时它已被修改。ArrayList用 a替换CopyOnWriteArrayList只会隐藏您的并发问题。

在您的代码中,您正在修改(clear()以及add()创建 BatchRunnable 时的列表。提交第一个可运行文件时,它开始处理列表,但您仍继续修改它。

于 2013-07-23T07:19:24.787 回答