I have two input streams, and I would like to merge their elements by matching ID. Here is the code:
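import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.util.Random

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)
// Two sources of 100 elements each, with random ids in the range 0 to 99
val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))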
The result I would like to get is a MergeResult for each Foo and Bar that share the same id.
Also, any Foo or Bar whose id has not been matched yet should be kept in memory. Since this is stateful, I wonder if there is a clean way to do it.
More importantly, the source elements are in order. If duplicate IDs are found, the strategy should be first matched, first served. That means that for Foo(1, "foo-1"), Foo(1, "foo-2") and Bar(1, "Bar-1"), the match should be MergeResult(1, "foo-1", "Bar-1").
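To make the expected semantics concrete, here is a minimal sketch in plain Scala collections, with no Akka involved. It is not a streaming solution, only a description of the matching I have in mind. The names FirstMatchSketch, Pending, offer and mergeAll are made up for illustration, and the sketch assumes both inputs have already been interleaved into a single sequence of Either[Foo, Bar] in arrival order.

```scala
import scala.collection.immutable.Queue

object FirstMatchSketch {
  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Hypothetical state holder: unmatched elements per id, oldest first.
  final case class Pending(
      foos: Map[Int, Queue[Foo]] = Map.empty,
      bars: Map[Int, Queue[Bar]] = Map.empty
  ) {
    // Offer one element; returns the next state plus a result if a pair completed.
    def offer(elem: Either[Foo, Bar]): (Pending, Option[MergeResult]) = elem match {
      case Left(foo) =>
        bars.getOrElse(foo.id, Queue.empty[Bar]).dequeueOption match {
          case Some((bar, rest)) =>
            // The oldest waiting Bar with this id wins; drop the key once its queue is empty.
            val remaining = if (rest.isEmpty) bars - foo.id else bars.updated(foo.id, rest)
            (copy(bars = remaining), Some(MergeResult(foo.id, foo.value, bar.value)))
          case None =>
            // No partner yet: keep the Foo in memory behind earlier Foos with the same id.
            val waiting = foos.getOrElse(foo.id, Queue.empty[Foo]).enqueue(foo)
            (copy(foos = foos.updated(foo.id, waiting)), None)
        }
      case Right(bar) =>
        foos.getOrElse(bar.id, Queue.empty[Foo]).dequeueOption match {
          case Some((foo, rest)) =>
            val remaining = if (rest.isEmpty) foos - bar.id else foos.updated(bar.id, rest)
            (copy(foos = remaining), Some(MergeResult(bar.id, foo.value, bar.value)))
          case None =>
            val waiting = bars.getOrElse(bar.id, Queue.empty[Bar]).enqueue(bar)
            (copy(bars = bars.updated(bar.id, waiting)), None)
        }
    }
  }

  // Fold the interleaved input through the state, collecting results in order.
  def mergeAll(input: Seq[Either[Foo, Bar]]): (Pending, Vector[MergeResult]) =
    input.foldLeft((Pending(), Vector.empty[MergeResult])) {
      case ((state, out), elem) =>
        val (next, result) = state.offer(elem)
        (next, out ++ result.toList)
    }
}
```

With the input Left(Foo(1, "foo-1")), Left(Foo(1, "foo-2")), Right(Bar(1, "Bar-1")), mergeAll yields the single result MergeResult(1, "foo-1", "Bar-1") and leaves Foo(1, "foo-2") waiting in the state. What I am looking for is a clean way to hold this kind of state inside a stream, where the interleaving of the two sources is not fixed in advance.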
I am currently looking at solutions based on Akka Streams. If there are good solutions in Spark, Flink and so on, those would be helpful as well.
Thanks in advance.