11

我有一个 Java 应用程序,它有工作线程来处理作业。一个工人产生一个结果对象,比如:

class WorkerResult{
    private final Set<ResultItems> items;
    public Worker(Set<ResultItems> pItems){
         items = pItems;
    }
}

当工作人员完成时,它会执行以下操作:

 ...
 final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>();
 for(Item producedItem : ...){
      items.add(item);
 }
 passToGatherThread(items);

items集合在这里是一种“工作单元”。该passToGatherThread方法将items集合传递给一个收集线程,其中只有一个在运行时存在。

这里不需要同步,因为只有一个线程(Gather-thread)读取items集合,所以不会发生竞争条件。AFAICS,Gather-thread 可能看不到所有项目,因为该集合不是线程安全的,对吧?

假设我无法进行passToGatherThread同步,比如说因为它是第 3 方库。我基本上担心的是,由于缓存、VM 优化等原因,收集线程看不到所有项目。所以问题来了:如何以线程安全的方式传递项目集,以便收集线程“看到”正确的项目集?

4

4 回答 4

2

这里似乎没有同步问题。您为每个 passToGatherThread 创建一个新的 Set 对象,并在修改集合后执行此操作。不会丢失任何对象。

如果不对集合进行任何修改,许多线程可以同时访问 Set(和大多数 Java 集合)。这Collections.unmodifiableCollection就是为了。

由于上述passToGatherThread方法作为与其他线程的通信,它必须使用某种同步——并且每次同步确保线程之间的内存一致性。

另外 - 请注意,对传递集合中对象的所有写入都是在传递给另一个线程之前进行的。即使内存被复制到线程的本地缓存中,它也具有与其他线程相同的未修改值。

于 2013-02-11T13:08:47.143 回答
1

您可以简单地使用SetJava 为您的WorkerResult. 参见例如:

另一种选择是使用Collections.synchronizedSet().

于 2013-02-11T13:09:38.100 回答
1

我已经思考(并讨论)了很多这个问题,并且我想出了另一个答案,我希望这将是最好的解决方案。

传递一个同步的集合在效率方面并不好,因为对该集合的每个后续操作都会被同步 - 如果有很多操作,它可能会被证明是一个障碍。

直截了当:让我们做一些假设(我不同意):

  • 提到的passToGatherThread方法确实不安全,但看起来不太可能
  • 编译器可以重新排序代码中的事件,以便在passToGatherThread填充集合之前调用

确保传递给gatherer方法的集合已准备好且完整的最简单、最干净且可能最有效的方法是将集合推送放在一个同步块中,如下所示:

synchronized(items) {
  passToGatherThread(items);
}

这样,我们在传递集合之前确保内存同步和有效的发生前序列,从而确保所有对象都正确传递。

于 2013-02-13T15:57:38.470 回答
0

worker 实现 callable 并返回 WorkerResult:

class Worker implements Callable<WorkerResult> {
    private WorkerInput in;

    public Worker(WorkerInput in) {
        this.in = in;
    }

    public WorkerResult call() {
        // do work here
    }
}

然后我们使用 ExecutorService 来管理线程池,并通过使用 Future 收集结果。

public class PooledWorkerController {

    private static final int MAX_THREAD_POOL = 3;
    private final ExecutorService pool = 
       Executors.newFixedThreadPool(MAX_THREAD_POOL);

    public Set<ResultItems> process(List<WorkerInput> inputs) 
           throws InterruptedException, ExecutionException{         
        List<Future<WorkerResult>> submitted = new ArrayList<>();
        for (WorkerInput in : inputs) {
            Future<WorkerResult> future = pool.submit(new Worker(in));
            submitted.add(future);
        }
        Set<ResultItems> results = new HashSet<>();
        for (Future<WorkerResult> future : submitted) {
            results.addAll(future.get().getItems());
        }
        return results;
    }
}
于 2013-02-12T01:16:45.120 回答