哪种 Java 同步结构可能为具有固定数量线程的并发迭代处理场景提供最佳性能,如下所述?在我自己试验了一段时间(使用 ExecutorService 和 CyclicBarrier)并且对结果有些惊讶之后,我将不胜感激一些专家的建议,也许还有一些新的想法。这里的现有问题似乎并不主要关注性能,因此这是一个新问题。提前致谢!
该应用程序的核心是一个简单的迭代数据处理算法,并行化以在运行 OS X 10.6 和 Java 1.6.0_07 的 Mac Pro 上的 8 个内核上分散计算负载。要处理的数据被分成 8 个块,每个块被馈送到一个 Runnable 以由固定数量的线程之一执行。将算法并行化相当简单,它在功能上可以按预期工作,但它的性能还不是我认为的那样。该应用程序似乎在系统调用同步上花费了很多时间,所以经过一些分析后,我想知道我是否选择了最合适的同步机制。
该算法的一个关键要求是它需要分阶段进行,因此线程需要在每个阶段结束时同步。主线程准备工作(非常低的开销),将其传递给线程,让它们处理它,然后在所有线程完成后继续,重新安排工作(同样非常低的开销)并重复循环。该机器专门用于此任务,垃圾收集通过使用预分配项目的每个线程池最小化,并且线程数可以固定(没有传入请求等,每个 CPU 核心只有一个线程)。
V1 - 执行服务
我的第一个实现使用了一个带有 8 个工作线程的 ExecutorService。该程序创建了 8 个任务来保存工作,然后让他们处理它,大致如下:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
这在功能上运行良好(它应该做的),并且对于非常大的工作项,确实所有 8 个 CPU 都变得高度负载,正如处理算法所允许的那样(一些工作项将比其他工作项完成得更快,然后空闲) . 然而,随着工作项变得更小(并且这并不是真正在程序的控制之下),用户 CPU 负载急剧减少:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
图例: - 块大小 = 工作项的大小(= 计算步数) - 系统 = 系统负载,如 OS X 活动监视器(红条) - 用户 = 用户负载,如 OS X 活动监视器(绿条) - 周期/秒 = 通过主 while 循环的迭代,越多越好
这里主要关注的领域是系统中花费的时间比例很高,这似乎是由线程同步调用驱动的。正如预期的那样,对于较小的工作项, ExecutorService.invokeAll() 将需要相对更多的努力来同步线程,而不是每个线程中正在执行的工作量。但是由于 ExecutorService 比这个用例需要的更通用(如果任务多于核心,它可以为线程排队任务),我认为可能会有一个更精简的同步结构。
V2 - 循环障碍
下一个实现使用 CyclicBarrier 在接收工作之前和完成之后同步线程,大致如下:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上运行良好(它应该做的),并且对于非常大的工作项目,确实所有 8 个 CPU 都像以前一样变得高负载。然而,随着工作项变得更小,负载仍然急剧缩小:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
对于大型工作项,同步可以忽略不计,性能与 V1 相同。但出乎意料的是,(高度专业化的)CyclicBarrier 的结果似乎比(通用)ExecutorService 的结果差得多:吞吐量(周期/秒)仅为 V1 的 1/4 左右。初步结论是,尽管这似乎是 CyclicBarrier 宣传的理想用例,但它的性能比通用的 ExecutorService 差得多。
V3 - 等待/通知 + CyclicBarrier
似乎值得尝试用简单的等待/通知机制替换第一个循环障碍 await():
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上运行良好(它做了它应该做的)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
小工作项的吞吐量仍然比 ExecutorService 差很多,但大约是 CyclicBarrier 的 2 倍。消除一个 CyclicBarrier 消除了一半的差距。
V4 - 忙等待而不是等待/通知
由于此应用程序是系统上运行的主要应用程序,并且如果内核不忙于工作项,内核无论如何都会空闲,为什么不在每个线程中尝试忙于等待工作项,即使这会不必要地旋转 CPU。工作线程代码更改如下:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
在功能上也能很好地工作(它应该做的事情)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
对于小型工作项,这比 CyclicBarrier + wait/notify 变体进一步增加了 10% 的吞吐量,这并非微不足道。但它的吞吐量仍然比使用 ExecutorService 的 V1 低得多。
V5 - ?
那么对于这种(可能并不少见)问题的最佳同步机制是什么?我厌倦了编写自己的同步机制来完全替换 ExecutorService(假设它太通用了,必须有一些东西仍然可以取出以使其更高效)。这不是我的专业领域,我担心我会花费大量时间调试它(因为我什至不确定我的等待/通知和忙等待变体是否正确)以获得不确定的收益。
任何建议将不胜感激。