1

我使用带有 ReactiveMongo 的 Play 框架。大多数 ReactiveMongo API 都是基于 Play 的Enumerator。只要我从 MongoDB 中获取一些数据并“按原样”异步返回,一切都很好。数据的转换,如将 BSON 转换为字符串,使用Enumerator.map是显而易见的。

但是今天我遇到了一个问题,该问题归结为以下代码。我浪费了半天的时间试图创建一个Enumerator消耗给定Enumerator项目并在它们之间插入一些项目的项目。重要的是不要一次加载所有项目,因为它们可能有很多(代码示例只有两个项目“1”和“2”)。但在语义上它类似于mkString集合。我确信它可以很容易地完成,但我能提供的最好的 - 就是这段代码。创建Enumeratorusing的非常相似的代码Concurrent.broadcast非常适合 WebSockets。但在这里,即使这样也行不通。HTTP 响应永远不会回来。当我查看时Enumeratee,它看起来应该提供这样的功能,但我找不到解决问题的方法。

PS试图打电话chan.eofAndEndIteratee.mapDonechunked(enums >>> Enumerator.eof不是chunked(enums)- 没有帮助。有时响应会返回,但不包含正确的数据。我想念什么?

def trans(in:Enumerator[String]):Enumerator[String] = {
  val (res, chan) = Concurrent.broadcast[String]

  val iter = Iteratee.fold(true) { (isFirst, curr:String) =>
    if (!isFirst)
      chan.push("<-------->")
    chan.push(curr)
    false
  }

  in.apply(iter)

  res
}

def enums:Enumerator[String] = {
  val en12 = Enumerator[String]("1", "2")

  trans(en12)
  //en12 //if I comment the previous line and uncomment this, it prints "12" as expected
}

def enum = Action {
  Ok.chunked(enums)
}
4

2 回答 2

2

这是我的解决方案,我认为它对于此类问题是正确的。欢迎评论:

def fill[From](
    prefix: From => Enumerator[From],
    infix: (From, From) => Enumerator[From],
    suffix: From => Enumerator[From]
    )(implicit ec:ExecutionContext) = new Enumeratee[From, From] {
  override def applyOn[A](inner: Iteratee[From, A]): Iteratee[From, Iteratee[From, A]] = {
    //type of the state we will use for fold
    case class State(prev:Option[From], it:Iteratee[From, A])

    Iteratee.foldM(State(None, inner)) { (prevState, newItem:From) =>
      val toInsert = prevState.prev match {
        case None => prefix(newItem)
        case Some(prevItem) => infix (prevItem, newItem)
      }

      for(newIt <- toInsert >>> Enumerator(newItem) |>> prevState.it)
        yield State(Some(newItem), newIt)

    } mapM {
      case State(None, it) => //this is possible when our input was empty
        Future.successful(it)
      case State(Some(lastItem), it) =>
        suffix(lastItem) |>> it
    }
  }
}

// if there are missing integers between from and to, fill that gap with 0
def fillGap(from:Int, to:Int)(implicit ec:ExecutionContext) = Enumerator enumerate List.fill(to-from-1)(0)
def fillFrom(x:Int)(input:Int)(implicit ec:ExecutionContext) = fillGap(x, input)
def fillTo(x:Int)(input:Int)(implicit ec:ExecutionContext) = fillGap(input, x)

val ints = Enumerator(10, 12, 15)
val toStr = Enumeratee.map[Int] (_.toString)

val infill = fill(
  fillFrom(5),
  fillGap,
  fillTo(20)
)

val res = ints &> infill &> toStr // res will have 0,0,0,0,10,0,12,0,0,15,0,0,0,0
于 2013-12-26T12:40:31.213 回答
0

You wrote that you are working with WebSockets, so why don't you use dedicated solution for that? What you wrote is better for Server-Sent-Events rather than WS. As I understood you, you want to filter your results before sending them back to client? If its correct then you Enumeratee instead of Enumerator. Enumeratee is transformation from-to. This is very good piece of code how to use Enumeratee. May be is not directly about what you need but I found there inspiration for my project. Maybe when you analyze given code you would find best solution.

于 2013-12-18T13:31:12.700 回答