10

对于 Kafka Streams,如果我们使用较低级别的处理器 API,我们可以控制是否提交。因此,如果我们的代码中出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新传递此消息,直到问题得到解决。

但是在使用其更高级别的流 DSL API 时如何控制是否提交消息呢?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

4

1 回答 1

17

你的说法并不完全正确。您不能“控制是否提交”——至少不能直接控制(无论是在处理器 API 中还是在 DSL 中)。您只能用于ProcessorContext#commit()请求额外的提交。因此,在调用#commit()Streams 后会尝试尽快提交,但这不是立即提交。此外,即使您从不调用 Streams 也会自动提交#commit()。您可以通过 Streams 配置控制 Streams 提交间隔commit.interval.m(参见http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application

如果出现“问题”,这取决于您遇到的问题类型如何应对它:

  • 如果您检测到无法恢复的问题,则只能抛出异常并“停止世界”(参见下文)。
  • 如果您有一个可恢复的错误,您需要在您自己的代码中“循环”(例如,在问题解决之内Processor#process()KeyValueMapper#apply()直到问题解决并且您可以成功处理当前消息(注意,您可能会遇到超时,即异常,使用这种策略——参见消费者配置heartbeat.interval.ms和 0.10.1 session.timeout.ms [KIP-62]
  • 另一种方法是将现在无法处理的记录放入一个StateStore并稍后处理它们。但是,很难做到正确,并且还破坏了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用,你必须非常小心的含义

如果有一个未捕获的异常StreamThread将会死掉并且不会发生提交(您可以注册一个异常处理程序以获得有关此的通知:http: //docs.confluent.io/current/streams/developer-guide.html#using-kafka- stream-within-your-application-code . 如果你StreamThread死了,你需要创建一个新的实例KafkaStreams来重启你的应用程序。

在成功处理消息之前,您不得从用户代码返回,因为如果您返回,Streams 会假定消息已成功处理(因此可能会提交相应的偏移量)。关于要点 (3),将记录放入特殊的 StateStore 以供以后处理被视为“成功”处理的记录。

于 2017-02-05T19:30:10.190 回答