我在数据流中使用 KafkaIO 来读取来自一个主题的消息。我使用以下代码。
KafkaIO.<String, String>read()
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
.build())
// .commitOffsetsInFinalize()
.withTopics(Collections.singletonList(topicNames))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata();
我使用直接运行器在本地运行数据流程序。一切运行良好。我并行运行同一程序的另一个实例,即另一个消费者。现在我在管道处理中看到重复的消息。
虽然我已经提供了消费者组 ID,但使用相同的消费者组 ID(同一程序的不同实例)启动另一个消费者不应该处理另一个消费者处理的相同元素,对吗?
使用数据流运行器结果如何?