15

哪种 Java 同步结构可能为具有固定数量线程的并发迭代处理场景提供最佳性能,如下所述?在我自己试验了一段时间(使用 ExecutorService 和 CyclicBarrier)并且对结果有些惊讶之后,我将不胜感激一些专家的建议,也许还有一些新的想法。这里的现有问题似乎并不主要关注性能,因此这是一个新问题。提前致谢!

该应用程序的核心是一个简单的迭代数据处理算法,并行化以在运行 OS X 10.6 和 Java 1.6.0_07 的 Mac Pro 上的 8 个内核上分散计算负载。要处理的数据被分成 8 个块,每个块被馈送到一个 Runnable 以由固定数量的线程之一执行。将算法并行化相当简单,它在功能上可以按预期工作,但它的性能还不是我认为的那样。该应用程序似乎在系统调用同步上花费了很多时间,所以经过一些分析后,我想知道我是否选择了最合适的同步机制。

该算法的一个关键要求是它需要分阶段进行,因此线程需要在每个阶段结束时同步。主线程准备工作(非常低的开销),将其传递给线程,让它们处理它,然后在所有线程完成后继续,重新安排工作(同样非常低的开销)并重复循环。该机器专门用于此任务,垃圾收集通过使用预分配项目的每个线程池最小化,并且线程数可以固定(没有传入请求等,每个 CPU 核心只有一个线程)。

V1 - 执行服务

我的第一个实现使用了一个带有 8 个工作线程的 ExecutorService。该程序创建了 8 个任务来保存工作,然后让他们处理它,大致如下:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}

这在功能上运行良好(它应该做的),并且对于非常大的工作项,确实所有 8 个 CPU 都变得高度负载,正如处理算法所允许的那样(一些工作项将比其他工作项完成得更快,然后空闲) . 然而,随着工作项变得更小(并且这并不是真正在程序的控制之下),用户 CPU 负载急剧减少:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

图例: - 块大小 = 工作项的大小(= 计算步数) - 系统 = 系统负载,如 OS X 活动监视器(红条) - 用户 = 用户负载,如 OS X 活动监视器(绿条) - 周期/秒 = 通过主 while 循环的迭代,越多越好

这里主要关注的领域是系统中花费的时间比例很高,这似乎是由线程同步调用驱动的。正如预期的那样,对于较小的工作项, ExecutorService.invokeAll() 将需要相对更多的努力来同步线程,而不是每个线程中正在执行的工作量。但是由于 ExecutorService 比这个用例需要的更通用(如果任务多于核心,它可以为线程排队任务),我认为可能会有一个更精简的同步结构。

V2 - 循环障碍

下一个实现使用 CyclicBarrier 在接收工作之前和完成之后同步线程,大致如下:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

同样,这在功能上运行良好(它应该做的),并且对于非常大的工作项目,确实所有 8 个 CPU 都像以前一样变得高负载。然而,随着工作项变得更小,负载仍然急剧缩小:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

对于大型工作项,同步可以忽略不计,性能与 V1 相同。但出乎意料的是,(高度专业化的)CyclicBarrier 的结果似乎比(通用)ExecutorService 的结果差得多:吞吐量(周期/秒)仅为 V1 的 1/4 左右。初步结论是,尽管这似乎是 CyclicBarrier 宣传的理想用例,但它的性能比通用的 ExecutorService 差得多。

V3 - 等待/通知 + CyclicBarrier

似乎值得尝试用简单的等待/通知机制替换第一个循环障碍 await():

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

同样,这在功能上运行良好(它做了它应该做的)。

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

小工作项的吞吐量仍然比 ExecutorService 差很多,但大约是 CyclicBarrier 的 2 倍。消除一个 CyclicBarrier 消除了一半的差距。

V4 - 忙等待而不是等待/通知

由于此应用程序是系统上运行的主要应用程序,并且如果内核不忙于工作项,内核无论如何都会空闲,为什么不在每个线程中尝试忙于等待工作项,即使这会不必要地旋转 CPU。工作线程代码更改如下:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

在功能上也能很好地工作(它应该做的事情)。

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

对于小型工作项,这比 CyclicBarrier + wait/notify 变体进一步增加了 10% 的吞吐量,这并非微不足道。但它的吞吐量仍然比使用 ExecutorService 的 V1 低得多。

