一个简单的合并组合(如下)有时会在 staartup 打印一条调试消息,说它由于零需求而丢弃消息。我希望合并阶段能够提供无限的需求,所以上述情况绝不应该是这样。我错过了什么?
val sourceRef = Source.actorRef[KeyedHighFreqEvent](0, OverflowStrategy.fail)
.conflateWithSeed(...into hash map...)
.throttle(8, per = 1.second, maxBurst=24, ThrottleMode.shaping)
.mapConcat(...back to individual KeyedHighFreqEvent...)
.groupedWithin(1024, 1.millisecond)
.to(Sink.actorRef(networkPublisher, Nil))
.run()
system.eventStream.subscribe(sourceRef, classOf[KeyedHighFreqEvent])