1

我在数据流中使用 KafkaIO 来读取来自一个主题的消息。我使用以下代码。

KafkaIO.<String, String>read()
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
                .build())
//                .commitOffsetsInFinalize()
                .withTopics(Collections.singletonList(topicNames))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

我使用直接运行器在本地运行数据流程序。一切运行良好。我并行运行同一程序的另一个实例,即另一个消费者。现在我在管道处理中看到重复的消息。

虽然我已经提供了消费者组 ID,但使用相同的消费者组 ID(同一程序的不同实例)启动另一个消费者不应该处理另一个消费者处理的相同元素,对吗?

使用数据流运行器结果如何?

4

1 回答 1

2

我认为您设置的选项不能保证跨管道的消息不重复传递。

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:这是Kafka 消费者的标志,而不是 Beam 管道本身。似乎这是最好的努力和定期的,所以你可能仍然会看到跨多个管道的重复项。

  • withReadCommitted():这只是意味着 Beam 不会读取未提交的消息。同样,它不会防止跨多个管道的重复。

有关Beam 源用于确定 Kafka 源起点的协议,请参见此处。

为了保证不重复交付,您可能必须从不同的主题或不同的订阅中阅读。

于 2020-05-18T01:26:49.620 回答