3

我有一个流

case class Msg(keys: Seq[Char], value: String)

现在我想过滤一个键子集,例如 val filterKeys = Set[Char]('k','f','c')然后Filter(k.exists(filterKeys.contains))) 拆分这些键,以便某些键由不同的流处理,然后在最后合并在一起;

                                 /-key=k-> f1 --\
Source[Msg] ~> Filter ~> router |--key=f-> f2 ----> Merge --> f4
                                 \-key=c-> f3 --/

我该怎么做呢?

FlexiRoute以旧的方式似乎是一个不错的方法,但在新的 API 中,我猜我想GraphStage从 DSL 进行自定义或创建我自己的图表,因为我认为无法通过内置阶段做到这一点。 .?

4

1 回答 1

4

小键组解决方案

如果您的密钥集很小且不可变,那么广播和过滤器的组合可能是最容易理解的实现。您首先需要定义您描述的过滤器:

def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains)

然后可以按照文档中的说明向广播公司提供数据。所有Msg具有良好键的值都将被广播到三个过滤器中的每一个,并且每个过滤器将只允许一个特定的键:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  val source : Source[Msg] = ???

  val goodKeyFilter = goodKeys(Set('k','f','c'))

  val bcast = builder.add(BroadCast[Msg](3))
  val merge = builder.add(Merge[Msg](3))

  val kKey = goodKeys(Set('k'))
  val fKey = goodKeys(Set('f'))
  val cKey = goodKeys(Set('c'))

  //as described in the question
  val f1 : Flow[Msg, Msg, _] = ???
  val f2 : Flow[Msg, Msg, _] = ???
  val f3 : Flow[Msg, Msg, _] = ???

  val f4 : Sink[Msg,_] = ???

  source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4
                             bcast ~> fKey ~> f2 ~> merge
                             bcast ~> cKey ~> f3 ~> merge

大密钥集解决方案

如果您的键集很大,那么 groupBy 会更好。假设您有一个Map功能键:

//e.g. 'k' -> f1
val keyFuncs : Map[Set[Char], (Msg) => Msg]

此映射可与 groupBy 函数一起使用:

source
  .via(goodKeys(Set('k','f','c'))
  .groupBy(keyFuncs.size, _.keys)
  .map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg
  .mergeSubstreams
于 2016-10-06T12:34:28.660 回答