13

我已经阅读了其他线程,并且通过使用新的组 ID 解决了这个问题,但是我想了解可能导致这种情况的原因。

我有一个包含 16 个分区的主题,我设置了 session.timeout.ms=30000 和 max.poll.interval.ms=30000000。

我运行我的程序,然后按 ctrl+c,所以它没有正确关闭。在我猜测 16 次之后,我陷入了这个重新加入的问题。session.timeout.ms 是心跳超时,所以 30 秒后它应该踢我的消费者,我的分区应该“释放”对吗?还是只听我的 max.poll.interval.ms?

编辑:我仍然间歇性地收到这个错误,当它发生时我必须重新启动我的所有消费者。即使我的消费者运行良好然后他们都开始陷入重新加入(没有添加/删除消费者),也会发生这种情况。这是我在新消费者卡在该状态后尝试连接它时的错误日志:

https://pastebin.com/AXJeSHkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null)

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected.
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected

这些是我认为相关的第一条和最后一条消息。以下是我设置的相关超时:

session.timeout.ms=30000
max.poll.interval.ms=43200000    
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms
enable.auto.commit=false

我也应该设置 heartbeat.interval.ms 吗?这是消费者在某个后台线程中自动将心跳发送到代理的间隔(我已经阅读了文档,但由于某种原因我无法完全理解它)?

4

2 回答 2

9

我知道这是一个很老的问题,但我有类似的问题,最后我明白了这种情况的原因并想分享。

当重新平衡开始时,Kafka 等待组中的所有消费者 poll() 并发送 joinGroup 请求。重新平衡超时等于 max.poll.interval.ms。因此,Kafka 一直等到重新平衡超时或每个消费者的流程结束。

在您的情况下,您将 max.poll.interval.ms 设置为 12 小时。唯一合理的理由就是你必须有一个漫长的过程。因此,当重新平衡开始时,Kafka 将等待您的流程完成或 12 小时过去。这就是您的消费者似乎陷入困境的原因。

于 2019-03-22T20:53:45.740 回答
7

如果您的客户端没有正确断开连接(崩溃或 SIGINT),服务器将需要 session.timeout.ms(在您的情况下为 30 秒)将其从组中踢出。在这段时间内,服务器仍然会认为消费者是组的一部分,所以它不会做任何重新分配。一旦这个延迟结束,分配的分区将被重新分配给其他消费者(如果有的话)。

如果您使用新的组 ID,这当然不会发生。虽然每次开发时都想使用一个新组(因为您不必等待),但您会丢失前一组提交的任何偏移量,这可能不代表您的应用程序在生产中运行时的状态。

关于 max.poll.interval.ms,它是消费者逻辑中两次调用 poll() 之间允许的最大延迟。我认为此设置与此问题无关。

于 2017-06-15T10:08:56.123 回答