对于 Kafka Streams,如果我们使用较低级别的处理器 API,我们可以控制是否提交。因此,如果我们的代码中出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新传递此消息,直到问题得到解决。
但是在使用其更高级别的流 DSL API 时如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
对于 Kafka Streams,如果我们使用较低级别的处理器 API,我们可以控制是否提交。因此,如果我们的代码中出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新传递此消息,直到问题得到解决。
但是在使用其更高级别的流 DSL API 时如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
你的说法并不完全正确。您不能“控制是否提交”——至少不能直接控制(无论是在处理器 API 中还是在 DSL 中)。您只能用于ProcessorContext#commit()
请求额外的提交。因此,在调用#commit()
Streams 后会尝试尽快提交,但这不是立即提交。此外,即使您从不调用 Streams 也会自动提交#commit()
。您可以通过 Streams 配置控制 Streams 提交间隔commit.interval.m
(参见http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
如果出现“问题”,这取决于您遇到的问题类型如何应对它:
Processor#process()
或KeyValueMapper#apply()
直到问题解决并且您可以成功处理当前消息(注意,您可能会遇到超时,即异常,使用这种策略——参见消费者配置heartbeat.interval.ms
和 0.10.1 session.timeout.ms
[KIP-62])StateStore
并稍后处理它们。但是,很难做到正确,并且还破坏了一些 Streams 假设(例如,处理顺序)。不建议使用,如果使用,你必须非常小心的含义如果有一个未捕获的异常,StreamThread
将会死掉并且不会发生提交(您可以注册一个异常处理程序以获得有关此的通知:http: //docs.confluent.io/current/streams/developer-guide.html#using-kafka- stream-within-your-application-code . 如果你StreamThread
死了,你需要创建一个新的实例KafkaStreams
来重启你的应用程序。
在成功处理消息之前,您不得从用户代码返回,因为如果您返回,Streams 会假定消息已成功处理(因此可能会提交相应的偏移量)。关于要点 (3),将记录放入特殊的 StateStore 以供以后处理被视为“成功”处理的记录。