2

流数据播放,非常容易。
这是我打算如何做的一个简单示例(如果我做错了,请告诉我):

def getRandomStream = Action { implicit req =>

  import scala.util.Random
  import scala.concurrent.{blocking, ExecutionContext}
  import ExecutionContext.Implicits.global

  def getSomeRandomFutures: List[Future[String]] = {
    for {
      i <- (1 to 10).toList
      r = Random.nextInt(30000)
    } yield Future {
      blocking {
        Thread.sleep(r)
      }
      s"after $r ms. index: $i.\n"
    }
  }

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      getSomeRandomFutures.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage)
        }
      }
      //following future will close the connection
      Future {
        blocking {
          Thread.sleep(30000)
        }
      }.onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

现在,如果你得到这个动作的服务,你会得到类似的东西:

after 1757 ms. index: 10.
after 3772 ms. index: 3.
after 4282 ms. index: 6.
after 4788 ms. index: 8.
after 10842 ms. index: 7.
after 12225 ms. index: 4.
after 14085 ms. index: 9.
after 17110 ms. index: 1.
after 21213 ms. index: 2.
after 21516 ms. index: 5.

在随机时间过去后接收每一行。
现在,假设我想在将数据从服务器流式传输到客户端时保留这个简单的示例,但我也想支持从客户端到服务器的完整数据流式传输。

所以,假设我正在实现一个BodyParser将输入解析为List[Future[String]]. 这意味着,现在,我Action可能看起来像这样:

def getParsedStream = Action(myBodyParser) { implicit req =>

  val xs: List[Future[String]] = req.body

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      xs.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage)
        }
      }
      //again, following future will close the connection
      Future.sequence(xs).onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

但这仍然不是我想要实现的。在这种情况下,我只会请求完成后从请求中获取正文,并且所有数据都已上传到服务器。但我想开始服务请求。一个简单的演示,就是将任何接收到的线路回显给用户,同时保持连接处于活动状态。

所以这是我目前的想法:
如果我BodyParser会返回 anEnumerator[String]而不是List[Future[String]]
在这种情况下,我可以简单地执行以下操作:

def getParsedStream = Action(myBodyParser) { implicit req =>
  new Status(200).chunked(req.body).as("text/plain;charset=UTF-8")
}

所以现在,我面临着如何实现这样一个BodyParser. 更准确地说我到底需要什么,好吧:
我需要接收数据块以解析为字符串,其中每个字符串都以换行符结尾\n(尽管可能包含多行......)。每个“行块”都将由一些(与此问题无关的)计算处理,这将产生 aString或更好的 a Future[String],因为此计算可能需要一些时间。此计算的结果字符串应在准备好时发送给用户,就像上面的随机示例一样。这应该在发送更多数据的同时发生。

我已经研究了几种试图实现它的资源,但到目前为止都没有成功。例如scalaQuery 播放迭代-> 看起来这个人正在做与我想做的事情相似的事情,但我无法将其转化为可用的示例。(并且从 play2.0 到 play2.2 API 的差异无济于事......)

所以,总结一下:这是正确的方法(考虑到我不想使用WebSockets)?如果是这样,我该如何实施BodyParser

编辑:

我刚刚偶然发现有关此问题的播放文档的注释,说:

注意:也可以通过使用由BodyParser接收输入数据块的自定义处理的无限 HTTP 请求以另一种方式实现相同类型的实时通信,但这要复杂得多。

所以,我不会放弃,现在我确定这是可以实现的。

4

2 回答 2

0

您想要做的事情在 Play 中不太可能。

问题是 Play 在完全收到请求之前无法开始发送响应。因此,您可以像以前一样接收完整的请求,然后发送响应,也可以在收到请求处理请求(在自定义中BodyParser),但在收到请求之前您仍然无法回复完整的请求(这是文档中的注释所暗示的 - 尽管您可以在不同的连接中发送响应)。

要了解原因,请注意 anAction本质上是 a (RequestHeader) => Iteratee[Array[Byte], SimpleResult]。在任何时候,anIteratee都处于三种状态之一 - DoneContError。它只有在状态时才能接受更多的数据Cont,但它只能在Done状态时返回一个值。由于该返回值是 a SimpleResult(即我们的响应),这意味着从接收数据到发送数据之间存在硬性中断。

根据this answer,HTTP标准确实允许在请求完成之前做出响应,但是大多数浏览器不遵守规范,并且无论如何Play都不支持它,如上所述。

在 Play 中实现全双工通信的最简单方法是使用 WebSockets,但我们已经排除了这种可能性。如果服务器资源使用是更改的主要原因,您可以尝试使用 解析数据play.api.mvc.BodyParsers.parse.temporaryFile,这会将数据保存到临时文件,或者play.api.mvc.BodyParsers.parse.rawBuffer如果请求太大,则会溢出到临时文件。

否则,我看不到使用 Play 执行此操作的合理方法,因此您可能需要考虑使用另一个 Web 服务器。

于 2014-01-31T14:55:36.713 回答
0

“在单个 HTTP 连接上同时输入和输出数据”

我还没有读完你的所有问题,也没有读完代码,但你要求做的事情在 HTTP 中不可用。这与 Play 无关。

当您发出网络请求时,您打开一个到网络服务器的套接字并发送“GET /file.html HTTP/1.1\n[可选标头]\n[更多标头]\n\n”

在完成请求后(并且仅在之后)您会收到响应(可选地包括请求正文作为请求的一部分)。当且仅当请求响应完成时,在 HTTP 1.1(但不是 1.0)中,您可以在同一个套接字上发出新请求(在 http 1.0 中,您打开一个新套接字)。

响应“挂起”是可能的……这就是网络聊天的工作方式。服务器只是坐在那里,挂在打开的套接字上,在有人向您发送消息之前不发送响应。当/如果您收到聊天消息时,与 Web 服务器的持久连接最终会提供响应。

同样,请求也可以“挂起”。您可以开始将您的请求数据发送到服务器,稍等片刻,然后当您收到额外的用户输入时完成请求。与在每个用户输入上不断创建新的 http 请求相比,这种机制提供了更好的性能。服务器可以将此数据流解释为不同输入的流,即使这不一定是 HTTP 规范的最初意图。

HTTP 不支持接收部分请求,然后发送部分响应,然后接收更多请求的机制。它只是不在规范中。一旦您开始接收响应,向服务器发送附加信息的唯一方法是使用另一个 HTTP 请求。您可以使用已经并行打开的一个,或者您可以打开一个新的,或者您可以完成第一个请求/响应并在同一个套接字上发出额外的请求(在 1.1 中)。

如果您必须在单个套接字连接上使用异步 io,您可能需要考虑使用 HTTP 以外的其他协议。

于 2014-01-31T16:26:12.273 回答