7

我是 Kafka 的新手,到目前为止,我对消费者的理解是基本上有两种类型的实现。
1)高级消费者/消费者群体
2)简单消费者

关于高级抽象最重要的部分是当 Kafka 不关心处理偏移量时使用它,而简单消费者对偏移量管理提供更好的控制。让我感到困惑的是,如果我想在多线程环境中运行消费者并且还想控制偏移量。如果我使用消费者组,这是否意味着我必须从存储在 zookeeper 中的最后一个偏移量中读取?这是我唯一的选择。

4

2 回答 2

8

大多数情况下,高级消费者 API 不允许您直接控制偏移量。

首次创建消费者组时,您可以使用该auto.offset.reset属性告诉它是从 kafka 存储的最旧消息还是最新消息开始。

auto.commit.enable您还可以通过设置为 false来控制高级消费者何时向 zookeeper 提交新的偏移量。

由于高级消费者将偏移量存储在 zookeeper 中,因此您的应用可以直接访问 zookeeper 并操作偏移量——但这将在高级消费者 API 之外。

您的问题有点令人困惑,但您可以在多线程环境中使用简单的使用者。这就是高级消费者所做的。

于 2013-08-02T10:46:31.597 回答
1

在 Apache Kafka 0.9 和 0.10 中,消费者组管理完全在 Kafka 应用程序中由 Broker(用于协调)和主题(用于状态存储)处理。

当消费者组首次订阅主题时,设置auto.offset.reset决定消费者开始消费消息的位置(http://kafka.apache.org/documentation.html#newconsumerconfigs

您可以注册 aConsumerRebalanceListener以在为特定使用者分配主题/分区时接收通知。

消费者运行后,您可以使用seek,seekToBeginningseekToEnd从特定偏移量获取消息。seek影响该poll消费者的下一个,并存储在下一次提交时(例如commitSynccommitAsync或者当 auto.commit.interval 过去时,如果启用。)

消费者 javadocs 提到了更具体的情况: http: //kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

分配分区后,您可以将 Kafka 提供的组管理与通过 seek(..) 手动管理偏移量相结合。

于 2016-08-14T06:10:47.797 回答