14

我正在使用 akka 流,并且我有一部分图表需要有条件地跳过,因为流无法处理某些值。具体来说,我有一个接受字符串并发出 http 请求的流程,但是当字符串为空时服务器无法处理这种情况。但我只需要返回一个空字符串。有没有办法做到这一点而不必通过http请求知道它会失败?我基本上有这个:

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)

我唯一能想到的就是在我的 httpResponse 流中捕获 400 错误并返回一个默认值。但是我希望能够避免为我知道会事先失败的请求而访问服务器的开销。

4

2 回答 2

16

你可以使用flatMapConcat

(警告:从未编译,但你会明白它的要点)

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val makeHttpCall: Flow[HttpRequest, HttpResponse, _]
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
val cleanerSource = source.flatMapConcat({
  case "" => emptyStringSource
  case other => Source.single(other) via someHttpTransformation
})
于 2015-11-20T12:03:23.757 回答
14

Viktor Klang 的解决方案简洁而优雅。我只是想演示一个使用 Graphs 的替代方案。

您可以将字符串源拆分为两个流,并为有效字符串过滤一个流,为无效字符串过滤另一个流。然后合并结果(“跨流”)。

根据文档

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink : Sink[String,_] = ???

  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  val validReq =   Flow[String].filter(_.size > 0)
  val invalidReq = Flow[String].filter(_.size == 0)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~>      invalidReq                    ~> merge
  ClosedShape
})

注意:此解决方案会拆分流,因此接收器可能会以与基于输入的预期不同的顺序处理字符串值结果。

于 2015-11-20T13:07:55.973 回答