0

我正在使用 DirectRunner 运行多个 Apache Beam KafkaIO 实例,这些实例从同一主题中读取。但是消息正在传递到所有正在运行的实例。在看到我发现的 Kafka 配置后,组名会附加一些唯一的前缀,并且每个实例都有唯一的组名。

  1. group.id = Reader- 0_offset_consumer_559337182_my_group
  2. group.id = Reader- 0_offset_consumer_559337345_my_group

因此,每个实例都分配了唯一的group.id,这就是消息被传递到所有实例的原因。

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
            .withConsumerConfigUpdates(
                    new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                            .put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
                            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
            .withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
            .withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()

那么我必须提供什么配置才能使组中的所有消费者都不会阅读相同的消息

4

1 回答 1

0

是的,这是因为组名附加了一些唯一的前缀,并且每个实例都有唯一的组名。因为这个kafka不知道你是否再启动一个实例。因此,相同的消息会传递给所有消费者。

因此,我可以想到的一种解决方法是,不要给出主题并让 beam 计算所有分区的消费者数量,您可以使用 DirectRunner 为每个 apache beam KafkaIO 实例显式地给出主题分区。

您必须将 aList类型传递TopicPartition给方法withTopicPartitions

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0)))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

上面的代码将只读取来自partition 0. 因此,通过这种方式,您可以启动同一程序的多个实例,而无需将相同的消息传递给所有消费者

于 2020-07-21T01:49:49.703 回答