1

我只是按照 akka 流 ActorPublisher 示例进行操作,有时我收到了以下消息:

java.lang.IllegalStateException:当流没有请求元素时不允许 onNext,totalDemand 为 0

查看文档,他们解释说:

通过调用 onNext 将元素发送到流。您可以发送流订阅者请求的尽可能多的元素。这个金额可以通过totalDemand查询。只有在isActive且totalDemand>0时才允许使用onNext,否则onNext会抛出IllegalStateException。

当流订阅者请求更多元素时,ActorPublisherMessage.Request 消息将传递给该参与者,您可以对该事件采取行动。totalDemand 会自动更新。

如何防止 totalDemand 为零?当我收到此错误时,我丢失了我试图发送的消息。

这是我一直在关注的示例:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

这是我的课堂测试

object Test extends App {

  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()

  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())

  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")

  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))

}

好吧,我收到了来自 kafka 的消息,并且我正在传递给 WorkerActor,但是当向 Kafka 发送大约 10 条消息/秒时,其中一些消息由于这个错误而丢失了。

更新

我遇到了这里描述的错误(使用相同的库):

https://github.com/softwaremill/reactive-kafka/issues/11

我使用缓冲区解决了我的问题,但看起来这个 PR 可以解决问题。

https://github.com/softwaremill/reactive-kafka/pull/13

4

1 回答 1

2

如果下游接收器没有任何需求,那么您唯一的选择是

  1. 告诉数据源馈送Worker没有需求,以便源可以停止生成消息,直到出现更多需求(反应式解决方案)。
  2. 缓冲消息,直到您从接收器获得一些需求,这可能会填满您的缓冲区并且无论如何您都会丢弃消息。
  3. 当需求为 0 时删除消息(这似乎是您当前的实现)。

但是“背压”的全部意义在于防止在没有需求时调用onNext。

要实现上面的缓冲选项,您可以在 Actor 内部或外部进行缓冲:

  • 内部缓冲区:查看文档中的“ActorPublisher”示例,了解在为 ActorPublisher 提供数据的 Actor 中进行缓冲的示例。
  • 外部缓冲区:使用缓冲实现器或Flow.buffer在您的流中使用外部缓冲区。
于 2015-06-18T16:25:09.893 回答