211

我试图理解破坏者模式。我看过 InfoQ 的视频并尝试阅读他们的论文。我知道涉及到一个环形缓冲区,它被初始化为一个非常大的数组以利用缓存局部性,消除新内存的分配。

听起来好像有一个或多个原子整数可以跟踪位置。每个“事件”似乎都有一个唯一的 id,并且它在环中的位置是通过找到它相对于环的大小等的模数来找到的。

不幸的是,我对它的工作原理没有直观的感觉。我做过很多交易应用,研究过actor模型,研究过SEDA等。

在他们的演讲中,他们提到这种模式基本上就是路由器的工作方式。但是我也没有找到关于路由器如何工作的任何好的描述。

是否有一些好的指示可以更好地解释?

4

5 回答 5

213

Google Code 项目确实参考了关于实现环形缓冲区的技术论文,但是对于想要了解它如何工作的人来说,它有点枯燥、学术和艰难。然而,有一些博客文章已经开始以更易读的方式解释内部结构。有对作为中断器模式核心的环形缓冲区的解释,对消费者障碍(与从中断器读取相关的部分)的描述以及有关处理可用的多个生产者的一些信息

Disruptor 最简单的描述是:它是一种在线程之间以最有效的方式发送消息的方式。它可以用作队列的替代方案,但它也与 SEDA 和 Actors 共享许多功能。

与队列相比:

Disruptor 提供了将消息传递到另一个线程的能力,如果需要,可以将其唤醒(类似于 BlockingQueue)。但是,有 3 个明显的区别。

  1. Disruptor 的用户通过扩展 Entry 类并提供工厂来进行预分配来定义消息的存储方式。这允许内存重用(复制)或条目可以包含对另一个对象的引用。
  2. 将消息放入 Disruptor 是一个两阶段的过程,首先在环形缓冲区中声明一个插槽,它为用户提供可以填充适当数据的条目。然后必须提交条目,这种两阶段方法对于灵活使用上述内存是必要的。正是提交使消息对消费者线程可见。
  3. 消费者有责任跟踪已从环形缓冲区消费的消息。将此责任从环形缓冲区本身移开有助于减少写入争用的数量,因为每个线程都维护自己的计数器。

与演员相比

与大多数其他编程模型相比,Actor 模型更接近 Disruptor,尤其是当您使用提供的 BatchConsumer/BatchHandler 类时。这些类隐藏了维护使用的序列号的所有复杂性,并在发生重要事件时提供一组简单的回调。但是,有一些细微的差别。

  1. Disruptor 使用 1 线程 - 1 消费者模型,其中 Actor 使用 N:M 模型,即您可以拥有任意数量的 Actor,它们将分布在固定数量的线程上(通常每个核心 1 个)。
  2. BatchHandler 接口提供了一个额外的(并且非常重要的)回调onEndOfBatch()。这允许速度较慢的消费者,例如那些执行 I/O 以将事件批处理在一起以提高吞吐量的消费者。可以在其他 Actor 框架中进行批处理,但是由于几乎所有其他框架都不在批处理结束时提供回调,因此您需要使用超时来确定批处理结束,从而导致延迟很差。

与 SEDA 相比

LMAX 构建了 Disruptor 模式来替代基于 SEDA 的方法。

  1. 它比 SEDA 提供的主要改进是并行工作的能力。为此,Disruptor 支持将相同的消息(以相同的顺序)多播给多个消费者。这避免了流水线中对分叉阶段的需要。
  2. 我们还允许消费者等待其他消费者的结果,而不必在他们之间设置另一个排队阶段。消费者可以简单地查看它所依赖的消费者的序列号。这避免了管道中连接阶段的需要。

与内存屏障相比

另一种思考方式是将其视为结构化的、有序的内存屏障。生产者屏障形成写屏障,消费者屏障是读屏障。

于 2011-07-03T08:03:40.823 回答
138

首先,我们想了解它提供的编程模型。

有一个或多个作家。有一个或多个读者。有一行条目,完全按照从旧到新的顺序排列(如图从左到右)。作者可以在右端添加新条目。每个读者从左到右顺序阅读条目。显然,读者无法阅读过去的作家。

没有条目删除的概念。我使用“阅读器”而不是“消费者”来避免条目被消费的形象。但是我们知道最后一个读者左边的条目变得毫无用处。

一般读者可以同时独立阅读。但是,我们可以声明读者之间的依赖关系。阅读器依赖关系可以是任意的无环图。如果阅读器 B 依赖于阅读器 A,则阅读器 B 无法阅读过去的阅读器 A。

阅读器依赖性的产生是因为阅读器 A 可以对条目进行注释,而阅读器 B 依赖于该注释。例如,A 对条目进行一些计算,并将结果存储在a条目中的字段中。然后 A 继续前进,现在 B 可以读取条目,并将aA 的值存储起来。如果阅读器 C 不依赖于 A,则 C 不应尝试阅读a.

这确实是一个有趣的编程模型。无论性能如何,仅该模型就可以使许多应用程序受益。

当然,LMAX 的主要目标是性能。它使用预先分配的条目环。环足够大,但它是有界的,因此系统的负载不会超出设计容量。如果环已满,作者将等到最慢的读者前进并腾出空间。

