4

我已经了解了 LMAX 和这个名为 RingBuffer 的奇妙概念。所以人们说,当只有一个线程写入环形缓冲区时,性能比多个生产者要好得多......

但是,我真的不认为典型应用程序可以仅使用一个线程在 ringbuffer 上写入...我真的不明白 lmax 是如何做到的(如果他们这样做的话)。例如N个不同的交易者在交易所下订单,这些都是异步请求,正在转换为订单并放入ringbuffer,他们怎么可能用一个线程写这些?

问题 1. 我可能会遗漏某些内容或误解某些方面,但是如果您有 N 个并发生产者,如何将它们合并为 1 而不相互锁定?

问题 2。我记得 rxJava 可观察对象,您可以在其中获取 N 个可观察对象并使用Observable.merge将它们合并为 1我想知道它是否以任何方式阻塞或维护任何锁定?

4

2 回答 2

3

多线程写入对 RingBuffer 的影响很小,但在非常重的负载下可能会很重要。

RingBuffer 实现包含一个next节点,将在其中进行下一次添加。如果只有一个线程正在向环写入,则该过程将始终在最短的时间内完成,即buffer[head++] = newData.

要在避免锁定的同时处理多线程,您通常会执行类似while ( !buffer[head++].compareAndSet(null,newValue)){}. 当其他线程干扰数据的存储时,这个紧密的循环将继续执行,从而减慢了吞吐量。

请注意,我在上面使用了伪代码,请getFree在我的实现中查看一个真实示例。

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list
    // ... or we successfully transit a node from free to not-free.
    // This is the loop that could cause delays under hight thread activity.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // ...
  }
于 2015-05-14T13:28:27.263 回答
2

在内部,RxJava 的合并使用了一个序列化结构,我称之为发射器循环,它使用synchronized并且正在阻塞。

我们的“客户”主要在吞吐量和延迟不敏感的情况下使用合并,或者完全单线程和阻塞并不是真正的问题。

可以编写一个我称为queue-drain的非阻塞序列化程序,但不能将合并配置为使用它。

如果您愿意手动处理生产者和消费者线程,也可以直接查看JCTools MpscArrayQueue

于 2015-05-14T14:09:25.397 回答