1

我正在尝试将两个 Play 框架枚举器组合在一起,但合并具有相同键值的值。在大多数情况下,它可以工作,除了用于保留以前没有匹配的值的 Map 每次找到匹配并返回 Done Iteratee 时都会丢失。

有没有办法在返回完成后为下一个步骤调用提供状态?

到目前为止,我发现的任何示例似乎都是围绕将连续值分组在一起然后传递整个分组,而没有将流中的一些任意值分组并且仅在分组后传递特定值。

理想情况下,一旦匹配成功,它将发送匹配的值。到目前为止,我所做的(几乎基于创建基于时间的分块 Enumeratee

def virtualSystemGrouping[E](system:ParentSystem): Iteratee[Detail, Detail] = {
  def step(state: Map[String, Detail])(input:Input[Detail]): Iteratee[Detail, Detail] = {
    input match {
      case Input.EOF => {Done(null, Input.EOF)}
      case Input.Empty =>{Cont[Detail, Detail](i => step(state)(i))}
      case Input.El(e) => {
        if (!system.isVirtual) Done(e)
        if (state.exists((k) =>{k._1.equals(e.name)})) {
          val other = state(e.name)
          // ??? should have a; state - e.name
          // And pass new state and merged value out.
          Done(e + other)
        } else {
          Cont[Detail, Detail](i => step(state + (e.name -> e))(i))
        }
      }
    }
  }
  Cont(step(Map[String,Detail]()))
}

这个的调用看起来像;

   val systems:List[ParentSystem] = getSystems()
   val start = Enumerator.empty[Detail]
   val send = systems.foldLeft(start){(b,p) =>
     b interleave Concurrent.unicast[Detail]{channel =>
       implicit val timeout = Timeout (1 seconds)
       val actor = SystemsActor.lookupActor(p.name + "/details")
       actor map {
         case Some(a) => {a ! SendDetailInformation(channel)}
         case None => {channel.eofAndEnd}
         } recover {
         case t:Throwable => {channel.eofAndEnd}
       }

       }
   } &> Enumeratee.grouped(virtualSystemGrouping(parent)) |>> Iteratee.foreach(e => {output.push(e)})
   send.onComplete(t => output.eofAndEnd)
4

1 回答 1

0

我能够想出的一种方法是使用 Concurrent.unicast 并将通道传递给组合函数。我确信有一种方法可以创建一个 Iteratee/Enumerator,它可以在一个漂亮整洁的包中完成所有工作,但目前我还没有做到这一点。

更新组合功能;

def virtualSystemGrouping[E](system:ParentSystem, output:Channel): Iteratee[Detail, Detail] = {
  def step(state: Map[String, Detail])(input:Input[Detail]): Iteratee[Detail, Detail] = {
    input match {
      case Input.EOF => {
          state.mapValues(r=>output.push(r))
          output.eofAndEnd
          Done(null, Input.EOF)
      }
      case Input.Empty =>{Cont[Detail, Detail](i => step(state)(i))}
      case Input.El(e) => {
        if (!system.isVirtual) {output.push(e); Done(e, Input.Empty)}
        if (state.exists((k) =>{k._1.equals(e.name)})) {
          val other = state(e.name)
          output.push(e + other)
          Cont[Detail, Detail](i => step(state - e.name)(i))
        } else {
          Cont[Detail, Detail](i => step(state + (e.name -> e))(i))
        }
      }
    }
  }
  Cont(step(Map[String,Detail]()))
}

在这里,任何组合值都被推入输出通道,然后进行后续处理。

this 的用法如下所示;

   val systems:List[ParentSystem] = getSystems(parent)
   val start = Enumerator.empty[Detail]
   val concatDetail = systems.foldLeft(start){(b,p) =>
     b interleave Concurrent.unicast[Detail]{channel =>
       implicit val timeout = Timeout (1 seconds)
       val actor = SystemsActor.lookupActor(p.name + "/details")
       actor map {
         case Some(a) => {a ! SendRateInformation(channel)}
         case None => {channel.eofAndEnd}
         } recover {
         case t:Throwable => {channel.eofAndEnd}
       }

       }
   } 


   val combinedDetail = Concurrent.unicast[Detail]{channel => 
    concatDetail &> Enumeratee.grouped(virtualSystemGrouping(parent, channel)) |>> Iteratee.ignore
   }
   val send = combinedDetail |>> Iteratee.foreach(e => {output.push(e)})
   send.onComplete(t => output.eofAndEnd)

与原来的非常相似,除了现在对组合函数的调用是在单播 onStart 块中完成的(其中定义了通道)。concatDetail 是从子系统的交错结果创建的枚举器。这是通过系统分组功能提供的,该功能反过来通过提供的通道推送任何组合结果(以及 EOF 处的剩余结果)。

然后,combinedDetails 枚举器被接收并推送到上游输出通道。

编辑: virtualSystemGrouping 可以概括为;

  def enumGroup[E >: Null, K, M](
      key:(E) => K,
      merge:(E, Option[E]) => M,
      output:Concurrent.Channel[M]
      ): Iteratee[E, E] = {
    def step(state: Map[K, E])(input:Input[E]): Iteratee[E, E] = {
      input match {
        case Input.EOF => {
          state.mapValues(f => output.push(merge(f, None))) //Push along any remaining values.
          output.eofAndEnd();
          Done(null, Input.EOF)
          }
        case Input.Empty =>{ Cont[E, E](i => step(state)(i))}
        case Input.El(e) => {
          if (state.contains(key(e))) {
            output.push(merge(e, state.get(key(e))))
            Cont[E, E](i => step(state - key(e))(i))
          } else {
            Cont[E, E](i => step(state + (key(e) -> e))(i))
          }
        }
      }
    }

    Cont(step(Map[K,E]()))
  }

打电话,例如;

Enumeratee.grouped(
             enumGroup(
             (k=>k.name),
             ((e1, e2) => e2.fold(e1)(v => e1 + v)),
             channel)
            )
于 2014-03-17T13:02:53.323 回答