1

一个简单的合并组合(如下)有时会在 staartup 打印一条调试消息,说它由于零需求而丢弃消息。我希望合并阶段能够提供无限的需求,所以上述情况绝不应该是这样。我错过了什么?

val sourceRef = Source.actorRef[KeyedHighFreqEvent](0, OverflowStrategy.fail)
.conflateWithSeed(...into hash map...)
.throttle(8, per = 1.second, maxBurst=24, ThrottleMode.shaping)
.mapConcat(...back to individual KeyedHighFreqEvent...)
.groupedWithin(1024, 1.millisecond)
.to(Sink.actorRef(networkPublisher, Nil))
.run()

system.eventStream.subscribe(sourceRef, classOf[KeyedHighFreqEvent])
4

1 回答 1

1

的文档Source.actorRef对此非常清楚:

可以使用bufferSize0 禁用缓冲区,然后如果下游没有需求,则丢弃接收到的消息。什么时候bufferSize为 0overflowStrategy无关紧要。在这个 Source 之后添加了一个异步边界;因此,假设下游总是会产生需求是不安全的。

问题是源和合并阶段之间的异步边界。合并阶段确实提供了无限的需求,但异步边界类型使其传播到源的速度很慢。

您可以在源中使用缓冲区(增加 bufferSize),也可以使用其他源,例如,Source.queue如果合适,因为它不会引入异步边界

于 2019-04-03T07:15:41.767 回答