流数据播放,非常容易。
这是我打算如何做的一个简单示例(如果我做错了,请告诉我):
def getRandomStream = Action { implicit req =>
import scala.util.Random
import scala.concurrent.{blocking, ExecutionContext}
import ExecutionContext.Implicits.global
def getSomeRandomFutures: List[Future[String]] = {
for {
i <- (1 to 10).toList
r = Random.nextInt(30000)
} yield Future {
blocking {
Thread.sleep(r)
}
s"after $r ms. index: $i.\n"
}
}
val enumerator = Concurrent.unicast[Array[Byte]] {
(channel: Concurrent.Channel[Array[Byte]]) => {
getSomeRandomFutures.foreach {
_.onComplete {
case Success(x: String) => channel.push(x.getBytes("utf-8"))
case Failure(t) => channel.push(t.getMessage)
}
}
//following future will close the connection
Future {
blocking {
Thread.sleep(30000)
}
}.onComplete {
case Success(_) => channel.eofAndEnd()
case Failure(t) => channel.end(t)
}
}
}
new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}
现在,如果你得到这个动作的服务,你会得到类似的东西:
after 1757 ms. index: 10.
after 3772 ms. index: 3.
after 4282 ms. index: 6.
after 4788 ms. index: 8.
after 10842 ms. index: 7.
after 12225 ms. index: 4.
after 14085 ms. index: 9.
after 17110 ms. index: 1.
after 21213 ms. index: 2.
after 21516 ms. index: 5.
在随机时间过去后接收每一行。
现在,假设我想在将数据从服务器流式传输到客户端时保留这个简单的示例,但我也想支持从客户端到服务器的完整数据流式传输。
所以,假设我正在实现一个BodyParser
将输入解析为List[Future[String]]
. 这意味着,现在,我Action
可能看起来像这样:
def getParsedStream = Action(myBodyParser) { implicit req =>
val xs: List[Future[String]] = req.body
val enumerator = Concurrent.unicast[Array[Byte]] {
(channel: Concurrent.Channel[Array[Byte]]) => {
xs.foreach {
_.onComplete {
case Success(x: String) => channel.push(x.getBytes("utf-8"))
case Failure(t) => channel.push(t.getMessage)
}
}
//again, following future will close the connection
Future.sequence(xs).onComplete {
case Success(_) => channel.eofAndEnd()
case Failure(t) => channel.end(t)
}
}
}
new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}
但这仍然不是我想要实现的。在这种情况下,我只会在请求完成后从请求中获取正文,并且所有数据都已上传到服务器。但我想开始服务请求。一个简单的演示,就是将任何接收到的线路回显给用户,同时保持连接处于活动状态。
所以这是我目前的想法:
如果我BodyParser
会返回 anEnumerator[String]
而不是List[Future[String]]
?
在这种情况下,我可以简单地执行以下操作:
def getParsedStream = Action(myBodyParser) { implicit req =>
new Status(200).chunked(req.body).as("text/plain;charset=UTF-8")
}
所以现在,我面临着如何实现这样一个BodyParser
. 更准确地说我到底需要什么,好吧:
我需要接收数据块以解析为字符串,其中每个字符串都以换行符结尾\n
(尽管可能包含多行......)。每个“行块”都将由一些(与此问题无关的)计算处理,这将产生 aString
或更好的 a Future[String]
,因为此计算可能需要一些时间。此计算的结果字符串应在准备好时发送给用户,就像上面的随机示例一样。这应该在发送更多数据的同时发生。
我已经研究了几种试图实现它的资源,但到目前为止都没有成功。例如scalaQuery 播放迭代-> 看起来这个人正在做与我想做的事情相似的事情,但我无法将其转化为可用的示例。(并且从 play2.0 到 play2.2 API 的差异无济于事......)
所以,总结一下:这是正确的方法(考虑到我不想使用WebSockets
)?如果是这样,我该如何实施BodyParser
?
编辑:
我刚刚偶然发现有关此问题的播放文档的注释,说:
注意:也可以通过使用由
BodyParser
接收输入数据块的自定义处理的无限 HTTP 请求以另一种方式实现相同类型的实时通信,但这要复杂得多。
所以,我不会放弃,现在我确定这是可以实现的。