2

我正在尝试掌握在 Play 2 中使用 Iteratees 流式传输彗星结果的方法。我已经掌握了从回调创建枚举器和从地图创建枚举器的句柄。我的问题是 Enumeratee.map,这需要一个函数,该函数接受纯输入并返回纯输出(例如doc中的 String 到 Int 的转换)。我想做的是接受一个纯粹的输入并返回一个结果的承诺。毕竟,枚举器向枚举器提供承诺,枚举器将一个枚举器转换为另一个枚举器,因此应该有一种方法可以使枚举器映射到承诺。

现在,让我举一个例子来说明这一点。假设我有一个 http 请求,其中包含要在我的数据库中查询的 ID 列表。假设这些 id 表示数据库表中的行,请求对这些行执行一组(长)计算,然后返回一组表示计算的 json 对象。由于我有很长的事情要做,所以一次流式传输一个 ID 会很酷,所以我希望有一个枚举管道:

  1. 查询数据库中的一行(返回该行的承诺)
  2. 对行进行长时间计算(取一行并返回计算的承诺)
  3. 将长计算转换为 JSON
  4. &> 将此输出到 Play 2 提供的 Comet 枚举对象

1 有点简单,我可以构造一个带有 fromCallback 的枚举器,它将返回查询结果的承诺。3 也有点简单,因为它是一个简单的 Enumeratee.map

但是我无法理解如何实现步骤 2 的枚举对象的 applyOn。我可以理解我已经构建了一个新的迭代对象,它从“内部”迭代对象获得承诺,flatMap 长计算并返回新的承诺。我没有得到的是如何在给定奇怪的 applyOn 签名的情况下做到这一点:def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

有人可以帮我吗?

谢谢

4

2 回答 2

3

主 Enumeratee.mapM[E] 上有一个方法,它采用 f: E => Promise[NE] 并返回 Enumeratee[E, NE]

https://github.com/playframework/Play20/blob/master/framework/src/play/src/main/scala/play/api/libs/iteratee/Enumeratee.scala#L150

于 2012-06-01T22:06:31.070 回答
1

applyOn当您认为 enumeratee 与右侧的 iteratee 组合时,签名更有意义,例如 in enumerator |>> (enumeratee &> iteratee)。iteratee 具有类型Iteratee[E, A],并且 enumerator 需要 aIteratee[Promise[E], Iteratee[E, A]以便可以提取内部 iteratee。所以applyOn必须 aIteratee[E, A]并返回 a Iteratee[Promise[E], Iteratee[E, A]

这是一个实现的概要。它定义了一个阶跃函数,该函数接受输入并返回预期结果。然后递归地遍历承诺元素。

import play.api.libs.concurrent._
import play.api.libs.iteratee._

def unpromise[E]: Enumeratee[Promise[E], E] = new Enumeratee[Promise[E], E] {
  def applyOn[A](inner: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
    def step(input: Input[Promise[E]], i: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
      input match {
        case Input.EOF => Done(i, Input.EOF)
        case Input.Empty => Cont(step(_, i))
        case Input.El(pe) =>
          val pe2 = pe.map(e => i.feed(Input.El(e))).flatMap(identity)
          val i2 = Iteratee.flatten(pe2)
          i2.pureFlatFold(
            (a, e2) => Done(i2, Input.Empty),
            k => Cont(step(_, i2)),
            (msg, e2) => Done(i2, Input.Empty))
      }
    }
    // should check that inner is not done or error - skipped for clarity
    Cont(step(_, inner))
  }
}

我正在丢弃e2,因此可能有更多代码来确保某些输入不会丢失。

于 2012-05-22T14:47:41.217 回答