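I want to write a stream with akka-stream that groups events from an infinite stream by session_uid and computes the total traffic for each session (details in my previous question).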
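For a finite batch, the result the stream should produce is a per-session sum. The sketch below is a minimal plain-Scala version. It assumes a hypothetical `Event(sessionUid, traffic)` shape, because the question does not show the real event type.

```scala
// Hypothetical event shape: the question only says events carry a
// session_uid and some amount of traffic.
final case class Event(sessionUid: String, traffic: Long)

object BatchSessionTraffic {
  // Sums traffic per session over a finite batch. A streaming version should
  // emit the same totals once each session has ended.
  def trafficPerSession(events: Seq[Event]): Map[String, Long] =
    events.groupBy(_.sessionUid).map { case (uid, es) => uid -> es.map(_.traffic).sum }
}
```

This batch version holds the whole input in memory. That is exactly what an infinite stream cannot afford, so it only pins down the intended semantics.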
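I'm using the Source#groupBy function to group events by session_uid. However, this function seems to accumulate every group key internally and has no way to release them. The result is a java.lang.OutOfMemoryError: Java heap space exception. Here is code that reproduces it: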
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.util.Random

object GroupByMemoryLeakApplication extends App {

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val bigString = Random.nextString(512 * 1024)

  // An effectively infinite stream of events (each Int stands in for a session id)
  val eventsSource = Source(() => (1 to 1000000000).iterator)
    .map(i => (i, bigString + i))

  // This flow passes every event through the groupBy function
  val groupByFlow = Flow[(Int, String)]
    .groupBy(_._2)
    .map {
      case (sessionUid, sessionEvents) =>
        sessionEvents
          .map(e => { println(e._1); e })
          .runWith(Sink.head)
    }
    .mapAsync(4)(identity)

  eventsSource
    .via(groupByFlow)
    .runWith(Sink.ignore)
    .onComplete(_ => system.shutdown())
}
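The sketch below is a deliberately simplified model, in plain Scala with no Akka, of a grouping stage that remembers every key it has ever seen. `KeyRetainingGrouper` is a hypothetical name, and this is not Akka's actual groupBy implementation. It only shows what the suspected behaviour implies: if the stage works this way, retained memory grows with the number of distinct keys.

In the reproducer above, the key is `_._2`, the whole string of about 512K characters, not the Int. Each new element would therefore pin roughly 1 MiB.

```scala
import scala.collection.mutable

// Simplified model (not Akka's code): a grouping stage that never forgets keys.
final class KeyRetainingGrouper[K] {
  private val seenKeys = mutable.HashSet.empty[K]

  // Returns true if this key opens a new group (a new substream in Akka terms).
  // Nothing ever removes a key, so the set only grows.
  def offer(key: K): Boolean = seenKeys.add(key)

  def retainedKeys: Int = seenKeys.size
}
```

With an unbounded supply of distinct keys, `retainedKeys` grows without limit. This matches the OutOfMemoryError described above.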
So, how can I release the group key (sessionUid) inside groupBy once I have finished processing the related event stream (sessionEvents)?
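One way to keep memory bounded is to keep per-session state only while a session is open and drop the key as soon as the session ends. The question does not say how a session ends. This plain-Scala sketch therefore assumes a hypothetical explicit `SessionEnd` event; `Traffic`, `SessionEnd` and `ReleasingAggregator` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical event types: assumes the stream carries an explicit end marker.
sealed trait SessionEvent { def sessionUid: String }
final case class Traffic(sessionUid: String, bytes: Long) extends SessionEvent
final case class SessionEnd(sessionUid: String) extends SessionEvent

final class ReleasingAggregator {
  // Only sessions that are still open live in this map.
  private val totals = mutable.HashMap.empty[String, Long]

  // Processes one element and returns zero or one finished (sessionUid, total).
  def step(e: SessionEvent): List[(String, Long)] = e match {
    case Traffic(uid, bytes) =>
      totals.update(uid, totals.getOrElse(uid, 0L) + bytes)
      Nil
    case SessionEnd(uid) =>
      // Emit the final total and drop the key, so state is bounded by the
      // number of concurrently open sessions rather than all sessions ever seen.
      totals.remove(uid).map(total => uid -> total).toList
  }

  def openSessions: Int = totals.size
}
```

Some Akka versions provide statefulMapConcat, which takes a factory of type () => Out => immutable.Iterable[T]. There, a fresh aggregator per materialization could be plugged in roughly as statefulMapConcat { () => val agg = new ReleasingAggregator; e => agg.step(e) }. This avoids groupBy entirely, so no per-key substream is created.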
Or does anyone know another way to group events by session_uid with akka-stream?
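When there is no explicit end-of-session event, a common alternative is a session window with an idle timeout. A session counts as finished once no event for it has arrived for some period, and its total is then emitted and its key dropped.

The sketch below is a minimal plain-Scala version under these assumptions:

- Events carry an event-time timestamp in milliseconds.
- `TimedTraffic` and `IdleTimeoutAggregator` are hypothetical names.

Like the previous sketch, its step method has the per-element shape that statefulMapConcat expects.

```scala
import scala.collection.mutable

// Hypothetical event shape with an event-time timestamp in milliseconds.
final case class TimedTraffic(sessionUid: String, bytes: Long, timestampMs: Long)

// Session-window aggregation: a session is closed once it has been idle for
// more than idleMs (in event time). Closed sessions are emitted and removed,
// so state is bounded by the number of active sessions.
final class IdleTimeoutAggregator(idleMs: Long) {
  private final case class State(total: Long, lastSeenMs: Long)
  private val sessions = mutable.HashMap.empty[String, State]

  def step(e: TimedTraffic): List[(String, Long)] = {
    // First collect every session that has been idle for too long...
    val expired = sessions.collect {
      case (uid, s) if e.timestampMs - s.lastSeenMs > idleMs => uid -> s.total
    }.toList
    // ...then remove them outside the iteration.
    expired.foreach { case (uid, _) => sessions.remove(uid) }
    // Fold the current event into its (possibly new) session.
    val prev = sessions.getOrElse(e.sessionUid, State(0L, e.timestampMs))
    sessions.update(e.sessionUid, State(prev.total + e.bytes, e.timestampMs))
    expired
  }

  def activeSessions: Int = sessions.size
}
```

This sketch has three limitations:

- Expiry is checked only when a new event arrives.
- Sessions still open when the stream completes are never flushed.
- Each step scans all active sessions. A priority queue ordered by last-seen time would avoid the scan at the cost of more code.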