在这里,我快速定义了一个迭代器,它将在固定时间长度 t 内从输入中获取值(以毫秒为单位),以及一个枚举器,它将允许您对输入流进行分组并进一步处理,该输入流被划分为在该长度 t 内构造的段。它依赖 JodaTime 来跟踪自迭代器开始以来已经过去了多少时间。
def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
var startTime = new Instant()
def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
val timePassed = new Interval(startTime, new Instant()).toDurationMillis
input match {
case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
case Input.Empty => Cont[E, List[E]](i => step(state)(i))
case Input.El(e) =>
if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
else Cont[E, List[E]](i => step(e::state)(i))
}
}
Cont(step(List[E]()))
}
def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))