4

enable.auto.commit当我的消费者成功处理了一条记录时,我想告诉 Kafka,所以我通过设置为 false 来关闭自动提交。我在偏移量 0 和 1 处订阅了关于我订阅的主题的两条消息,并创建了一个使用者,以便每次调用poll最多返回一条记录(通过设置max.poll.records为 1)。

我现在打电话consumer.poll(5000)并收到第一条消息,但我不承认;我不打电话commitSynccommitAsync。如果我现在consumer.poll(5000)再次调用,使用相同的消费者,我希望得到与我刚刚阅读的完全相同的消息,但相反,我收到了第二条消息。

consumer.poll在我明确承认之前,我如何才能继续分发相同的消息?

4

1 回答 1

3

您所描述的是预期的行为。每次调用时poll(),它都会返回下一条消息。您提交的偏移量仅在连接新消费者时使用,因此它知道从哪里(重新)开始。

在 MessageHub 中,我们将 设置session.timeout为 30 秒。因此,您需要poll()稍微加快调用速度以避免断开连接。如果您的处理时间比这更长,那么我可以考虑 2 个选项:

  • 使用 Kafka 0.10.2 并设置告诉您的 Kafka 客户端在处理之前的记录时max.poll.interval.ms保持会话活动(无需调用)。poll()(此功能是在 0.10.1 中添加的,但我们不支持该版本。0.10.2 有效,因为它能够与 0.10.0 代理一起使用)

  • 之后使用 seek() 移回上一个偏移量,poll以便它不断返回相同的记录。

希望这可以帮助!

于 2017-04-05T11:28:54.593 回答