我正在使用 workerpool 示例并尝试不同的 WaitStrategies。当我尝试 TimeoutBlockingWaitStrategy 时,出现错误。这是程序和调用堆栈。
package org.lmax.experiment.test;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@SuppressWarnings("unused")
public class MultipleWorkerPoolsTest {
private static final Logger log = LoggerFactory
.getLogger(MultipleWorkerPoolsTest.class);
private static class StringEvent{
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String toString(){
return value;
}
}
private static class Worker implements WorkHandler<StringEvent>{
private static final Logger log = LoggerFactory
.getLogger(MultipleWorkerPoolsTest.Worker.class);
private String workerId;
public Worker(String workerId) {
super();
this.workerId = workerId;
}
Random r=new Random();
@Override
public void onEvent(StringEvent event) throws Exception {
System.out.println( "{" + workerId+ "} got {" + event + "}");
log.info("{} got {}",workerId,event);
Integer timeToSleep = r.nextInt(10000);
Thread.sleep(timeToSleep);
System.out.println( "{" + workerId+ "} Completed {" + event + "}");
}
}
private static class StringEventFactory implements EventFactory<StringEvent>{
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
private static class StringEventTranslator implements EventTranslatorOneArg<StringEvent,String>{
@Override
public void translateTo(StringEvent event, long sequence, String arg0) {
event.setValue("event "+sequence+": "+arg0);
}
}
private Disruptor<StringEvent> disruptor;
private ExecutorService executor;
@Before
public void before(){
executor=Executors.newFixedThreadPool(12, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
t.setDaemon(true);
String threadName = "worker_1_" + t.getId() ;
t.setName(threadName);
return t;
}
});
disruptor=new Disruptor<StringEvent>(new StringEventFactory(),64,executor, ProducerType.MULTI,new TimeoutBlockingWaitStrategy(1000, TimeUnit.NANOSECONDS));
Worker w11=new Worker("worker_1-1");
Worker w12=new Worker("worker_1-2");
Worker w13=new Worker("worker_1-3");
Worker w14=new Worker("worker_1-4");
Worker w21=new Worker("worker_2-1");
Worker w22=new Worker("worker_2-2");
Worker w23=new Worker("worker_2-3");
Worker w24=new Worker("worker_2-4");
Worker w31=new Worker("worker_3-1");
Worker w32=new Worker("worker_3-2");
Worker w33=new Worker("worker_3-3");
Worker w34=new Worker("worker_3-4");
Worker[] workerArray = new Worker[12];
/* workerArray[0] = w11;
workerArray[1] = w12;
workerArray[2] = w13;
workerArray[3] = w14;*/
for ( int i = 0; i < 12; i++ ){
String name = "worker_" + Integer.toString(i);
workerArray[i] = new Worker(name);
}
disruptor.handleEventsWithWorkerPool(workerArray);
// .thenHandleEventsWithWorkerPool(w21,w22,w23,w24)
// .thenHandleEventsWithWorkerPool(w31,w32,w33,w34);
//disruptor.handleEventsWithWorkerPool(w11);
}
@After
public void after() throws InterruptedException{
executor.shutdown();
executor.awaitTermination(0, TimeUnit.MILLISECONDS);
}
@Test
public void test1() throws InterruptedException, ExecutionException{
StringEventTranslator t=new StringEventTranslator();
ExecutorService executorService = Executors.newFixedThreadPool(3);
disruptor.start();
RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
Future future = executorService.submit(new Runnable() {
public void run() {
StringEventTranslator t=new StringEventTranslator();
RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
String threadName = Thread.currentThread().getName();
String threadId = Long.toString(Thread.currentThread().getId());
String eventName = "hello" + threadName + "." + threadId;
for(int i=0;i<10000;i++){
ringBuffer.publishEvent(t, eventName);
//disruptor.publishEvent(t,eventName);
}
}
});
Future future1 = executorService.submit(new Runnable() {
public void run() {
StringEventTranslator t=new StringEventTranslator();
RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
String threadName = Thread.currentThread().getName();
String threadId = Long.toString(Thread.currentThread().getId());
String eventName = "hello" + threadName + "." + threadId;
for(int i=0;i<10000;i++){
ringBuffer.publishEvent(t, eventName);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
future1.get();
future.get();
/* for(int i=0;i<100;i++){
disruptor.publishEvent(t,"hello");
}*/
System.out.println("Disruptor shutting down");
log.info("Disruptor shutting down");
Thread.sleep(1000000);
disruptor.shutdown();
System.out.println("Disruptor shut down complete");
log.info("Disruptor shutdown");
}
}
错误如下
Exception in thread "worker_1_9" Exception in thread "worker_1_12" Exception in thread "worker_1_16" Exception in thread "worker_1_20" Exception in thread "worker_1_17" Exception in thread "worker_1_14" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_10" Exception in thread "worker_1_19" Exception in thread "worker_1_11" Exception in thread "worker_1_18" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_13" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_15" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
想知道我的代码中是否存在错误。非常感谢任何帮助。此外,如果有人可以在中断器中解释 TimeoutBlockingWaitStrategy 的目的,那就太好了。