2

给定xs: Seq[Source[A, Mat]]一个将单个物化器组合成一个单一物化器的功能,是否可以合并xs成一个物化成一个聚合的聚合源Mat

考虑这个实际的例子:

拥有NN Sources 类型的Kafka 主题Source[A, Consumer.Control],我想将它们组合(合并)成相同的Source[A, Consumer.Control],以便结果Consumer.Control处理所有原始控件。

对于 2 个来源,这是微不足道的。有这个:

class ConsumerControlBoth(c1: Consumer.Control,
                          c2: Consumer.Control)
                         (implicit ec: ExecutionContext) extends Consumer.Control {
  override def stop(): Future[Done] = {
    (c1.stop().zip(c2.stop())).map(_ => Done)
  }

  override def shutdown(): Future[Done] = {
    (c1.shutdown().zip(c2.shutdown())).map(_ => Done)
  }

  override def isShutdown: Future[Done] = {
    (c1.isShutdown.zip(c2.isShutdown)).map(_ => Done)
  }
}

我可以做这个:

Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))

很想去

sources.foldLeft(sources.head) { case (acc, src) =>
  Source.combineMat(acc, src)(Merge(_))(new ConsumerControlBoth(_, _))
}

但是,我担心,由于每个combineMat都试图均匀地分配从其 2 个输入中获取的元素,这可能导致最终分布不均​​匀:从最后一个来源获取元素将有概率1/2,从倒数第二个获取-1/4等等。

另一方面,有 vararg 方法用于组合源而不考虑具体化的值,例如Source.combine,它的类型为Source[A, NotUsed]。我一直无法弄清楚如何调整它以使用组合物化价值。

我在这个假设上是对的,还是从这个意义上说结果是一致的?在一般情况下如何正确执行此操作?

升级版。我刚刚想出了这个(只是一个 POC,没有健全性检查等):

def merge[A, M](xs: List[Source[A, M]])
                (implicit
                 mat: Materializer,
                 M: Monoid[M]): Source[A, M] = {
  require(xs.lengthCompare(2) >= 0, "works for sources > 2")

  val seed: (M, List[Source[A, NotUsed]]) = (M.empty, List.empty[Source[A, NotUsed]])
  val (mat, sourcesRev) = xs.foldLeft(seed) { case ((ma, sa), s) =>
    val (mMat, mSrc) = s.preMaterialize()
    (M.combine(ma, mMat), mSrc :: sa)
  }

  val sources: List[Source[A, NotUsed]] = sourcesRev.reverse

  Source
    .combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))
    .mapMaterializedValue(_ => mat)
}

看起来没有上面提到的缺点,但我不确定我是否喜欢它。任何意见?

4

1 回答 1

0

可以像这样组合任意数量的源(物化值的类型应该相同):

import scalaz.{Ordering => _, _}    

def mergeWithPicker[A](originSources: Seq[Source[Partition, A]])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
    merge(originSources, picker[A])

 def mergeWithSorter[A](originSources: Seq[Source[Partition, A]])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
    merge(originSources, sorter[A])

private def merge[A](originSources: Seq[Source[Partition, A]], f: (Source[Partition, A], Source[Partition, A]) => Source[Partition, A])(implicit monoid: Monoid[A]): Source[Partition, A] = originSources match {
    case Nil     =>
      Source.empty[Partition].mapMaterializedValue(_ => monoid.zero)

    case sources =>
      @tailrec
      def reducePairs(sources: Seq[Source[Partition, A]]): Source[Partition, A] =
        sources match {
          case Seq(s) =>
            s

          case _      =>
            reducePairs(sources.grouped(2).map {
              case Seq(a)    => a
              case Seq(a, b) => f(a, b)
            }.toSeq)
        }

      reducePairs(sources)
    }

  private def picker[A](s1: Source[Partition, A], s2: Source[Partition, A])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
    combineSources(new PartitionPicker[Partition], s1, s2)(monoid.append(_, _))

  private def sorter[A](s1: Source[Partition, A], s2: Source[Partition, A])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
    combineSources(new MergeSorted[Partition], s1, s2)(monoid.append(_, _))

  private def combineSources[A, MatIn0, MatIn1, Mat](combinator: GraphStage[FanInShape2[A, A, A]], s0: Source[A, MatIn0], s1: Source[A, MatIn1])(combineMat: (MatIn0, MatIn1) => Mat): Source[A, Mat] =
    Source.fromGraph(GraphDSL.create(s0, s1)(combineMat) { implicit builder => (s0, s1) =>
      val merge = builder.add(combinator)
      s0 ~> merge.in0
      s1 ~> merge.in1
      SourceShape(merge.out)
    })

稍后您可以提供隐式 Monoid 来描述如何合并物化值:

import akka.NotUsed

import scalaz.Monoid

object Monoids {

  implicit final val notUsedMonoid: Monoid[NotUsed] = new Monoid[NotUsed] {
    def zero: NotUsed = NotUsed

    def append(f1: NotUsed, f2: => NotUsed): NotUsed = f1
  }

  implicit def setMonoid[A]: Monoid[Set[A]] = new Monoid[Set[A]] {
    override def zero: Set[A] = Set.empty

    override def append(f1: Set[A], f2: => Set[A]): Set[A] = f1 ++ f2
  }

}

您可能也有兴趣观看此问题: https ://github.com/akka/akka/issues/24369

于 2018-02-24T16:30:00.917 回答