1

我正在使用带有简单反应流的 Akka-Stream 1.0:

  • 发布者发送 N 条消息
  • 订阅者消费了 N 条消息

 override val requestStrategy = new MaxInFlightRequestStrategy(max = 20) {
    override def inFlightInternally: Int = messageBacklog.size

发布者将通过发送OnComplete消息在 N 条消息(动态)后关闭流。

订阅者收到消息并立即进入canceled状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常会积压一些消息 - 随着订阅者的获取,这些消息无法再处理canceled- 恕我直言ActorSubscriber.scala:195

处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 发送内容ChunkedMessage)并在消息完成后立即收到确认消息。由于 Actor 被取消,ack 消息永远不会被处理,并且 backlog 会被处理。

有什么建议让我完成积压工作?我可以“发明”我自己的“完成标记”,但这对我来说听起来很奇怪。显然,我的代码MaxInFlightRequestStrategy最多可以使用 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。

4

1 回答 1

0

经过长时间的调试和尝试,我想我明白发生了什么/正在发生什么 - 希望它可以节省其他人的时间:

我认为我在如何实现反应式订阅者的概念上存在误解而失败:我在内部ActorSubscriber假脱机消息并在正确的时间通过将这些假脱机消息释放回业务逻辑self ! SpooledMessage- 这导致订阅者的计算变得疯狂:每个假脱机消息都被计为“已接收”的两倍,导致内部要求来自上游的更多消息。

通过处理演员本身内的假脱机消息解决了这个问题 - 让我也可以OnComplete正确使用:一旦收到此消息,订阅者不会收到任何新消息,但我自己处理内部队列(不使用self ! ...) 从而完成整个流处理。

于 2015-08-31T02:47:42.570 回答