
So the Play2.0 Enumeratee page shows an example of using the &> or through method to change an Enumerator[String] into an Enumerator[Int]:

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val ints: Enumerator[Int] = strings &> toInt

There is also an Enumeratee.grouped enumeratee that creates an enumerator of chunks from individual elements. That seems to work fine.

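But what I see is that the usual input will be in the form of Array[Byte] (which is what Enumerator.fromFile and Enumerator.fromStream return). With that in mind, I would like to take those Array[Byte] inputs and turn them into an Enumerator[String], for instance where each string is a line (terminated by a '\n'). The boundaries of the lines and of the Array[Byte] elements won't usually match. How do I write an enumeratee that can convert the chunked arrays into chunked strings?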

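The purpose is to send those lines back to the browser in chunks as each Array[Byte] becomes available, and to keep the leftover bytes that were not part of a complete line until the next input chunk comes along.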
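To make the leftover-bytes idea concrete, here is a minimal sketch in plain Scala, outside the iteratee API. The names LineSplitter, feed and splitAll are hypothetical and only illustrate the buffering I have in mind. It assumes lines end with a single '\n' and makes no attempt to be asynchronous.

```scala
// Hypothetical helper, not part of Play: it only illustrates carrying
// leftover bytes from one chunk over to the next.
object LineSplitter {
  type AB = Array[Byte]

  /** Splits `leftover ++ chunk` into complete lines (without the '\n')
    * and returns them together with the bytes that follow the last '\n'. */
  def feed(leftover: AB, chunk: AB): (Vector[AB], AB) = {
    @annotation.tailrec
    def loop(buf: AB, acc: Vector[AB]): (Vector[AB], AB) = {
      val idx = buf.indexOf('\n'.toByte)
      if (idx < 0) (acc, buf) // no complete line: keep everything as leftover
      else loop(buf.drop(idx + 1), acc :+ buf.take(idx))
    }
    loop(leftover ++ chunk, Vector.empty)
  }

  /** Feeds all chunks in order, threading the leftover through. */
  def splitAll(chunks: Seq[AB]): (Vector[String], AB) =
    chunks.foldLeft((Vector.empty[String], Array.empty[Byte])) {
      case ((out, left), chunk) =>
        val (lines, rest) = feed(left, chunk)
        (out ++ lines.map(bytes => new String(bytes, "UTF-8")), rest)
    }
}
```

With the chunks "bip", "py\nfoo\nba" and "r\n", splitAll yields the lines "bippy", "foo" and "bar" with an empty leftover. What I am missing is the equivalent of this as an Enumeratee.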

Ideally, I'd love to have a method that, given an iter: Iteratee[Array[Byte], T] and an Enumerator[Array[Byte]], gives me back an Enumerator[T], where my T elements were parsed by iter.

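Additional info: I had a bit of time to clean up my code, and here is a specific example of what I'm trying to do. I have the following iteratees that detect the next line: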

import play.api.libs.iteratee._
type AB = Array[Byte]

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = {
  def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match {
    case Input.EOF => Done(acc, Input.EOF)
    case Input.Empty => Cont(step(_, acc))
    case Input.El(arr) =>
      val (taking, rest) = arr.span(pred)
      if (rest.length > 0) Done(acc ++ taking, Input.El(rest)) 
      else Cont(step(_, acc ++ taking)) 
  }
  Cont(step(_, Array()))
}

val line = for {
  bytes <- takeWhile(b => !(b == '\n' || b == '\r'))
  _     <- takeWhile(b =>   b == '\n' || b == '\r')
} yield bytes
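As a simplified illustration of what line is meant to do, here is the same logic applied to one fully buffered array instead of a stream of chunks. LineSpan and nextLine are hypothetical names, and this sketch ignores the chunk-boundary handling that the iteratee version has to deal with.

```scala
// Hypothetical, non-streaming equivalent of `line` on a single array.
object LineSpan {
  def isEol(b: Byte): Boolean = b == '\n' || b == '\r'

  /** Returns (bytes before the first terminator, bytes after the run of terminators). */
  def nextLine(buf: Array[Byte]): (Array[Byte], Array[Byte]) = {
    val (line, rest) = buf.span(b => !isEol(b)) // first takeWhile
    (line, rest.dropWhile(isEol))               // second takeWhile, result discarded
  }
}
```

Because the second step consumes a whole run of '\n' and '\r' bytes, consecutive terminators are treated as one and no empty lines are produced.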

What I would like to do is something like this:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain")

2 Answers


https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 does something similar to what you are doing. I fixed grouped to take care of the remaining input. The code basically looks like:

val upToNewLine = 
  Traversable.splitOnceAt[String,Char](_ != '\n')  &>>
  Iteratee.consume()

Enumeratee.grouped(upToNewLine)

I also had to fix repeat in the same way.

answered 2012-05-08T21:04:01.730

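Here is what I have after a few hours of experimentation. I hope someone can come up with a more elegant implementation, as I can barely follow mine.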

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {
  def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]):
          Iteratee[AB, Iteratee[AB, A]] = {
      e match {
        case Input.EOF =>
          // if we have a leftover and it's a chunk, then output it
          leftover match {
            case Input.EOF | Input.Empty => Done(in, leftover)
            case Input.El(_) =>
              val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover)
                >>> Enumerator.eof |>> chunker)
              lastChunk.pureFlatFold(
                done = { (chunk, rest) =>
                  val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in)
                  nextIn.pureFlatFold(
                    done = (a, e2) => Done(nextIn, e2),
                    // nothing more will come
                    cont = k => Done(nextIn, Input.EOF),
                    error = (msg, e2) => Error(msg, e2))
                },
                // not enough content to get a chunk, so drop content
                cont = k => Done(in, Input.EOF),
                error = (msg, e2) => Error(msg, e2))
          }
        case Input.Empty => Cont(step(_, in, leftover))
        case Input.El(arr) =>
          // feed through chunker
          val iChunks = Iteratee.flatten(
            Enumerator.enumInput(leftover)
              >>> Enumerator(arr)
              >>> Enumerator.eof // to extract the leftover
              |>> repeat(chunker))
          iChunks.pureFlatFold(
            done = { (chunks, rest) =>
              // we have our chunks, feed them to the inner iteratee
              val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in)
              nextIn.pureFlatFold(
                done = (a, e2) => Done(nextIn, e2),
                // inner iteratee needs more data
                cont = k => Cont(step(_: Input[AB], nextIn,
                  // we have to ignore the EOF we fed to repeat
                  if (rest == Input.EOF) Input.Empty else rest)),
                error = (msg, e2) => Error(msg, e2))
            },
            // not enough content to get a chunk, continue
            cont = k => Cont(step(_: Input[AB], in, leftover)),
            error = (msg, e2) => Error(msg, e2))
      }
    }
    Cont(step(_, inner, Input.Empty))
  }
}

Here is my custom definition of repeat:

// withhold the last chunk so that it may be concatenated with the next one
def repeat(chunker: Iteratee[AB, AB]) = {
  def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
        leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match {
    case Input.EOF => ch.pureFlatFold(
      done = (a, e) => Done(acc, leftover),
      cont = k => k(Input.EOF).pureFlatFold(
        done = (a, e) => Done(acc, Input.El(a)),
        cont = k => sys.error("divergent iter"),
        error = (msg, e) => Error(msg, e)),
      error = (msg, e) => Error(msg, e))
    case Input.Empty => Cont(loop(_, ch, acc, leftover))
    case Input.El(_) =>
      val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
          >>> Enumerator.enumInput(e) |>> ch)
      i.pureFlatFold(
        done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty),
        cont = k => Cont(loop(_, i, acc, Input.Empty)),
        error = (msg, e) => Error(msg, e))
  }
  Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty))
}

This works on a few samples, including this one:

 val source = Enumerator(
   "bippy".getBytes,
   "foo\n\rbar\n\r\n\rbaz\nb".getBytes,
   "azam\ntoto\n\n".getBytes)
 Ok.stream(source 
   &> chunkBy(line) 
   &> Enumeratee.map(l => l ++ ".\n".getBytes)
 ).as("text/plain")

Which prints:

bippyfoo.
bar.
baz.
bazam.
toto.
answered 2012-04-29T06:28:19.070