2

我们正在使用具有以下配置的消费者 kafka 客户端 0.10.2.0:

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

如您所见,我们正在使用自动提交。我们使用的消费者 API 版本有一个专门的线程来执行自动提交。所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。

我们的应用程序处理时间实际上可能需要(有时)超过 40 秒(请求超时间隔)

我想问的是:

1 - 如果处理时间需要,例如,一分钟。尽管每秒都有自动提交heartbean,但是否会重新平衡?

2 - 更奇怪的是,在长时间执行的情况下,我们似乎不止一次收到相同的消息。正常吗?如果消费者已经提交了偏移量,为什么重新平衡会再次使用相同的偏移量?

谢谢, 奥雷尔

4

3 回答 3

0

只是为了澄清一下,在每次轮询中都会调用 AutoCommit 检查,它会检查经过的时间是否大于配置的时间,如果是,那么只有它才会提交

例如。如果提交间隔是 5 秒并且轮询在 7 秒内发生,在这种情况下,提交将在 7 秒后发生

对于您的问题

  1. 自动提交不计入心跳,如果处理时间长,那么显然不会发生提交,并会导致会话超时,进而触发重新平衡

  2. 除非您正在寻找/将偏移量重置为先前提交的偏移量或发生消费者重新平衡,否则这不应该发生

于 2018-02-12T05:02:45.830 回答
0

从 Kafka v0.10.1.0 开始,你不需要手动触发自动提交来做心跳。Kafka 消费者本身在后台为心跳机制启动一个新线程。要了解更多信息,请阅读KIP-62

在您的情况下,您可以设置max.poll.interval.ms处理器处理max.poll.record记录所花费的最长时间。

于 2018-02-12T12:51:41.747 回答
0

您可以使用KafkaConsumer.pause()/KafkaConsumer.resume()在长时间处理暂停期间防止消费者重新平衡。Java 文档。看看这个问题。

Re.2。您确定提交了这些偏移量吗?

于 2018-02-12T13:31:32.020 回答