4

我在akka流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际流。除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过出口输出。

下面是所述组件的简化实现。

class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {

  val in = Inlet[A]("CustomGroupBy.in")
  val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))

  override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)

  /* ... */
}

我现在将该组件的每个出口连接到不同的 Sink 并结合所有这些 sink 的物化值。

我已经用图形 DSL 尝试了一些东西,但还没有完全成功。有人愿意为我提供一个片段来做到这一点或指出我正确的方向吗?

提前致谢!

4

2 回答 2

4

您很可能需要内置广播舞台。示例用法可以在这里找到:

val bcast = builder.add(Broadcast[Int](2))

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
            bcast ~> f4 ~> merge
于 2017-03-30T13:35:47.827 回答
1

你可能想要akka.stream.scaladsl.Partition[T](outputPorts: Int, partitioner: T ⇒ Int) 舞台

编辑:

要连接所有端口并保留物化值,您必须将阶段作为参数提供给GraphDSL.create方法。

这允许您为物化值定义组合器,并将阶段添加到您的GraphDSLBuilder, 作为最后一个参数的参数。请注意,此重载create方法不采用可变参数参数,因此可能无法以这种方式处理 14 个不同的阶段。

假设您的阶段有一些名称,以下是我将如何实现它,在 3 个输出的情况下:

val runnable = RunnableGraph.fromGraph(
  GraphDSL.create(
    source, customGroupBy, sink1, sink2, sink3)(combiner) {  //the combiner is the function to combine the materialized values
      implicit b => //this is the builder, needed as implicit to make the connections 
      (src, cgb, s1, s2, s3) => //here are the stages added to the builder
      import GraphDSL.Implicits._

      src.out ~> cgb.in
      List(s1, s2, s3).map(_.in).zip(cgb.outlets).foreach{
        case (in, out) => in ~> out
      }

      ClosedShape
    }
  )
)

请记住,如果您不需要阶段的物化值之一,您可以通过执行将其添加到 DSL 中val cgb = b.add(customGroupBy)

于 2017-03-30T14:51:00.693 回答