我想出了一个有点粗糙的解决方案,但我认为它可以完成工作。
基本思想是使用keepAlive
Source 的方法作为将触发完成的计时器。
但要做到这一点,我们首先必须对数据进行一些抽象。计时器将需要从原始 Source 发送触发器或另一个元组值,因此:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
然后将我们的元组源转换为值源。我们仍然会使用groupBy
类似于您的有限流案例的分组:
val originalSource : Source[(Long, String, Int), Unit] = ???
type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid
val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)
棘手的部分是处理只是元组的分组:(String, Source[Value,Unit])
。如果时间已经过去,我们需要计时器来通知我们,所以我们需要另一个抽象来知道我们是否仍在计算,或者我们是否由于超时而完成了计算:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum
val zeroSum : Sum = StillComputing(0)
现在我们可以排空每个组的源。keepAlive
如果值的TimerTrigger
来源在timeOut
. 然后Data
keepAlive 中的值与 TimerTrigger 或原始 Source 中的新值进行模式匹配:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)
该集合应用于仅匹配完成总和的部分函数,因此仅在计时器触发后才到达接收器。
然后我们将此处理程序应用于出现的每个分组:
val timeOut = FiniteDuration(5, MINUTES)
val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)
我们现在有一个 Source ,(String,Future[Int])
它是 session_uid 和该 id 的流量总和的 Future。
就像我说的,复杂但符合要求。此外,我不完全确定如果一个 uid 已经分组并已超时,但随后会出现具有相同 uid 的新值会发生什么。