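I am trying to use the Disruptor to process messages. I need two processing phases, i.e. two groups of handlers, each working as a worker pool, like this (I guess):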

disruptor.
handleEventsWithWorkerPool(
    firstPhaseHandlers)
.thenHandleEventsWithWorkerPool(
    secondPhaseHandlers);

With the code above, performance degrades as soon as I put more than one worker in each group: far more CPU is burned for exactly the same amount of work.

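I tried tuning the ring buffer size (which I have already seen affects performance), but it did not help in this case. So am I doing something wrong, or is this a real problem?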


I have attached a complete demo of the problem.

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

final class ValueEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
}

// First-phase handler: only bumps the shared counter.
class MyWorkHandler implements WorkHandler<ValueEvent> {

    AtomicLong workDone;

    public MyWorkHandler(AtomicLong wd) {
        this.workDone = wd;
    }

    public void onEvent(final ValueEvent event) throws Exception {
        workDone.incrementAndGet();
    }
}

// Second-phase handler: same work as the first phase, on the same shared counter.
class My2ndPahseWorkHandler implements WorkHandler<ValueEvent> {

    AtomicLong workDone;

    public My2ndPahseWorkHandler(AtomicLong wd) {
        this.workDone = wd;
    }

    public void onEvent(final ValueEvent event) throws Exception {
        workDone.incrementAndGet();
    }
}

class MyEventTranslator implements EventTranslatorOneArg<ValueEvent, Long> {

    @Override
    public void translateTo(ValueEvent event, long sequence, Long value) {
        event.setValue(value);
    }
}

public class TwoPhaseDisruptor {

    // Shared by every handler in both phases; counts handled events.
    static AtomicLong workDone = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {

        ExecutorService exec = Executors.newCachedThreadPool();

        // args: <handlers per group> <event count> <ring buffer shift>
        int numOfHandlersInEachGroup = Integer.parseInt(args[0]);
        long eventCount = Long.parseLong(args[1]);
        int ringBufferSize = 2 << (Integer.parseInt(args[2]));

        Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
                ValueEvent.EVENT_FACTORY, ringBufferSize,
                exec);

        ArrayList<MyWorkHandler> handlers = new ArrayList<MyWorkHandler>();
        for (int i = 0; i < numOfHandlersInEachGroup; i++) {
            handlers.add(new MyWorkHandler(workDone));
        }

        ArrayList<My2ndPahseWorkHandler> phase2_handlers = new ArrayList<My2ndPahseWorkHandler>();
        for (int i = 0; i < numOfHandlersInEachGroup; i++) {
            phase2_handlers.add(new My2ndPahseWorkHandler(workDone));
        }

        // Phase 1 worker pool, then phase 2 worker pool gated on phase 1.
        disruptor
                .handleEventsWithWorkerPool(
                        handlers.toArray(new WorkHandler[handlers.size()]))
                .thenHandleEventsWithWorkerPool(
                        phase2_handlers.toArray(new WorkHandler[phase2_handlers.size()]));

        long s = (System.currentTimeMillis());
        disruptor.start();

        MyEventTranslator myEventTranslator = new MyEventTranslator();
        for (long i = 0; i < eventCount; i++) {
            disruptor.publishEvent(myEventTranslator, i);
        }

        // shutdown() waits until all published events have been handled.
        disruptor.shutdown();
        exec.shutdown();
        System.out.println("time spent "+ (System.currentTimeMillis() - s) + "     ms");
        System.out.println("amount of work done "+ workDone.get());
    }
}

Try running the example above with 1 thread in each group:

1 100000 7

On my computer it gave:

time spent 371 ms
amount of work done 200000

Then try it with 4 threads in each group:

4 100000 7

On my computer it gave:

time spent 9853 ms
amount of work done 200000

CPU utilization was at 100% during the run.


1 Answer


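It looks like you are sharing the AtomicLong between threads/cores, so every increment contends for the same cache line. I'll try it out when I have more time for a demo, but it is better to give each WorkHandler a private counter owned by its thread (its own AtomicLong or, preferably, a plain long).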
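A minimal sketch of the private-counter idea, using only the JDK so it runs without the Disruptor jar. `CountingWorker` and `runWorkers` are hypothetical names standing in for a WorkHandler and its worker pool; they are not part of the question's code or of the Disruptor API. Each worker increments a plain long that only its own thread writes, and the totals are summed after the threads have finished.

```java
import java.util.ArrayList;
import java.util.List;

public class PrivateCounters {

    // Hypothetical stand-in for a WorkHandler: one instance per thread.
    static final class CountingWorker implements Runnable {
        private final long eventsToHandle;
        private long handled; // plain long, written only by the owning thread

        CountingWorker(long eventsToHandle) {
            this.eventsToHandle = eventsToHandle;
        }

        @Override
        public void run() {
            for (long i = 0; i < eventsToHandle; i++) {
                handled++; // no shared counter, so no cross-core contention
            }
        }

        long handled() {
            return handled;
        }
    }

    // Runs the workers in parallel and sums their private counters afterwards.
    static long runWorkers(int workers, long eventsPerWorker) {
        List<CountingWorker> tasks = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            CountingWorker worker = new CountingWorker(eventsPerWorker);
            Thread thread = new Thread(worker);
            tasks.add(worker);
            threads.add(thread);
            thread.start();
        }
        long total = 0;
        try {
            for (Thread thread : threads) {
                thread.join(); // join() makes each worker's writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        for (CountingWorker worker : tasks) {
            total += worker.handled();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("amount of work done " + runWorkers(4, 50_000));
    }
}
```

In the question's demo the same pattern would mean giving each handler its own counter field and adding the counters up after `disruptor.shutdown()` returns.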


Update:

If you change the Disruptor line to:

Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
        ValueEvent.EVENT_FACTORY, ringBufferSize,
        exec,
        com.lmax.disruptor.dsl.ProducerType.SINGLE,
        new com.lmax.disruptor.BusySpinWaitStrategy());

you get better results:

jason@debian01:~/code/stackoverflow$ java -cp disruptor-3.1.1.jar:. TwoPhaseDisruptor 4 100000 1024
time spent 2728     ms
amount of work done 200000
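One detail worth checking when reproducing these runs: the question's program computes the size as `2 << Integer.parseInt(args[2])`, so the third argument is a shift distance rather than a size. Java uses only the low five bits of an int shift distance, so if that line is left unchanged, an argument of 1024 does not produce a 1024-slot buffer. The sketch below (class and method names are illustrative) shows what the expression evaluates to.

```java
public class RingBufferSizeSketch {

    // Same expression as in the question's main method.
    static int sizeFromArg(int shift) {
        return 2 << shift;
    }

    public static void main(String[] args) {
        System.out.println(sizeFromArg(7));    // 256, the question's runs
        System.out.println(sizeFromArg(9));    // 1024
        System.out.println(sizeFromArg(1024)); // 2, because 1024 & 31 == 0
    }
}
```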

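I went through the code and tried to fix the false sharing, but found little improvement. That is when I noticed that the CPUs on my 8-core machine were nowhere near 100% (even for the four-worker test). From this I concluded that, at the very least, if you have CPU to burn, a yielding/spinning wait strategy will reduce latency.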

Just make sure you have enough cores: with 4 handlers per group you need 8 for processing, plus 1 for publishing messages.
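To illustrate what a spinning or yielding wait means, here is a JDK-only sketch. It is not how Disruptor's `BusySpinWaitStrategy` or `YieldingWaitStrategy` are implemented; `Waiter`, `BUSY_SPIN`, `YIELDING`, and `consume` are hypothetical names. The consumer never parks: it keeps polling the published sequence and either spins or yields in between, which trades CPU for lower wake-up latency.

```java
import java.util.concurrent.atomic.AtomicLong;

public class WaitSketch {

    // What the consumer does while no new event is available.
    interface Waiter {
        void pause();
    }

    static final Waiter BUSY_SPIN = Thread::onSpinWait; // needs Java 9+
    static final Waiter YIELDING = Thread::yield;

    // Publishes `events` sequences and returns how many the consumer saw.
    static long consume(long events, Waiter waiter) {
        AtomicLong published = new AtomicLong(-1); // highest published sequence
        long[] consumed = {0};

        Thread consumer = new Thread(() -> {
            long next = 0;
            while (next < events) {
                long available = published.get();
                if (available < next) {
                    waiter.pause(); // burn or yield the core instead of blocking
                    continue;
                }
                next = available + 1; // take everything published so far
            }
            consumed[0] = next;
        });
        consumer.start();

        for (long seq = 0; seq < events; seq++) {
            published.set(seq);
        }
        try {
            consumer.join(); // join() makes consumed[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        System.out.println(consume(100_000, BUSY_SPIN));
        System.out.println(consume(100_000, YIELDING));
    }
}
```

Because every waiting thread occupies a core, this style only pays off when there are at least as many free cores as threads, which is the point of the core count above.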

answered 2013-07-30T22:32:17.023