4

我尝试Observable在 Monix 中按键进行拆分,然后将每个事件分组到最后一个n事件GrouppedObservable并将它们发送以进行进一步处理。问题是要分组的键的数量可能是无限的,这会导致内存泄漏。

应用环境

我有来自许多对话的消息的 kafka 流。每个对话都有roomId,我想对这个 id 进行分组以获取 Observables 的集合,每个对话只包含来自单个对话的消息。会话室通常是短暂的,即创建一个独特的新会话roomId,在短时间内交换几十条消息,然后关闭会话。为避免内存泄漏,我希望仅保留 100-1000 个最近对话的缓冲区,并删除较旧的对话。因此,如果一个事件来自一个长期未见的对话,它将被视为新对话,因为其先前消息的缓冲区将被遗忘。

Monix中的 groupBy 方法具有keysBuffer指定如何处理键缓冲区的参数。

我认为指定keyBufferDropOld策略将使我能够实现我想要的行为。

以下是所描述用例的简化版本。

import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {

    val fakeConsumer = Consumer.foreach(println)
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  def generateHeavyEvent(n: Int): Event = {
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
    Event(key, value, n)
  }

}

但是,在 VisualVM 上观察应用程序堆表明内存泄漏。跑了大约30分钟后,我得到了java.lang.OutOfMemoryError: GC overhead limit exceeded

下面是描述运行我的应用程序大约 30 分钟的堆使用图的屏幕截图。(最后压扁的部分在后面OutOfMemoryError

应用程序的 VisualVM 堆图

我的问题是:如何在不泄漏内存的情况下通过可能无限数量的键对 monix 中的事件进行分组?允许丢弃旧密钥

背景资料:

  • 莫尼克斯版本:3.0.0-RC2
  • 斯卡拉版本:2.12.8
4

1 回答 1

2

我有与您类似的用例,读取 kafka 流并按 id 分组。

您想要做的是在GrouppedObservable没有需求时超时/清理。否则,它将永远留在记忆中。因此,您可以执行以下操作:

val eventsStream: Observable[Int] = ???

eventsStream
  .groupBy(_ % 2 == 0)
  .mergeMap {
    _.mapEval(s => Task.delay(println(s)))
     .timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
  }
  .completedL

于 2019-11-16T08:50:03.460 回答