实际上,出于纯粹的好奇心,我花时间研究了实际来源,其背后的想法非常简单。撰写本文时的最新版本是 3.2.1。
有一个缓冲区存储预先分配的事件,这些事件将保存数据供消费者读取。
缓冲区由其长度的标志数组(整数数组)支持,该数组描述了缓冲区插槽的可用性(有关详细信息,请参阅更多信息)。数组像 java#AtomicIntegerArray 一样被访问,所以为了这个解释的目的,你不妨假设它是一个。
可以有任意数量的生产者。当生产者想要写入缓冲区时,会生成一个长数字(如在调用 AtomicLong#getAndIncrement 时,Disruptor 实际上使用自己的实现,但它以相同的方式工作)。让我们称这个生成的 long 为 producerCallId。以类似的方式,当消费者结束从缓冲区读取插槽时,会生成消费者CallId。访问最近的 consumerCallId。
(如果有很多消费者,则选择 id 最低的调用。)
然后将这些 id 进行比较,如果两者之间的差异小于缓冲区侧,则允许生产者写入。
(如果 producerCallId 大于最近的 consumerCallId + bufferSize,则表示缓冲区已满,生产者被迫总线等待,直到有可用的位置。)
然后根据生产者的 callId(prducerCallId 以 bufferSize 为模,但由于 bufferSize 始终是 2 的幂(在创建缓冲区时强制执行限制))为生产者分配缓冲区中的插槽,因此使用的实际操作是 producerCallId & (bufferSize - 1 ))。然后可以自由修改该槽中的事件。
(实际算法稍微复杂一些,涉及将最近的 consumerId 缓存在单独的原子引用中,以进行优化。)
当事件被修改时,更改被“发布”。当发布标志数组中的相应插槽时,会填充更新的标志。标志值是循环的次数(producerCallId 除以 bufferSize(同样由于 bufferSize 是 2 的幂,实际操作是右移)。
以类似的方式,可以有任意数量的消费者。每次消费者想要访问缓冲区时,都会生成一个 consumerCallId(取决于消费者如何添加到中断器中,id 生成中使用的原子可以为每个消费者共享或分离)。然后将此 consumerCallId 与最近的 producerCallId 进行比较,如果两者中较小,则允许读取器继续。
(同样producerCallId与consumerCallId是偶数,表示缓冲区为空,consumer被迫等待。等待的方式由disruptor创建时的WaitStrategy定义。)
对于个人消费者(拥有自己的 id 生成器的消费者),接下来检查的是批量消费的能力。缓冲区中的槽按照从对应于 consumerCallId(索引的确定方式与为生产者相同的方式确定)到对应于最近 producerCallId 的顺序检查。
通过将写入标志数组中的标志值与为 consumerCallId 生成的标志值进行比较,在循环中检查它们。如果标志匹配,则表示填充插槽的生产者已提交更改。如果不是,则循环中断,并返回最高提交的 changeId。从 ConsumerCallId 到 changeId 中收到的槽可以批量消费。
如果一组消费者一起阅读(具有共享 id 生成器的消费者),每个消费者只接受一个 callId,并且只检查并返回该单个 callId 的插槽。