18

我的系统有两种不同类型的消息——类型 A 和 B。每条消息都有不同的结构——类型 A 包含一个 int 成员,类型 B 包含一个 double 成员。我的系统需要将这两种类型的消息传递给众多业务逻辑线程。减少延迟非常重要,因此我正在研究使用 Disruptor 以机械方式将消息从主线程传递到业务逻辑线程。

我的问题是破坏者只接受环形缓冲区中的一种对象。这是有道理的,因为中断器预先分配了环形缓冲区中的对象。但是,通过 Disruptor 将两种不同类型的消息传递给我的业务逻辑线程也很困难。据我所知,我有四个选择:

  1. 配置中断器以使用包含固定大小字节数组的对象(如How should one use Disruptor (Disruptor Pattern) to build real-world message systems?推荐的那样)。在这种情况下,主线程必须在将消息发布到中断器之前将消息编码为字节数组,并且每个业务逻辑线程必须在接收时将字节数组解码回对象。这种设置的缺点是业务逻辑线程并没有真正共享破坏者的内存——而是从破坏者提供的字节数组中创建新对象(从而产生垃圾)。这种设置的好处是所有业务逻辑线程都可以从同一个中断器中读取多种不同类型的消息。

  2. 将中断器配置为使用单一类型的对象,但创建多个中断器,每个对象类型一个。在上面的例子中,会有两个独立的破坏者——一个用于类型 A 的对象,另一个用于类型 B 的对象。这种设置的好处是主线程不必将对象编码为字节数组,并且业务较少的逻辑线程可以共享与中断器中使用的相同的对象(不创建垃圾)。这种设置的缺点是每个业务逻辑线程必须以某种方式订阅来自多个破坏者的消息。

  3. 将中断器配置为使用包含消息 A 和 B 的所有字段的单一类型的“超级”对象。这非常不符合 OO 风格,但允许在选项 #1 和 #2 之间进行折衷。

  4. 配置中断器以使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。

对于这种情况,您有什么建议?我觉得选项 #2 是最干净的解决方案,但我不知道消费者是否或如何在技术上订阅来自多个破坏者的消息。如果有人可以提供如何实施选项 #2 的示例,将不胜感激!

4

3 回答 3

3

配置中断器以使用包含固定大小字节数组的对象(如 How should one use Disruptor (Disruptor Pattern) to build real-world message systems? 推荐的那样)。在这种情况下,主线程必须在将消息发布到中断器之前将消息编码为字节数组,并且每个业务逻辑线程必须在接收时将字节数组解码回对象。这种设置的缺点是业务逻辑线程并没有真正共享破坏者的内存——而是从破坏者提供的字节数组中创建新对象(从而产生垃圾)。这种设置的好处是所有业务逻辑线程都可以从同一个中断器中读取多种不同类型的消息。

这将是我的首选方法,但我对我们的用例略有影响,几乎在我们使用 Disruptor 的每个地方,它要么从某种 I/O 设备接收或发送到某种 I/O 设备,因此我们的基本货币是字节数组。您可以通过使用享元方法进行编组来绕过对象创建。为了查看这方面的示例,我在 Devoxx ( https://github.com/mikeb01/ticketing ) 上展示的示例中使用了 Javolution 的 Struct 和 Union 类。如果您可以在从事件处理程序的 onEvent 调用返回之前完全处理该对象,那么这种方法效果很好。如果事件需要超过该点,那么您需要制作某种数据副本,例如将其反序列化为对象。

将破坏者配置为使用单一类型的对象,但创建多个破坏者,每个对象类型一个。在上面的例子中,会有两个独立的破坏者——一个用于类型 A 的对象,另一个用于类型 B 的对象。这种设置的好处是主线程不必将对象编码为字节数组,并且业务较少的逻辑线程可以共享与中断器中使用的相同的对象(不创建垃圾)。这种设置的缺点是每个业务逻辑线程必须以某种方式订阅来自多个破坏者的消息。

没有尝试过这种方法,您可能需要一个可以从多个环形缓冲区轮询的自定义 EventProcessor。

将中断器配置为使用包含消息 A 和 B 的所有字段的单一类型的“超级”对象。这非常不符合 OO 风格,但允许在选项 #1 和 #2 之间进行折衷。将破坏者配置为使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。

我们已经在几个可以容忍缺乏预分配的情况下做到了这一点。它工作正常。如果您正在传递对象,那么您需要确保在消费者端完成它们后将它们清空。我们发现对“超级”对象使用双重调度模式可以保持实现相当干净。这样做的一个缺点是,由于 GC 在标记阶段有更多的活动对象要遍历,因此您将获得稍长的 GC 停顿时间。

对于这种情况,您有什么建议?我觉得选项 #2 是最干净的解决方案,但我不知道消费者是否或如何在技术上订阅来自多个破坏者的消息。如果有人可以提供如何实施选项 #2 的示例,将不胜感激!

如果您希望在使用数据方面具有完全的灵活性,另一种选择是不使用环形缓冲区,而是直接与 Sequencer 对话并按照您认为合适的方式定义对象布局。

于 2015-05-18T16:16:02.440 回答
2

Ben Baumgold,我相信您现在已经找到了解决方案。您的#4(或#3)可以通过创建一个事件持有者来轻松实现。将其视为对象的枚举。为了加快查找速度,应该使用枚举类型来丰富事件。请注意,我在持有者中存储了对原始事件的引用。创建复制构造函数或 clone() 并在插入到环形缓冲区时复制事件可能更合适。

举例说明:

// 这是事件中使用的枚举

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

// 这是持有者。在任何时候,ringbuffer 中的这个实例都只保存一个由 array[ type.ordinal() ] 索引的事件。为什么数组应该从代码中显而易见。

public class RingBufferEventHolder {    
 private MyEventEnum;   
 private EventBase array[];

 public RingBufferEventHolder() {
    array=new EventBase[MyEventEnum.values().length]; 
 }

 // TODO: null the rest
 public void setEvent(EventBase event) {
    type=event.getType();
    switch( event.getType() ) {
        case EVENT_TIMER:
            array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
            break;
        case EVENT_MARKETDATA:
            array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
            break;
        default:
            throw new RuntimeException("Unknown event type " + event );
    }
}

// 发布事件

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);
于 2016-10-20T06:13:53.750 回答
0

与 Vortex 的答案太相似,但在保留子事件方面有所不同。它是#3 和#4 的混合。如果我可以管理业务逻辑的复杂性,我会去#2 多个干扰器。

优先于基于数组的枚举事件类型实现的主要关注点是不同事件类型的共享对象类型。

public enum ExchangeEventType{
    PLACE_ORDER,   // -> OrderEvent
    CANCEL_ORDER,  // -> OrderEvent
    MARKET_FEED,   // -> MarketEvent
    MARKET_UPDATE, // -> MarketEvent
    ADD_USER,      // -> AccountEvent
    SUSPEND_USER,  // -> AccountEvent
    RESUME_USER    // -> AccountEvent
}    

public ExchangeEvent{
  private EventType type;
  private EventResultCode resultCode;
  private long timestamp;

  // event type objects
  private OrderEvent orderEvent;
  private MarketEvent marketEvent;
  private AccountEvent accountEvent;
}

在业务逻辑中,多个处理器消耗并产生多种类型的事件,因此我有意识地选择不使用单独的干扰器进行权衡。

例如;

  • #1 引擎使用OrderEvent&AccountEvent
  • #2 引擎使用MarketEvent&OrderEvent
于 2021-01-04T08:32:45.413 回答