入口对象是预先分配的并且永远存在,以减少垃圾收集成本。我们不会插入新的条目对象或删除旧的条目对象,相反,作者会要求一个预先存在的条目,填充其字段并通知读者。这种明显的两相动作实际上只是一个原子动作

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

预分配条目也意味着相邻条目(很可能)位于相邻的内存单元中,并且由于读取器顺序读取条目,这对于利用 CPU 缓存很重要。

并且为避免锁定、CAS 甚至内存屏障做出了很多努力(例如,如果只有一个写入器,则使用非易失性序列变量)

对于读者的开发者:不同的注解读者应该写入不同的字段,避免写争用。(实际上它们应该写入不同的缓存行。)注释阅读器不应该接触其他非依赖阅读器可能读取的任何内容。这就是为什么我说这些读者注释条目,而不是修改条目。

于 2011-07-16T05:48:14.353 回答
41

Martin Fowler 写了一篇关于 LMAX 和破坏者模式的文章The LMAX Architecture,这可能会进一步澄清它。

于 2011-07-19T07:49:45.400 回答
17

实际上,出于纯粹的好奇心,我花时间研究了实际来源,其背后的想法非常简单。撰写本文时的最新版本是 3.2.1。

有一个缓冲区存储预先分配的事件,这些事件将保存数据供消费者读取。

缓冲区由其长度的标志数组(整数数组)支持,该数组描述了缓冲区插槽的可用性(有关详细信息,请参阅更多信息)。数组像 java#AtomicIntegerArray 一样被访问,所以为了这个解释的目的,你不妨假设它是一个。

可以有任意数量的生产者。当生产者想要写入缓冲区时,会生成一个长数字(如在调用 AtomicLong#getAndIncrement 时,Disruptor 实际上使用自己的实现,但它以相同的方式工作)。让我们称这个生成的 long 为 producerCallId。以类似的方式,当消费者结束从缓冲区读取插槽时,会生成消费者CallId。访问最近的 consumerCallId。

(如果有很多消费者,则选择 id 最低的调用。)

然后将这些 id 进行比较,如果两者之间的差异小于缓冲区侧,则允许生产者写入。

(如果 producerCallId 大于最近的 consumerCallId + bufferSize,则表示缓冲区已满,生产者被迫总线等待,直到有可用的位置。)

然后根据生产者的 callId(prducerCallId 以 bufferSize 为模,但由于 bufferSize 始终是 2 的幂(在创建缓冲区时强制执行限制))为生产者分配缓冲区中的插槽,因此使用的实际操作是 producerCallId & (bufferSize - 1 ))。然后可以自由修改该槽中的事件。

(实际算法稍微复杂一些,涉及将最近的 consumerId 缓存在单独的原子引用中,以进行优化。)

当事件被修改时,更改被“发布”。当发布标志数组中的相应插槽时,会填充更新的标志。标志值是循环的次数(producerCallId 除以 bufferSize(同样由于 bufferSize 是 2 的幂,实际操作是右移)。

以类似的方式,可以有任意数量的消费者。每次消费者想要访问缓冲区时,都会生成一个 consumerCallId(取决于消费者如何添加到中断器中,id 生成中使用的原子可以为每个消费者共享或分离)。然后将此 consumerCallId 与最近的 producerCallId 进行比较,如果两者中较小,则允许读取器继续。

(同样producerCallId与consumerCallId是偶数,表示缓冲区为空,consumer被迫等待。等待的方式由disruptor创建时的WaitStrategy定义。)

对于个人消费者(拥有自己的 id 生成器的消费者),接下来检查的是批量消费的能力。缓冲区中的槽按照从对应于 consumerCallId(索引的确定方式与为生产者相同的方式确定)到对应于最近 producerCallId 的顺序检查。

通过将写入标志数组中的标志值与为 consumerCallId 生成的标志值进行比较,在循环中检查它们。如果标志匹配,则表示填充插槽的生产者已提交更改。如果不是,则循环中断,并返回最高提交的 changeId。从 ConsumerCallId 到 changeId 中收到的槽可以批量消费。

如果一组消费者一起阅读(具有共享 id 生成器的消费者),每个消费者只接受一个 callId,并且只检查并返回该单个 callId 的插槽。

于 2014-06-05T14:23:06.307 回答
8

这篇文章

破坏者模式是一个批处理队列,由一个循环数组(即环形缓冲区)支持,其中填充了预先分配的传输对象,它使用内存屏障通过序列同步生产者和消费者。

记忆障碍有点难以解释,我认为 Trisha 的博客在这篇文章中做了最好的尝试:http: //mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast。 html

但是,如果您不想深入了解底层细节,您可以知道 Java 中的内存屏障是通过volatile关键字或通过java.util.concurrent.AtomicLong. 破坏者模式序列是AtomicLongs 并且通过内存屏障而不是锁在生产者和消费者之间来回通信。

我发现通过代码更容易理解一个概念,所以下面的代码是一个简单的来自CoralQueue的helloworld,它是由我所属的 CoralBlocks 完成的破坏者模式实现。在下面的代码中,您可以看到中断模式如何实现批处理以及环形缓冲区(即循环数组)如何允许两个线程之间的无垃圾通信:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
于 2014-06-16T22:38:45.353 回答