我需要对大量 id(大约 100k 到 100 万)进行一些计算/处理。由于 id 的数量非常大,并且每个处理都需要一些时间,所以我正在考虑在我的 Java 代码中实现线程。
假设我们不能一次运行 100K 线程,在这种情况下我如何实现线程?
注意 - 我能想到的唯一解决方案是运行大约 100 个或更多线程,每个线程将处理大约 1000 个或更多 ID。
我需要对大量 id(大约 100k 到 100 万)进行一些计算/处理。由于 id 的数量非常大,并且每个处理都需要一些时间,所以我正在考虑在我的 Java 代码中实现线程。
假设我们不能一次运行 100K 线程,在这种情况下我如何实现线程?
注意 - 我能想到的唯一解决方案是运行大约 100 个或更多线程,每个线程将处理大约 1000 个或更多 ID。
使用 Java 内置的线程池和执行器。
ExecutorService foo = Executors.newFixedThreadPool(100);
foo.submit(new MyRunnable());
您可以创建各种线程池来定制您想要的数量,如果它是动态的,等等。
使用线程池:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadIDS implements Runnable
{
public static final int totalIDS = 1000000;
int start;
int range;
public ThreadIDS(int start, int range)
{
this.start=start;
this.range=range;
}
public static void main(String[] args)
{
int availableProcessors = Runtime.getRuntime().availableProcessors();
int eachThread = totalIDS/availableProcessors + 1;
ExecutorService threads = Executors.newFixedThreadPool(availableProcessors);
for(int i = 0 ; i < availableProcessors ; i++)
{
threads.submit(new ThreadIDS(i*eachThread, eachThread));
}
while(!threads.awaitTermination(1000, TimeUnit.MILLISECONDS))System.out.println("Waiting for threads to finish");
}
public void processID(int id)
{
}
public void run()
{
for(int i = start ; i < Math.min(start+range, totalIDS) ; i++)
{
processID(i);
}
}
}
编辑运行方法。由于我们在除法时加 1 以避免整数除法使我们错过 id,因此我们可能会超出 totalIDS 限制。Math.min 避免了这种情况。
如果您不想使用 ThreadPools,则将 main 更改为:
public static void main(String[] args)
{
int availableProcessors = Runtime.getRuntime().availableProcessors();
int eachThread = totalIDS/availableProcessors + 1;
for(int i = 0 ; i < availableProcessors ; i++)
{
new Thread(new ThreadIDS(i * eachThread, eachThread)).start();
}
}
运行与 CPU 内核一样多的线程 (Runtime.getRuntime().availableProcessors())。让每个线程像这样循环运行:
public void run() {
while (!ids.isEmpty()) {
Id id = ids.poll(); // exact access method depends on how your set of ids is organized
processId(id);
}
}
与使用线程池相比,这更简单,需要更少的内存(无需为每个 id 创建 Runnable)。
如果给定 ID 的处理时间有任何变化,那么将您的工作分成 4 个 Runnable(每个内核 1 个)可能不是最好的主意。一个更好的解决方案是将你的工作分成小块,这样一个核心就不会被所有的“艰苦”工作卡住,而其他 3 个核心则通过他们的工作然后什么也不做。
您可以提前将任务分成小块并将它们提交给 ThreadPoolExecutor,但使用 Fork/Join 框架可能会更好。它旨在非常有效地处理此类事情。
这样的事情可以确保所有 4 个核心都保持忙碌,直到所有工作完成:
public class Test
{
public void workTest()
{
ForkJoinPool pool = new ForkJoinPool(); //Defaults to # of cores
List<ObjectThatWeProcess> work = getWork(); //Get IDs or whatever
FJAction action = new FJAction(work);
pool.invoke(action);
}
public static class FJAction extends RecursiveAction
{
private static final workSize = 1000; //Only do work if 1000 objects or less
List<ObjectThatWeProcess> work;
FJAction(List<ObjectThatWeProcess> work)
{
this.work = work;
}
public void compute()
{
if(work.size() > workSize)
{
invokeAll(new FJAction(work.subList(0,work.size()/2)),
new FJAction(work.subList(work.size()/2,work.size())));
}
else
processWork();
}
private void processWork()
{
//do something
}
}
}
如果“工作”返回与您相关的值,您还可以扩展 RecursiveTask<T>。