我需要同时处理一些 Collection 实例中的元素。换句话说,而不是迭代 Collection 实例
for (Someclass elem : coll){
process(elem);
}
我想同时处理这些元素。说,类似的东西ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)
。此外,应该修复多个同时线程。
任何灵活的模式已经存在?
我需要同时处理一些 Collection 实例中的元素。换句话说,而不是迭代 Collection 实例
for (Someclass elem : coll){
process(elem);
}
我想同时处理这些元素。说,类似的东西ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)
。此外,应该修复多个同时线程。
任何灵活的模式已经存在?
一个好的解决方案是:
Runnable
的 s 给他们ArrayBlockingQueue作为参数run
方法:当队列中有元素时,轮询它们并处理它们Runnable
的 s 提交给ExecutorService
编码:
BlockingQueue<Someclass> toProcess =
new ArrayBlockingQueue<Someclass>(coll.size(), false, coll);
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
for(int count = 0 ; count < numberOfThreads ; ++c) {
es.submit(new MyRunnable(toProcess));
}
private static class MyRunnable() implements Runnable {
private final BlockingQueue<Someclass> toProcess;
public MyRunnable(BlockingQueue<Someclass> toProcess) {
this.toProcess = toProcess;
}
@Override
public void run() {
Someclass element = null;
while((element = toProcess.poll()) != null) {
process(element);
}
}
}
使 process 方法成为一个名为 MyRunnable 的类中的 run() 方法,该类实现 Runnable 并且其构造函数将 elem 作为输入并将其存储为实例变量。然后使用:
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (Someclass elem : coll){
Runnable worker = new MyRunnable(elem);
executor.execute(worker);
}
下面是此类执行器类的“手工”版本。请注意,您必须传递的不是Callable
(或Runnable
)的实例,而是此类处理器类的类名。
public class ConcurrentCollectionExecutor<T> {
private Collection<T> collection;
private Class<Runnable> processor;
private int numberOfThreads;
private Executor executor;
public ConcurrentCollectionExecutor(Collection<T> collection, Class<Runnable> processor, int numberOfThreads) {
this.collection = collection;
this.processor = processor;
this.numberOfThreads = numberOfThreads;
this.executor = Executors.newFixedThreadPool(numberOfThreads);
}
public void run() {
try {
Constructor<Runnable> constructor = null;
for (T t : collection) {
if (constructor == null) {
constructor = processor.getConstructor(t.getClass());
}
executor.execute(constructor.newInstance(t));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
我不知道这有什么模式,但作为一个想法,你可以根据线程数来定义你的集合元素,所以每个线程都会得到 X 元素来处理,例如:
Collection 有 20 个元素,你所有的函数都提供 4 个线程,然后实习生你像这样开始它们:
thread1 gets the elements [0 .. 4]
thread2 gets the elements [5 .. 9]
thread3 gets the elements [10 .. 14]
thread1 gets the elements [15 .. 19]
请注意,从集合中删除元素可能会导致问题,然后特别是线程 4 尝试访问 element[19],而您的集合中的元素少于 20 个。
编辑:
正如大脑根据元素处理时间提到的那样,这个想法可能不是有效的,好像处理前 5 个元素中的一个需要 10 秒,但其他元素只需要 0.5 秒,然后线程 1 会很忙,但其他线程最终不会并行运行很长时间。