我尝试Observable
在 Monix 中按键进行拆分,然后将每个事件分组到最后一个n
事件GrouppedObservable
并将它们发送以进行进一步处理。问题是要分组的键的数量可能是无限的,这会导致内存泄漏。
应用环境:
我有来自许多对话的消息的 kafka 流。每个对话都有roomId
,我想对这个 id 进行分组以获取 Observables 的集合,每个对话只包含来自单个对话的消息。会话室通常是短暂的,即创建一个独特的新会话roomId
,在短时间内交换几十条消息,然后关闭会话。为避免内存泄漏,我希望仅保留 100-1000 个最近对话的缓冲区,并删除较旧的对话。因此,如果一个事件来自一个长期未见的对话,它将被视为新对话,因为其先前消息的缓冲区将被遗忘。
Monix中的 groupBy 方法具有keysBuffer
指定如何处理键缓冲区的参数。
我认为指定keyBuffer
DropOld策略将使我能够实现我想要的行为。
以下是所描述用例的简化版本。
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent.duration._
import scala.util.Random
case class Event(key: Key, value: String, seqNr: Int) {
override def toString: String = s"(k:$key;s:$seqNr)"
}
case class Key(conversationId: Int, messageNr: Int)
object Main {
def main(args: Array[String]): Unit = {
val fakeConsumer = Consumer.foreach(println)
val kafkaSimulator = Observable.interval(1.millisecond)
.map(n => generateHeavyEvent(n.toInt))
val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
.mergeMap(slidingWindow)
groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
}
def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
source.scan(List.empty[T])(fixedSizeList)
def fixedSizeList[T](list: List[T], elem: T): List[T] =
(list :+ elem).takeRight(5)
def generateHeavyEvent(n: Int): Event = {
val conversationId: Int = n / 500
val messageNr: Int = n % 5
val key = Key(conversationId, messageNr)
val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
Event(key, value, n)
}
}
但是,在 VisualVM 上观察应用程序堆表明内存泄漏。跑了大约30分钟后,我得到了java.lang.OutOfMemoryError: GC overhead limit exceeded
下面是描述运行我的应用程序大约 30 分钟的堆使用图的屏幕截图。(最后压扁的部分在后面OutOfMemoryError
)
我的问题是:如何在不泄漏内存的情况下通过可能无限数量的键对 monix 中的事件进行分组?允许丢弃旧密钥
背景资料:
- 莫尼克斯版本:
3.0.0-RC2
- 斯卡拉版本:
2.12.8