2

下面的代码确实流回客户端,我收集的是比使用 Java 的 IO 流更惯用的方式。但是,它有一个问题:流完成后连接保持打开状态。

def getImage() = Action { request =>
  val imageUrl = "http://hereandthere.com/someimageurl.png"
  Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
    WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
    return
  }).withHeaders("Content-Type"->"image/png")
}

这用于将大型(>1 mb)文件从内部 API 流式传输到请求者。

问题是,为什么它保持连接打开?它对上游服务器有什么期望吗?我使用 curl 测试了上游服务器,并且连接确实关闭了 - 它只是在通过此代理时没有关闭。

4

3 回答 3

4

流未完成的原因是没有将 EOF 发送到从 WS.get() 调用返回的迭代对象。如果没有这个明确的 EOF,连接将保持打开状态——因为它处于分块模式,并且可能是一个长时间运行的类似彗星的连接。

这是固定代码:

Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
  WS.url(imageUrl)
    .withHeaders("Accept"->"image/png")
    .get { response => content }
    .onRedeem { ii =>
       ii.feed(Input.EOF)
    }
}).withHeaders("Content-Type"->"image/png")
于 2012-10-14T16:45:48.757 回答
1

播放 2.2.x 更新:

def proxy = Action.async {
  val url = "http://localhost:9000"

  def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
    new Enumerator[Array[Byte]] {
      def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
        val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
        chunks(i.map {
          done =>
            doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
        })
        doneIteratee.future
      }
    }
  }

  val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
  val resultPromise = Promise[SimpleResult]()

  WS.url(url).get {
    responseHeaders =>

      resultPromise.success(new Status(responseHeaders.status).chunked(
        enumerator({
          content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
        }
        )).withHeaders(
        "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
        "Connection" -> "Close"))

      Iteratee.flatten(iterateePromise.future)
  }.onComplete {
    case Success(ii) => ii.feed(Input.EOF)
    case Failure(t) => throw t
  }

  resultPromise.future
}

如果有人有更好的解决方案,我很感兴趣!

于 2013-11-28T20:25:01.730 回答
1

这是play 2.1.0的修改版本。请参阅https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J

感谢 Anatoly G 的分享。

def proxy = Action {

   val url = "..."

   Async {
     val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
     val resultPromise = Promise[ChunkedResult[Array[Byte]]]

     WS.url(url).get { responseHeaders =>
       resultPromise.success {
         new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
           iterateePromise.success(content)
         }).withHeaders(
           "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
           "Connection" -> "Close")
       }
       Iteratee.flatten(iterateePromise.future)
     }.onComplete {
       case Success(ii) => ii.feed(Input.EOF)
       case Failure(t) => resultPromise.failure(t)
     }

     resultPromise.future
   }

}
于 2013-10-01T15:02:58.827 回答