0

我正在玩 Lagom 并创建了接收 Source 作为输入并返回案例类对象的服务:

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall}
import play.api.libs.json.{Format, Json}

trait TestService extends Service {
  def test(): ServiceCall[Source[String, NotUsed], ResultData]

  override final def descriptor = {
    import Service._
    named("DocsStore")
      .withCalls(
        call(test())
      )
  }
}


case class ResultData(uploadId: String, length: Long)

object ResultData {
  implicit val format: Format[ResultData] = Json.format[ResultData]
}

服务实现是:

class TestServiceImpl()(
  implicit val materializer: Materializer,
  implicit val ec: ExecutionContext
) extends TestService {
  val logger = Logger(getClass.getName)

  override def test(): ServiceCall[Source[String, NotUsed], ResultData] = ServiceCall{ source=>
    source.runForeach(s=>logger.info(s"String $s")).map(_=>ResultData("TestResult", 12))
  }
}

当我从 Play 应用程序的控制器调用此服务时:

  def test = Action.async { req=>
    testService.test().invoke(Source("A"::"B"::"C"::Nil)).map(rd=>Ok(Json.toJson(rd)))
  }

服务端的 "runForeach" 成功打印 A、B、C 但服务本身不返回任何结果值(应为 ResultData("TestResult", 12))导致 Play 应用程序抛出异常:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[DeserializationException: No content to map due to end-of-input
 at [Source: akka.util.ByteIterator$ByteArrayIterator$$anon$1@309c63af; line: 1, column: 0]]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

这怎么可能解决?

4

1 回答 1

0

发生这种情况是因为 Lagom 将流的完成解释为关闭连接的信号。在可以发送响应之前关闭连接。

这已在 GitHub 中作为问题提出:https ://github.com/lagom/lagom/issues/814

一种可能的解决方法是让流保持打开状态,直到收到响应,如测试流服务的文档中所示:

// Use a source that never terminates (concat Source.maybe) so we
// don't close the upstream, which would close the downstream
val input = Source("A"::"B"::"C"::Nil).concat(Source.maybe)

但是,如果使用这种策略,服务实现也需要更改,因为上面问题中的实现只在流完成时发送响应。相反,您需要在协议中设计一个显式的完成消息,该消息向服务发出信号以发送响应。

于 2017-10-27T05:26:44.660 回答