我想有一个环形缓冲区队列,它将接收对象并将它们分布在线程池中的多个线程中,以单个生产者到多个消费者的方式。我如何使用破坏者模式来实现这一点?任何HelloDemux
代码示例?谢谢!!!
问问题
1270 次
1 回答
6
本文详细介绍了有关实现中断模式的解复用器的所有内容,但我认为线程池意味着您将需要一个调度程序,这与中断模式背道而驰。要实现 demux,您需要设置固定数量的消费者线程,而不是池,然后让它们从队列尾部获取消息。现在,您可能会问,如果没有调度员,他们怎么能做到这一点?他们只是在队列尾部忙于旋转(或使用其他类型的等待策略,即旋转、让步、停车、休眠等的组合)。现在,您可能会问,他们如何在不互相介入的情况下做到这一点?那么您有两个选择:您可以使用 MODULUS(无锁)或 CAS(轻锁)。每个都有自己的优点和缺点。MODULUS 速度很快,但如果一个消费者落后,可能会导致车道争用。CAS 没有那么快,但不会导致通道争用。
package com.coralblocks.coralqueue.sample.demux;
import com.coralblocks.coralqueue.demux.CASAtomicDemux;
import com.coralblocks.coralqueue.demux.Demux;
public class Sample {
private static final int NUMBER_OF_CONSUMERS = 4;
public static void main(String[] args) throws InterruptedException {
final Demux<StringBuilder> queue = new CASAtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);
Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
for(int i = 0; i < consumers.length; i++) {
final int index = i;
consumers[i] = new Thread() {
@Override
public void run() {
boolean running = true;
while(running) {
long avail;
while((avail = queue.availableToPoll(index)) == 0); // busy spin
for(int i = 0; i < avail; i++) {
StringBuilder sb = queue.poll(index);
if (sb == null) break; // mandatory for demuxes!
if (sb.length() == 0) {
running = false;
break; // exit immediately...
} else {
System.out.println(sb.toString());
}
}
queue.donePolling(index);
}
}
};
consumers[i].start();
}
StringBuilder sb;
for(int i = 0; i < 10; i++) {
while((sb = queue.nextToDispatch()) == null); // busy spin
sb.setLength(0);
sb.append("message ").append(i);
queue.flush();
}
// send a message to stop consumers...
for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
// because the consumer exit immediately on this message, each
// consumer will get one of these messages and exit...
while((sb = queue.nextToDispatch()) == null); // busy spin
sb.setLength(0);
}
queue.flush(); // sent batch
for(int i = 0; i < consumers.length; i++) consumers[i].join();
}
}
于 2014-05-23T18:36:25.797 回答