0

我需要对大量 id(大约 100k 到 100 万)进行一些计算/处理。由于 id 的数量非常大,并且每个处理都需要一些时间,所以我正在考虑在我的 Java 代码中实现线程。

假设我们不能一次运行 100K 线程,在这种情况下我如何实现线程?

注意 - 我能想到的唯一解决方案是运行大约 100 个或更多线程,每个线程将处理大约 1000 个或更多 ID。

4

4 回答 4

5

使用 Java 内置的线程池和执行器。

ExecutorService foo =  Executors.newFixedThreadPool(100);
foo.submit(new MyRunnable());

您可以创建各种线程池来定制您想要的数量,如果它是动态的,等等。

于 2013-09-18T16:17:07.790 回答
3

使用线程池:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class ThreadIDS implements Runnable
{
    public static final int totalIDS = 1000000;
    int start;
    int range;
    public ThreadIDS(int start, int range)
    {
        this.start=start;
        this.range=range;
    }
    public static void main(String[] args)
    {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        int eachThread = totalIDS/availableProcessors + 1;
        ExecutorService threads = Executors.newFixedThreadPool(availableProcessors);
        for(int i = 0 ; i < availableProcessors ; i++)
        {
            threads.submit(new ThreadIDS(i*eachThread, eachThread));
        }   
        while(!threads.awaitTermination(1000, TimeUnit.MILLISECONDS))System.out.println("Waiting for threads to finish");
    }

    public void processID(int id)
    {

    }

    public void run()
    {
        for(int i = start ; i < Math.min(start+range, totalIDS) ; i++)
        {
            processID(i);
        }
    }
}

编辑运行方法。由于我们在除法时加 1 以避免整数除法使我们错过 id,因此我们可能会超出 totalIDS 限制。Math.min 避免了这种情况。

如果您不想使用 ThreadPools,则将 main 更改为:

public static void main(String[] args)
{
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    int eachThread = totalIDS/availableProcessors + 1;
    for(int i = 0 ; i < availableProcessors ; i++)
    {
        new Thread(new ThreadIDS(i * eachThread, eachThread)).start();
    }
}
于 2013-09-18T16:13:19.837 回答
0

运行与 CPU 内核一样多的线程 (Runtime.getRuntime().availableProcessors())。让每个线程像这样循环运行:

public void run() {
   while (!ids.isEmpty()) {
     Id id = ids.poll(); // exact access method depends on how your set of ids is organized
     processId(id);
   }
}

与使用线程池相比,这更简单,需要更少的内存(无需为每个 id 创建 Runnable)。

于 2013-09-19T04:33:01.123 回答
0

如果给定 ID 的处理时间有任何变化,那么将您的工作分成 4 个 Runnable(每个内核 1 个)可能不是最好的主意。一个更好的解决方案是将你的工作分成小块,这样一个核心就不会被所有的“艰苦”工作卡住,而其他 3 个核心则通过他们的工作然后什么也不做。

您可以提前将任务分成小块并将它们提交给 ThreadPoolExecutor,但使用 Fork/Join 框架可能会更好。它旨在非常有效地处理此类事情。

这样的事情可以确保所有 4 个核心都保持忙碌,直到所有工作完成:

public class Test
{
    public void workTest()
    {
        ForkJoinPool pool = new ForkJoinPool();  //Defaults to # of cores

        List<ObjectThatWeProcess> work = getWork(); //Get IDs or whatever
        FJAction action = new FJAction(work);
        pool.invoke(action);        
    }

    public static class FJAction extends RecursiveAction
    {
        private static final workSize = 1000; //Only do work if 1000 objects or less
        List<ObjectThatWeProcess> work;

        FJAction(List<ObjectThatWeProcess> work)
        {
            this.work = work;
        }

        public void compute()
        {
            if(work.size() > workSize)
            {
                invokeAll(new FJAction(work.subList(0,work.size()/2)),
                          new FJAction(work.subList(work.size()/2,work.size())));
            }
            else
                processWork();
        }

        private void processWork()
        {
            //do something
        }
    }
}

如果“工作”返回与您相关的值,您还可以扩展 RecursiveTask<T>。

于 2013-09-19T14:50:57.430 回答