16

我需要同时处理一些 Collection 实例中的元素。换句话说,而不是迭代 Collection 实例

for (Someclass elem : coll){
     process(elem);
}

我想同时处理这些元素。说,类似的东西ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)。此外,应该修复多个同时线程。

任何灵活的模式已经存在?

4

4 回答 4

8

一个好的解决方案是:

  1. 实例化一个包含要处理的元素的ArrayBlockingQueue
  2. 实例化一个ExecutorService来同时执行你的处理
  3. 实例化你Runnable的 s 给他们ArrayBlockingQueue作为参数
  4. 实现run方法:当队列中有元素时,轮询它们并处理它们
  5. 将您Runnable的 s 提交给ExecutorService

编码:

BlockingQueue<Someclass> toProcess = 
    new ArrayBlockingQueue<Someclass>(coll.size(), false, coll);
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
for(int count = 0 ; count < numberOfThreads ; ++c) {
    es.submit(new MyRunnable(toProcess));
}


private static class MyRunnable() implements Runnable {
    private final BlockingQueue<Someclass> toProcess;

    public MyRunnable(BlockingQueue<Someclass> toProcess) {
        this.toProcess = toProcess;
    }

    @Override
    public void run() {
        Someclass element = null;
        while((element = toProcess.poll()) != null) {
            process(element);
        }
    }
}
于 2013-02-08T08:38:13.823 回答
8

使 process 方法成为一个名为 MyRunnable 的类中的 run() 方法,该类实现 Runnable 并且其构造函数将 elem 作为输入并将其存储为实例变量。然后使用:

ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (Someclass elem : coll){
   Runnable worker = new MyRunnable(elem);
   executor.execute(worker);
}
于 2013-02-08T08:32:35.580 回答
2

下面是此类执行器类的“手工”版本。请注意,您必须传递的不是Callable(或Runnable)的实例,而是此类处理器类的类名。

public class ConcurrentCollectionExecutor<T> {

private Collection<T> collection;
private Class<Runnable> processor;
private int numberOfThreads;
private Executor executor;

public ConcurrentCollectionExecutor(Collection<T> collection, Class<Runnable> processor, int numberOfThreads) {
    this.collection = collection;
    this.processor = processor;
    this.numberOfThreads = numberOfThreads;
    this.executor = Executors.newFixedThreadPool(numberOfThreads);
}

public void run() {
    try {
        Constructor<Runnable> constructor = null;
        for (T t : collection) {
            if (constructor == null) {
                constructor = processor.getConstructor(t.getClass());
            }
            executor.execute(constructor.newInstance(t));
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}      
}
于 2013-02-08T08:43:16.600 回答
0

我不知道这有什么模式,但作为一个想法,你可以根据线程数来定义你的集合元素,所以每个线程都会得到 X 元素来处理,例如:

Collection 有 20 个元素,你所有的函数都提供 4 个线程,然后实习生你像这样开始它们:

thread1 gets the elements [0 .. 4]
thread2 gets the elements [5 .. 9]
thread3 gets the elements [10 .. 14]
thread1 gets the elements [15 .. 19]

请注意,从集合中删除元素可能会导致问题,然后特别是线程 4 尝试访问 element[19],而您的集合中的元素少于 20 个。

编辑

正如大脑根据元素处理时间提到的那样,这个想法可能不是有效的,好像处理前 5 个元素中的一个需要 10 秒,但其他元素只需要 0.5 秒,然后线程 1 会很忙,但其他线程最终不会并行运行很长时间。

于 2013-02-08T08:33:57.320 回答