2

很抱歉发布与 Kafka 图书馆相关的问题,因为没有多少人对图书馆的特定问题感兴趣。但是这个库是 golang-Kafka 实现中最常用的库之一。

我想使用监听主题的 Sarama 库创建一个简单的消费者。现在据我所知,在高级 Kafka API 中,默认情况下,如果未指定特定分区,消费者会监听所有主题分区。但是,在这个库中,Consumer 接口只有 ConsumePartition 函数,其中需要分区参数。函数的签名是:

ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 

这让我有点困惑。有谁在这方面工作过?

另外,我有一个关于 Kafka 的基本问题。如果我有一个由 3 个消费者实例组成的消费者组,并且他们正在收听假设 2 个主题,每个主题有 2 个分区,那么我是否需要特别提及哪个消费者实例将消费到哪个分区或 Kafka Fetch API 将处理它它自己基于负载?

4

2 回答 2

3

我使用sarama-cluster,它是 Sarama 的开源扩展(Shopify Sarama 也推荐
使用 Sarama 集群,您可以使用此 API 创建消费者:

cluster.NewConsumer(brokers, consumerGroup, topics, kafkaConfig)

所以不需要分区。您应该只提供您的 Kafka 的地址、您brokers的名称consumer group以及topics您希望使用的名称。


消费者处理

为了维持秩序,您应该只为每个分区分配一个消费者。
因此,如果您的消费者组中有 3 个消费者,并且您希望他们消费 2 个主题,每个主题有 2 个分区,您应该分配如下:

partitions 1,2 -> consumer A  
partition 3 -> consumer B  
partition 4 -> consumer C 

您最终可能会发现其中一位消费者进展得更快(其中一个主题具有更高的吞吐量),您将需要重新平衡。
建议使用为您处理此问题的库(如 sarama-cluster)。

于 2018-08-28T06:13:38.747 回答
0

同时,Sarama 集群项目已被弃用,请参阅其 repo 中的弃用通知。好消息它已经被 PR 弃用了实施一个更高级别的消费者组 #1099,它专注于消费主题而不是专用分区。

官方 sarama repo 中的examples/文件夹提供了一个很好的Consumer Group Implementation Example。它就像一个魅力,不需要在 Sarama 之上的任何其他库。

预习:

config := sarama.NewConfig()
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), "mygroup", config)
client.Consume(ctx, strings.Split(topics, ","), &consumer) // should run in a loop
于 2021-05-21T10:37:16.193 回答