
I have two input streams and would like to merge their elements by matching ID. Here is the code:

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Source
  import scala.util.Random

  implicit val system = ActorSystem("sourceDemo")
  implicit val materializer = ActorMaterializer()

  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Two ordered sources of 100 elements each, with random ids in [0, 100)
  val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
  val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))

The result I want is a stream of MergeResult, each built by pairing a Foo and a Bar that share the same id.

Any Foo or Bar whose id has no match yet should be kept in memory. Since that makes the operation stateful, I wonder whether there is a clean way to do it.

More importantly, the source elements are ordered. If duplicate IDs occur, the strategy should be first matched, first served: given Foo(1, "foo-1"), Foo(1, "foo-2") and Bar(1, "Bar-1"), the match should be MergeResult(1, "foo-1", "Bar-1").
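To make the expected pairing concrete, here is a small batch (non-streaming) sketch of the semantics I have in mind. The `ExpectedSemantics` object and the `expected` helper are names made up for illustration only; this is a reference for the desired output, not a streaming solution.

```scala
object ExpectedSemantics {
  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Hypothetical reference function: within each id, the k-th Foo is paired
  // with the k-th Bar; whatever is left over stays unmatched.
  def expected(foos: List[Foo], bars: List[Bar]): (List[MergeResult], List[Foo], List[Bar]) = {
    // List.groupBy keeps the original element order inside each group.
    val foosById = foos.groupBy(_.id)
    val barsById = bars.groupBy(_.id)
    val ids = (foos.map(_.id) ++ bars.map(_.id)).distinct

    val matched = ids.flatMap { id =>
      val fs = foosById.getOrElse(id, Nil)
      val bs = barsById.getOrElse(id, Nil)
      fs.zip(bs).map { case (f, b) => MergeResult(id, f.value, b.value) }
    }
    // Elements beyond the number of partners available for their id.
    val unmatchedFoos = ids.flatMap { id =>
      foosById.getOrElse(id, Nil).drop(barsById.getOrElse(id, Nil).size)
    }
    val unmatchedBars = ids.flatMap { id =>
      barsById.getOrElse(id, Nil).drop(foosById.getOrElse(id, Nil).size)
    }
    (matched, unmatchedFoos, unmatchedBars)
  }
}
```

For the example above, `expected(List(Foo(1, "foo-1"), Foo(1, "foo-2")), List(Bar(1, "Bar-1")))` pairs "foo-1" with "Bar-1" and leaves Foo(1, "foo-2") unmatched.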

I am currently looking for a solution with Akka Streams. If there is a good solution in Spark, Flink, or a similar framework, that would be helpful as well.

Thanks in advance.


1 Answer


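What you are describing is exactly a join operation.

Akka Streams does not support join operations. You might find a way to do it using windows on each stream and some actors or stateful transformations to perform the lookup between them, but the last time I searched for this (a while ago) I found nothing, so you may be in uncharted territory.

You will only find stream joins in more heavyweight frameworks: Flink, Spark Streaming, Kafka Streams. The reason is that a join is fundamentally a lookup of one stream against another, which requires something more complex (state management) than the designers of Akka Streams wanted to handle.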
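For what it is worth, the stateful-transformation idea could look roughly like the sketch below. It is untested against a real workload and rests on several assumptions: an Akka 2.5-style setup with `ActorMaterializer`, a common `Event` supertype added so the two sources can be merged, and a made-up helper name `joinById`. It uses the existing `merge` and `statefulMapConcat` operators and keeps unmatched elements in per-id FIFO queues, so the k-th Foo for an id is paired with the k-th Bar for that id.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

object JoinSketch {
  // Assumption: a shared supertype, not present in the question's code.
  sealed trait Event { def id: Int }
  case class Foo(id: Int, value: String) extends Event
  case class Bar(id: Int, value: String) extends Event
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Hypothetical helper: joins two ordered sources on id with FIFO matching.
  def joinById(foos: Source[Foo, NotUsed], bars: Source[Bar, NotUsed]): Source[MergeResult, NotUsed] = {
    // merge keeps the relative order of elements coming from the same source.
    val events: Source[Event, NotUsed] = foos.merge(bars)
    events.statefulMapConcat[MergeResult] { () =>
      // State is created once per materialization and is private to this stage.
      val waitingFoos = mutable.Map.empty[Int, mutable.Queue[Foo]]
      val waitingBars = mutable.Map.empty[Int, mutable.Queue[Bar]]

      (event: Event) => event match {
        case foo: Foo =>
          waitingBars.get(foo.id).filter(_.nonEmpty) match {
            case Some(queue) => List(MergeResult(foo.id, foo.value, queue.dequeue().value))
            case None =>
              waitingFoos.getOrElseUpdate(foo.id, mutable.Queue.empty[Foo]) += foo
              Nil
          }
        case bar: Bar =>
          waitingFoos.get(bar.id).filter(_.nonEmpty) match {
            case Some(queue) => List(MergeResult(bar.id, queue.dequeue().value, bar.value))
            case None =>
              waitingBars.getOrElseUpdate(bar.id, mutable.Queue.empty[Bar]) += bar
              Nil
          }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("joinSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val foos = Source(List(Foo(1, "foo-1"), Foo(1, "foo-2"), Foo(2, "foo-3")))
    val bars = Source(List(Bar(1, "Bar-1"), Bar(3, "Bar-2")))

    val result = Await.result(joinById(foos, bars).runWith(Sink.seq), 5.seconds)
    println(result)
    system.terminate()
  }
}
```

Note the limits of this approach: the queues grow without bound while ids stay unmatched, the state lives only in memory and is lost if the stream restarts, and the emission order of results depends on how `merge` interleaves the two sources. Those gaps (eviction, windowing, fault-tolerant state) are exactly what the heavier frameworks provide.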

answered 2018-01-06T22:41:22.057