我是 Kafka 的新手,到目前为止,我对消费者的理解是基本上有两种类型的实现。
1)高级消费者/消费者群体
2)简单消费者
关于高级抽象最重要的部分是当 Kafka 不关心处理偏移量时使用它,而简单消费者对偏移量管理提供更好的控制。让我感到困惑的是,如果我想在多线程环境中运行消费者并且还想控制偏移量。如果我使用消费者组,这是否意味着我必须从存储在 zookeeper 中的最后一个偏移量中读取?这是我唯一的选择。
我是 Kafka 的新手,到目前为止,我对消费者的理解是基本上有两种类型的实现。
1)高级消费者/消费者群体
2)简单消费者
关于高级抽象最重要的部分是当 Kafka 不关心处理偏移量时使用它,而简单消费者对偏移量管理提供更好的控制。让我感到困惑的是,如果我想在多线程环境中运行消费者并且还想控制偏移量。如果我使用消费者组,这是否意味着我必须从存储在 zookeeper 中的最后一个偏移量中读取?这是我唯一的选择。
大多数情况下,高级消费者 API 不允许您直接控制偏移量。
首次创建消费者组时,您可以使用该auto.offset.reset
属性告诉它是从 kafka 存储的最旧消息还是最新消息开始。
auto.commit.enable
您还可以通过设置为 false来控制高级消费者何时向 zookeeper 提交新的偏移量。
由于高级消费者将偏移量存储在 zookeeper 中,因此您的应用可以直接访问 zookeeper 并操作偏移量——但这将在高级消费者 API 之外。
您的问题有点令人困惑,但您可以在多线程环境中使用简单的使用者。这就是高级消费者所做的。
在 Apache Kafka 0.9 和 0.10 中,消费者组管理完全在 Kafka 应用程序中由 Broker(用于协调)和主题(用于状态存储)处理。
当消费者组首次订阅主题时,设置auto.offset.reset
决定消费者开始消费消息的位置(http://kafka.apache.org/documentation.html#newconsumerconfigs)
您可以注册 aConsumerRebalanceListener
以在为特定使用者分配主题/分区时接收通知。
消费者运行后,您可以使用seek
,seekToBeginning
和seekToEnd
从特定偏移量获取消息。seek
影响该poll
消费者的下一个,并存储在下一次提交时(例如commitSync
,commitAsync
或者当 auto.commit.interval 过去时,如果启用。)
消费者 javadocs 提到了更具体的情况: http: //kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
分配分区后,您可以将 Kafka 提供的组管理与通过 seek(..) 手动管理偏移量相结合。