12

我想创建一个 Play 2 Enumeratee,它接收值并将它们输出,每秒x/毫秒分块在一起。这样,在具有大量用户输入的多用户 websocket 环境中,可以限制每秒接收的帧数。

我知道可以像这样将一组项目组合在一起:

val chunker = Enumeratee.grouped(
  Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)

是否有基于时间而不是基于项目数量的内置方法来执行此操作?

我正在考虑以某种方式使用预定的 Akka 作业来执行此操作,但乍一看这似乎效率低下,而且我不确定是否会出现并发问题。

4

2 回答 2

3

像这样怎么样?我希望这对你有帮助。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.fromCallback { () =>
       Promise.timeout(Some(queue), 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }

这份文件对你也有帮助。 http://www.playframework.com/documentation/2.0/Enumerators

更新 这是 play2.1 版本。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise
 import scala.concurrent._
 import ExecutionContext.Implicits.global

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.repeatM{
       Promise.timeout(queue, 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }
于 2013-03-18T00:32:25.863 回答
2

在这里,我快速定义了一个迭代器,它将在固定时间长度 t 内从输入中获取值(以毫秒为单位),以及一个枚举器,它将允许您对输入流进行分组并进一步处理,该输入流被划分为在该长度 t 内构造的段。它依赖 JodaTime 来跟踪自迭代器开始以来已经过去了多少时间。

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
  var startTime = new Instant()

  def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis

    input match {
      case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
      case Input.Empty => Cont[E, List[E]](i => step(state)(i))
      case Input.El(e) =>
        if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
        else Cont[E, List[E]](i => step(e::state)(i))
    }
  }

  Cont(step(List[E]()))
}

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))
于 2013-03-18T18:31:40.533 回答