14

我正在阅读 Kafka: The Definitive Guide,并希望更好地了解重新平衡侦听器。书中的示例 simple 使用 aHashMap来维护当前已处理的偏移量,并将在撤销分区时提交当前状态。我的担忧是:

关于代码示例,我有两个问题/疑问:

  1. 使用的语言让我假设这些回调是在不同的线程上进行的。那么,在应用当前偏移量时不应该考虑线程安全吗?此外,在提交后不应该取消当前批次吗?
  2. 它说使用 commitSync 来确保在重新平衡进行之前提交偏移量。但是,这仅在该消费者中是同步的。是否有某种机制使协调器在收到所有订阅消费者的回复后才会继续?
4

1 回答 1

11
  1. 我重新阅读了书中的部分,我同意我也有点困惑!

    Javadoc指出:

    每当分区分配更改时,此回调将仅作为 poll(long) 调用的一部分在用户线程中执行。

    我查看了代码并确认重新平衡侦听器方法确实在拥有消费者的同一线程中调用。

  2. 是的,您应该commitSync()在重新平衡侦听器中提交时使用。

    为了解释原因,让我们看一下黄金路径的例子。我们从消费者愉快地消费开始,并定期向协调器发送心跳。在某些时候,协调REBALANCE_IN_PROGRESS器会向心跳请求返回错误。这可能是由想要加入组的新成员、成员离开或心跳失败或从订阅中添加/删除新分区引起的。此时,所有消费者都需要重新加入群组。

    在尝试重新加入组之前,消费者将同步执行 ConsumerRebalanceListener.onPartitionsRevoked(). 一旦监听器返回,消费者将向协调器发送一个 JoinRequest 以重新加入该组。

    也就是说,我认为这就是您正在考虑的,如果您的回调花费太长时间 (> session.timeout.ms) 提交,则该组可能已经在另一代中,并且具有偏移量的分区试图被分配给另一个成员。在这种情况下,即使是同步的,提交也会失败。但是通过commitSync()在侦听器中使用,您可以保证消费者在完成提交之前不会重新加入组。

于 2018-04-20T15:19:56.450 回答