我正在尝试将两个 Play 框架枚举器组合在一起,但合并具有相同键值的值。在大多数情况下,它可以工作,除了用于保留以前没有匹配的值的 Map 每次找到匹配并返回 Done Iteratee 时都会丢失。
有没有办法在返回完成后为下一个步骤调用提供状态?
到目前为止,我发现的任何示例似乎都是围绕将连续值分组在一起然后传递整个分组,而没有将流中的一些任意值分组并且仅在分组后传递特定值。
理想情况下,一旦匹配成功,它将发送匹配的值。到目前为止,我所做的(几乎基于创建基于时间的分块 Enumeratee)
def virtualSystemGrouping[E](system:ParentSystem): Iteratee[Detail, Detail] = {
def step(state: Map[String, Detail])(input:Input[Detail]): Iteratee[Detail, Detail] = {
input match {
case Input.EOF => {Done(null, Input.EOF)}
case Input.Empty =>{Cont[Detail, Detail](i => step(state)(i))}
case Input.El(e) => {
if (!system.isVirtual) Done(e)
if (state.exists((k) =>{k._1.equals(e.name)})) {
val other = state(e.name)
// ??? should have a; state - e.name
// And pass new state and merged value out.
Done(e + other)
} else {
Cont[Detail, Detail](i => step(state + (e.name -> e))(i))
}
}
}
}
Cont(step(Map[String,Detail]()))
}
这个的调用看起来像;
val systems:List[ParentSystem] = getSystems()
val start = Enumerator.empty[Detail]
val send = systems.foldLeft(start){(b,p) =>
b interleave Concurrent.unicast[Detail]{channel =>
implicit val timeout = Timeout (1 seconds)
val actor = SystemsActor.lookupActor(p.name + "/details")
actor map {
case Some(a) => {a ! SendDetailInformation(channel)}
case None => {channel.eofAndEnd}
} recover {
case t:Throwable => {channel.eofAndEnd}
}
}
} &> Enumeratee.grouped(virtualSystemGrouping(parent)) |>> Iteratee.foreach(e => {output.push(e)})
send.onComplete(t => output.eofAndEnd)