Viktor Klang 的解决方案简洁而优雅。我只是想演示一个使用 Graphs 的替代方案。
您可以将字符串源拆分为两个流,并为有效字符串过滤一个流,为无效字符串过滤另一个流。然后合并结果(“跨流”)。
根据文档:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val source = Source(List("1", "2", "", "3", "4"))
val sink : Sink[String,_] = ???
val bcast = builder.add(Broadcast[String](2))
val merge = builder.add(Merge[String](2))
val validReq = Flow[String].filter(_.size > 0)
val invalidReq = Flow[String].filter(_.size == 0)
val httpRequest: Flow[String, HttpRequest, _] = ???
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
val httpResponse: Flow[HttpResponse, String, _] = ???
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
bcast ~> invalidReq ~> merge
ClosedShape
})
注意:此解决方案会拆分流,因此接收器可能会以与基于输入的预期不同的顺序处理字符串值结果。