2

我们的应用程序要求所有工作线程在定义的点同步。为此,我们使用 a CyclicBarrier,但它似乎不能很好地扩展。对于超过 8 个线程,同步开销似乎超过了多线程的好处。(但是,我不能用测量数据来支持这一点。)

编辑:同步发生得非常频繁,大约 100k 到 1M 次。

如果许多线程的同步是“困难的”,它是否有助于构建同步树?线程 1 等待 2 和 3,依次等待 4+5 和 6+7,以此类推;完成后,线程 2 和 3 等待线程 1,线程 4 和 5 等待线程 2,依此类推。

1
| \
2   3
|\  |\
4 5 6 7

这样的设置会减少同步开销吗?我会很感激任何建议。

另请参阅这个特色问题:Java 中最快的循环同步是什么(ExecutorService vs. CyclicBarrier vs. X)?

4

4 回答 4

2

对于超过 8 个线程,同步开销似乎超过了多线程的好处。(但是,我不能用测量数据来支持这一点。)

老实说,你的问题就在那里。找出一个性能基准并证明这是问题所在,或者冒着花费数小时/天来解决完全错误的问题的风险。

于 2012-09-28T01:23:13.953 回答
1

您正在以一种微妙的错误方式思考问题,这往往会导致非常糟糕的编码。你不想等待线程,你想等待工作完成

可能最有效的方法是共享的、可等待的计数器。当您进行新工作时,增加计数器并向计数器发出信号。完成工作后,递减计数器。如果没有工作可做,请在柜台等候。如果您将计数器降为零,请检查您是否可以进行新工作。

于 2012-09-27T22:42:26.093 回答
1

如果我理解正确,您是在尝试将解决方案分解为多个部分并分别但同时解决,对吗?然后让您当前的线程等待这些任务?您想使用类似 fork/join 模式的东西。

List<CustomThread> threads = new ArrayList<CustomThread>();
for (Something something : somethings) {
    threads.add(new CustomThread(something));
}
for (CustomThread thread : threads) {
    thread.start();
}
for (CustomThread thread : threads) {
    thread.join(); // Blocks until thread is complete
}
List<Result> results = new ArrayList<Result>();
for (CustomThread thread : threads) {
    results.add(thread.getResult());
}
// do something with results.

在 Java 7 中,通过 fork/join 池提供了更多支持。查看ForkJoinPool及其踪迹,并使用 Google 查找许多其他教程之一。

您可以递归此概念以获得所需的树,只需让您创建的线程以完全相同的方式生成更多线程。


编辑:我的印象是你不会创建那么多线程,所以这更适合你的场景。该示例不会非常短,但它与您在另一个答案中的讨论相同,您可以等待工作,而不是线程。

首先,您Callable的子作业需要 aInput并返回 a Result

public class SubJob implements Callable<Result> {
    private final Input input;

    public MyCallable(Input input) {
        this.input = input;
    }

    public Result call() {
        // Actually process input here and return a result
        return JobWorker.processInput(input);
    }
}

然后使用它,创建ExecutorService一个固定大小的线程池。这将限制您同时运行的作业数量,因此您不会意外地对系统进行线程轰炸。这是你的主要工作:

public class MainJob extends Thread {

    // Adjust the pool to the appropriate number of concurrent
    // threads you want running at the same time
    private static final ExecutorService pool = Executors.newFixedThreadPool(30);
    private final List<Input> inputs;

    public MainJob(List<Input> inputs) {
        super("MainJob")
        this.inputs = new ArrayList<Input>(inputs);
    }

    public void run() {
        CompletionService<Result> compService = new ExecutorCompletionService(pool);
        List<Result> results = new ArrayList<Result>();
        int submittedJobs = inputs.size();
        for (Input input : inputs) {
            // Starts the job when a thread is available
            compService.submit(new SubJob(input)); 
        }
        for (int i = 0; i < submittedJobs; i++) {
            // Blocks until a job is completed
            results.add(compService.take())
        }
        // Do something with results
    }
}

这将允许您重用线程,而不是在每次要运行作业时生成一堆新线程。完成服务将在等待作业完成时进行阻塞。另请注意,该results列表将按完成顺序排列。

您也可以使用Executors.newCachedThreadPool,它创建一个没有上限的池(如使用Integer.MAX_VALUE)。如果一个线程可用,它将重用线程,如果池中的所有线程都在运行作业,它将创建一个新线程。如果您以后开始遇到死锁,这可能是可取的(因为固定线程池中有太多作业在等待子作业无法运行和完成)。这至少会限制您创建/销毁的线程数。

最后,您需要ExecutorService手动关闭,可能通过关闭挂钩,或者它包含的线程将不允许 JVM 终止。

希望有帮助/有意义。

于 2012-09-27T22:51:30.533 回答
0

如果您有生成任务(例如处理矩阵列的示例),那么您可能会遇到 CyclicBarrier。也就是说,如果必须完成第 1 代的每一项工作才能处理第 2 代的任何工作,那么您能做的最好的事情就是等待满足该条件。

如果每一代都有数千个任务,那么最好将所有这些任务提交给一个 ExecutorService (ExecutorService.invokeAll) 并等待结果返回,然后再继续下一步。这样做的好处是消除了上下文切换和在物理 CPU 有限时分配数百个线程而浪费的时间/内存。

如果您的任务不是分代的,而是更多的树状结构,其中只需要完成一个子集才能在该子集上进行下一步,那么您可能需要考虑 aForkJoinPool并且您不需要 Java 7去做。您可以获得 Java 6 的参考实现。这可以在任何 JSR 引入 ForkJoinPool 库代码下找到。

我还有另一个答案,它在 Java 6 中提供了一个粗略的实现:

public class Fib implements Callable<Integer> {
    int n;
    Executor exec;

    Fib(final int n, final Executor exec) {
        this.n = n;
        this.exec = exec;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Integer call() throws Exception {
        if (n == 0 || n == 1) {
            return n;
        }

        //Divide the problem
        final Fib n1 = new Fib(n - 1, exec);
        final Fib n2 = new Fib(n - 2, exec);

        //FutureTask only allows run to complete once
        final FutureTask<Integer> n2Task = new FutureTask<Integer>(n2);
        //Ask the Executor for help
        exec.execute(n2Task);

        //Do half the work ourselves
        final int partialResult = n1.call();

        //Do the other half of the work if the Executor hasn't
        n2Task.run();

        //Return the combined result
        return partialResult + n2Task.get();
    }

}    

请记住,如果您将任务划分得过多并且每个线程完成的工作单元太小,则会对性能产生负面影响。例如,上面的代码是解决斐波那契的一种非常慢的方法。

于 2012-09-28T00:43:58.157 回答