-1

Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(List<TopicPartitions>)

我有以下两个问题。

  1. 如何从 kafka 获取分区名称?我如何在 kafkaIO 中提及它?
  2. Apache Beam 生成的 kafka 消费者数量是否等于创建 kafka 消费者时提到的分区列表?
4

1 回答 1

-1

我自己找到了答案。

如何告诉 kafkaIO 从特定分区读取?

kafkaIO 具有withTopicPartitions(List<TopicPartitions>)接受TopicPartition对象列表的方法。

主题分区被命名为从零开始的序号。因此,以下应该工作

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0),new TopicPartition(topicName, 1),new TopicPartition(topicName, 2)))

要对其进行测试,请使用kafkacat以下命令

kafkacat -P -b localhost:9092 -t sample -p 0- 此命令生成到指定分区。

Apache Beam 生成的 kafka 消费者数量是否等于创建 kafka 消费者时提到的分区列表?

它将生成一个消费者组,其消费者数量与在构建 kafka Producer 对象期间明确提到的分区数量相同。

于 2020-07-05T18:39:33.350 回答