我正在使用带有简单反应流的 Akka-Stream 1.0:
- 发布者发送 N 条消息
- 订阅者消费了 N 条消息
和
override val requestStrategy = new MaxInFlightRequestStrategy(max = 20) {
override def inFlightInternally: Int = messageBacklog.size
发布者将通过发送OnComplete
消息在 N 条消息(动态)后关闭流。
订阅者收到消息并立即进入canceled
状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常会积压一些消息 - 随着订阅者的获取,这些消息无法再处理canceled
- 恕我直言ActorSubscriber.scala:195
处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 发送内容ChunkedMessage
)并在消息完成后立即收到确认消息。由于 Actor 被取消,ack 消息永远不会被处理,并且 backlog 会被处理。
有什么建议让我完成积压工作?我可以“发明”我自己的“完成标记”,但这对我来说听起来很奇怪。显然,我的代码MaxInFlightRequestStrategy
最多可以使用 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。