V5 - ?

那么对于这种(可能并不少见)问题的最佳同步机制是什么?我厌倦了编写自己的同步机制来完全替换 ExecutorService(假设它太通用了,必须有一些东西仍然可以取出以使其更高效)。这不是我的专业领域,我担心我会花费大量时间调试它(因为我什至不确定我的等待/通知和忙等待变体是否正确)以获得不确定的收益。

任何建议将不胜感激。

4

6 回答 6

6

It does seem that you do not need any synchronization between the workers. Maybe you should consider using the ForkJoin framework which is available in Java 7, as well as a separate library. Some links:

于 2012-10-04T21:55:56.387 回答
3

Update: V6 - Busy Wait, with main thread also working

An obvious improvement on V5 (busy wait for work in 7 worker threads, busy wait for completion in main thread) seemed to again split the work into 7+1 parts and to let the main thread process one part concurrently with the other worker threads (instead of just busy-waiting), and to subsequently busy-wait for the completion of all other threads' work items. That would utilize the 8th processor (in the example's 8-core configuration) and add its cycles to the available compute resource pool.

This was indeed straight-forward to implement. And the results are indeed again slightly better:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.39
64k         1.0%     98%       6.8
16k         1.0%     98%      50.4
4096        1.0%     98%     372
1024        1.0%     98%    1317
256         1.0%     98%    3546
64          1.5%     98%    9091
16          2.0%     98%   16949

So this seems to represents the best solution so far.

于 2010-04-27T08:26:21.447 回答
1

我也想知道你是否可以尝试超过 8 个线程。如果您的 CPU 支持超线程,那么(至少在理论上)您可以为每个内核挤压 2 个线程,然后看看它会产生什么结果。

于 2010-04-26T21:54:37.123 回答
1

更新:V5 - 所有线程中的忙等待(到目前为止似乎是最佳的)

由于所有内核都专用于此任务,因此似乎值得尝试简单地消除所有复杂的同步结构并在所有线程中的每个同步点进行忙碌等待。事实证明,这大大超过了所有其他方法。

设置如下:从上面的 V4 开始(CyclicBarrier + Busy Wait)。将 CyclicBarrier 替换为 AtomicInteger,主线程在每个周期都将其重置为零。每个完成其工作的工作线程 Runnable 都会将原子整数加一。主线程忙等待:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}

而不是 8 个,只启动了 7 个工作线程(因为所有线程,包括主线程,现在几乎完全加载了一个核心)。结果如下:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129

在工作线程中使用等待/通知可将吞吐量降低到该解决方案的 1/3 左右。

于 2010-04-26T15:00:26.797 回答
1

刚刚找到这个帖子,尽管它已经有将近一年的历史了,但让我向您指出几个月前我们在波恩大学开发的“jbarrier”库:

http://net.cs.uni-bonn.de/wg/cs/applications/jbarrier/

屏障包正好针对工作线程数<=内核数的情况。该包基于忙等待,它不仅支持屏障动作,还支持全局缩减,除了中央屏障之外,它还提供树结构屏障,用于进一步并行化同步/缩减部分。

于 2011-02-05T14:58:57.233 回答
1

更新:V7 - 忙碌等待恢复为等待/通知

在玩了一些 V6 之后,事实证明,在进行分析时,繁忙的等待会稍微掩盖应用程序的真正热点。另外,即使没有工作项目正在处理,系统上的风扇也会继续超速运转。因此,进一步的改进是忙于等待工作项目一段固定的时间(例如,大约 2 毫秒),然后恢复到“更好”的 wait()/notify() 组合。工作线程只是通过一个原子布尔值将它们当前的等待模式发布到主线程,该原子布尔值指示它们是否正忙于等待(因此只需要设置一个工作项)或者它们是否期望调用 notify() 因为它们在等待()。

另一个相当直接的改进是让已完成其主要工作项的线程在等待其他线程完成其主要工作项时重复调用客户端提供的回调。这样,等待时间(发生这种情况是因为线程必然会获得稍微不同的工作负载)不需要完全丢失给应用程序。

我仍然很想听听其他遇到类似用例的用户的意见。

于 2010-04-28T15:31:21.990 回答