5

其中一个EventHandler(DatabaseConsumer)Disruptor调用数据库中的存储过程,它太慢以至于阻塞了Disruptor一段时间。

因为我需要Disruptor继续运行而不会阻塞。我正在考虑添加一个额外的队列,以便EventHandler可以用作Producer另一个新创建的线程可以用作Consumer处理数据库的工作,这可以是异步的而不影响Disruptor

这是一些约束:

  1. Disruptor传递给 的对象EventHandler大约是 30KB,这个对象的数量大约是 400k。理论上,需要处理的对象的总大小在 30KBX400K =12GB 左右。所以额外的队列对他们来说应该足够了。
  2. 由于性能很重要,应该避免 GC 暂停。
  3. Java 程序的堆大小只有 2GB。

我正在考虑将文本文件作为一种选择。EventHandler(生产者)将对象写入文件并Consumer从中读取并调用存储过程。问题是如何处理它到达文件末尾的情况以及如何知道新的行。

有谁以前解决过这种情况?有什么建议吗?

4

2 回答 2

2

简短的回答是调整干扰器的大小以应对突发的大小而不是整个体积,请记住干扰器可以只包含对 30kb 对象的引用,整个对象不需要在环形缓冲区中。

在您的数据库需要内存来缓冲中断器之前,任何形式的缓冲都为您提供了在数据库落后太多时对系统其余部分施加背压的选项。也就是说,您可以减慢干扰器的输入速度。

假脱机到文件的另一个选项是查看Java Chronicle,它使用内存映射文件将内容持久化到磁盘。

更复杂的答案是利用破坏者的批处理效果,以便您的数据库可以赶上。即使用EventHandler 将事件收集在一起,将一批事件作为一个单元提交给数据库。这种做法允许 EventHandler 在事情备份时变得更高效,从而增加吞吐量。

于 2014-04-09T10:05:42.463 回答
1

简短的回答:不要使用破坏者。使用支持重传的分布式 MQ。

长答案:如果你有快速的生产者和缓慢的消费者,你将需要某种重传机制。我认为你无法摆脱这种情况,除非你能容忍系统中出现讨厌的块(即巨大的延迟)。这就是分布式 MQ(消息队列)发挥作用的时候。Disruptor 不是分布式 MQ,但您可以尝试实现类似的东西。这个想法是:

  • 所有消息由消费者按顺序排序和处理
  • 如果队列已满,则丢弃消息
  • 如果消费者检测到消息间隙,它将请求重新传输丢失的消息,缓冲未来的消息,直到收到间隙

使用这种方法,消费者可以随心所欲地慢,因为它总是可以随时请求重新传输丢失的任何消息。我们这里缺少的是重传实体。在分布式 MQ 中,它将是一个单独且独立的节点,将所有消息持久化到磁盘,因此它可以随时将任何消息重播到任何其他节点。由于您在这里不是在谈论 MQ,而是在谈论破坏者,所以您将不得不以某种方式自己在另一个线程上实现该重传机制。这是一个非常有趣的问题,没有简单的答案或秘诀。我会使用多个中断队列,以便您的消费者可以执行以下操作:

  • 从主通道读取(即主中断队列)
  • 如果您检测到序列间隙,请转到连接到重播线程的另一个中断队列。您实际上需要两个队列,一个用于请求丢失的消息,另一个用于接收它们。
  • 重播线程将有另一个中断队列,它从那里接收所有消息并将其持久化到磁盘。

您需要确保您的重播线程能够以足够快的速度将消息写入磁盘。如果它不能,那么除了阻塞整个系统之外,就没有其他逃生的办法了。幸运的是,如果您知道自己在做什么,磁盘 i/o 可以非常快地完成。

如果消费者速度较慢,如果你能负担得起阻止生产者,你就可以忘记我所说的一切。但是,如果生产者从网络获取消息,阻止它们最终会给您丢包 (UDP) 和可能的 IOException (TCP)。

如您所见,这是一个非常有趣的问题,答案非常复杂。在Coral Blocks ,我们有在CoralReactor之上开发分布式 MQ 的经验。您可以查看我们网站上的一些文章。

于 2014-05-24T02:23:32.403 回答