6

我正在使用kafka connector confluent 3.0.1版本。我创建了一个名为new-group的新组,上面大约有​​20个主题。这些主题大部分都很忙。但是很遗憾,当我启动连接器框架时,系统无法停止重新平衡,所有主题的重新平衡大约需要 2 分钟。我不知道原因。一些错误信息是:

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
:

我不知道这是否与持续再平衡有关。

我知道如果 KafkaConsumer.poll() 比配置的 timeout 长,kafka 将撤销分区并因此触发重新平衡,但我很确定每次轮询不会那么长。任何人都可以给我一些线索?

4

2 回答 2

2

我认为max.poll.records可以解决这个问题。它是调整每次循环迭代必须处理的记录数。在 0.10 中有max.poll.records,它为每次调用返回的记录数设置了一个上限。

同样按照 Confluent,confluent.consumer.poll() 应该有相当长的会话超时,例如 30 到 60 秒。

您可能还想微调:

session.timeout.ms
heartbeat.interval.ms 
max.partition.fetch.bytes
于 2017-08-10T05:22:40.487 回答
0

考虑升级到 0.10.1 或更高版本,因为消费者在这些版本中得到了增强,可以更好地处理更长时间的 poll() 调用。

max.poll.interval.ms如果将结果放入 HDFS 的时间超过 5 分钟,则可以增加新参数。这将阻止您的消费者因没有取得进展而被踢出消费者组。

在 0.10.1 发行说明中它说

新的 Java Consumer 现在支持来自后台线程的心跳。有一个新的配置 max.poll.interval.ms 控制在消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟)。配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms 因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间,所以我们更改了它的默认值到刚刚超过 5 分钟。最后将 session.timeout.ms 的默认值调整为 10 秒,将 max.poll.records 的默认值改为 500。

于 2017-08-11T07:25:50.687 回答