我在akka流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际流。除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过出口输出。
下面是所述组件的简化实现。
class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {
val in = Inlet[A]("CustomGroupBy.in")
val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))
override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)
/* ... */
}
我现在将该组件的每个出口连接到不同的 Sink 并结合所有这些 sink 的物化值。
我已经用图形 DSL 尝试了一些东西,但还没有完全成功。有人愿意为我提供一个片段来做到这一点或指出我正确的方向吗?
提前致谢!