Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.commitOffsetsInFinalize()
.withTopicPartitions(List<TopicPartitions>)
我有以下两个问题。
- 如何从 kafka 获取分区名称?我如何在 kafkaIO 中提及它?
- Apache Beam 生成的 kafka 消费者数量是否等于创建 kafka 消费者时提到的分区列表?