6

我有一个这样的流和两个接收器,但一次只使用一个:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)

或者

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)

我们使用哪个接收器是可配置的,但是如果我同时使用两个接收器怎么办。我怎样才能做到这一点?

我考虑过 Sink.combine,但它还需要一个合并策略,我不想以任何方式组合这些接收器的结果。我并不真正关心它们,所以我只想通过 HTTP 将相同的数据发送到某个端点,同时将它们发送到数据库。Sink combine 与广播非常相似,但从头开始实现广播会降低代码的可读性,现在我只有简单的源、流和接收器,没有低级图形阶段。

你知道如何做到这一点的正确方法(有背压和其他我只使用一个水槽的东西)吗?

4

2 回答 2

12

您可以使用alsoTo(参见API 文档):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
于 2017-12-19T22:24:03.470 回答
4

以最简单的形式使用广播GraphDSL不应降低可读性——事实上,人们甚至可能会争辩说,这些~>子句以某种方式有助于可视化流结构:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2

  ClosedShape
})
graph.run()
于 2017-12-20T04:34:05.647 回答