Can anyone tell me what is Disruptor design pattern with simple example ? I will want to know basics of this design pattern.
3 回答
一个简单的谷歌给了我很多信息,包括Martin Fowler 的介绍
在粗略的层面上,您可以将 Disruptor 视为队列的多播图,其中生产者将对象放在其上,这些对象通过单独的下游队列发送给所有消费者以供并行消费。当你往里面看时,你会发现这个队列网络实际上是一个单一的数据结构——一个环形缓冲区。每个生产者和消费者都有一个序列计数器来指示它当前正在处理缓冲区中的哪个槽。每个生产者/消费者写入自己的序列计数器,但可以读取其他的序列计数器。这样,生产者可以读取消费者的计数器,以确保它想要写入的插槽是可用的,而计数器上没有任何锁。类似地,消费者可以通过观察计数器来确保它只在另一个消费者使用完消息后才处理消息。
GitHub 项目包含Java 代码 + 文档。
我花了几天时间阅读它,并且刚刚开始在架构上掌握它,并掌握这种设计模式出现的原因。
有关如何实现的简单代码示例,请尝试https://github.com/trevorbernard/disruptor-examples
要获得包括白皮书链接、源代码和 UML 图的良好描述,您可以尝试从 http://martinfowler.com/articles/lmax.html开始
从这篇文章:
破坏者模式是一个批处理队列,由一个循环数组(即环形缓冲区)支持,其中填充了预先分配的传输对象,它使用内存屏障通过序列同步生产者和消费者。
幸运的是,您无需了解破坏者模式的内在细节即可使用它。如果您发现通过代码更容易理解,下面是CoralQueue的Hello World,这是一个用于实现中断模式的线程间通信的超低延迟队列。
package com.coralblocks.coralqueue.sample.queue;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;
public class Basics {
public static void main(String[] args) {
final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
@Override
public StringBuilder newInstance() {
return new StringBuilder(1024);
}
});
Thread producer = new Thread(new Runnable() {
private final StringBuilder getStringBuilder() {
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null) {
// queue can be full if the size of the queue
// is small and/or the consumer is too slow
// busy spin (you can also use a wait strategy instead)
}
return sb;
}
@Override
public void run() {
StringBuilder sb;
while(true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to send a message to
// the other thread you can just do:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hello!");
queue.flush();
// you can also send in batches to increase throughput:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi!");
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi again!");
queue.flush(); // dispatch the two messages above...
}
}
}, "Producer");
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
while (true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to check if the producer
// has sent a message you just do:
long avail;
while((avail = queue.availableToPoll()) == 0) {
// queue can be empty!
// busy spin (you can also use a wait strategy instead)
}
for(int i = 0; i < avail; i++) {
StringBuilder sb = queue.poll();
// (...) do whatever you want to do with the data
// just don't call toString() to create garbage...
// copy byte-by-byte instead...
}
queue.donePolling();
}
}
}, "Consumer");
consumer.start();
producer.start();
}
}