给定xs: Seq[Source[A, Mat]]
一个将单个物化器组合成一个单一物化器的功能,是否可以合并xs
成一个物化成一个聚合的聚合源Mat
?
考虑这个实际的例子:
拥有N
由N
Source
s 类型的Kafka 主题Source[A, Consumer.Control]
,我想将它们组合(合并)成相同的Source[A, Consumer.Control]
,以便结果Consumer.Control
处理所有原始控件。
对于 2 个来源,这是微不足道的。有这个:
class ConsumerControlBoth(c1: Consumer.Control,
c2: Consumer.Control)
(implicit ec: ExecutionContext) extends Consumer.Control {
override def stop(): Future[Done] = {
(c1.stop().zip(c2.stop())).map(_ => Done)
}
override def shutdown(): Future[Done] = {
(c1.shutdown().zip(c2.shutdown())).map(_ => Done)
}
override def isShutdown: Future[Done] = {
(c1.isShutdown.zip(c2.isShutdown)).map(_ => Done)
}
}
我可以做这个:
Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))
很想去
sources.foldLeft(sources.head) { case (acc, src) =>
Source.combineMat(acc, src)(Merge(_))(new ConsumerControlBoth(_, _))
}
但是,我担心,由于每个combineMat
都试图均匀地分配从其 2 个输入中获取的元素,这可能导致最终分布不均匀:从最后一个来源获取元素将有概率1/2
,从倒数第二个获取-1/4
等等。
另一方面,有 vararg 方法用于组合源而不考虑具体化的值,例如Source.combine
,它的类型为Source[A, NotUsed]
。我一直无法弄清楚如何调整它以使用组合物化价值。
我在这个假设上是对的,还是从这个意义上说结果是一致的?在一般情况下如何正确执行此操作?
升级版。我刚刚想出了这个(只是一个 POC,没有健全性检查等):
def merge[A, M](xs: List[Source[A, M]])
(implicit
mat: Materializer,
M: Monoid[M]): Source[A, M] = {
require(xs.lengthCompare(2) >= 0, "works for sources > 2")
val seed: (M, List[Source[A, NotUsed]]) = (M.empty, List.empty[Source[A, NotUsed]])
val (mat, sourcesRev) = xs.foldLeft(seed) { case ((ma, sa), s) =>
val (mMat, mSrc) = s.preMaterialize()
(M.combine(ma, mMat), mSrc :: sa)
}
val sources: List[Source[A, NotUsed]] = sourcesRev.reverse
Source
.combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))
.mapMaterializedValue(_ => mat)
}
看起来没有上面提到的缺点,但我不确定我是否喜欢它。任何意见?