我正在使用 DirectRunner 运行多个 Apache Beam KafkaIO 实例,这些实例从同一主题中读取。但是消息正在传递到所有正在运行的实例。在看到我发现的 Kafka 配置后,组名会附加一些唯一的前缀,并且每个实例都有唯一的组名。
- group.id = Reader- 0_offset_consumer_559337182_my_group
- group.id = Reader- 0_offset_consumer_559337345_my_group
因此,每个实例都分配了唯一的group.id,这就是消息被传递到所有实例的原因。
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
.withConsumerConfigUpdates(
new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
.withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
.withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()
那么我必须提供什么配置才能使组中的所有消费者都不会阅读相同的消息