1

我在从本地 Kafka INPUT_TOPIC 读取并写入另一个本地 Kafka OUTPUT_TOPIC 的光束管道下方运行。我创建了一个发布者来提供 INPUT_TOPIC (手动)和一个消费者来检查我在 OUTPUT_TOPIC 上得到了什么,但想知道它是否是测试精确一次语义的正确设置。

Beam 和 Kafka 相对较新,因此正在寻找有关如何以更好的方式测试此管道的建议,并确认恰好一次语义在本地环境中有效。

注意: 我已经在我的机器中安装了 Apache Spark 并使用-Pspark-runner选项运行管道。

光束管线示例

p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic(INPUT_TOPIC)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))
.withReadCommitted()
.commitOffsetsInFinalize()
.withoutMetadata())
.apply(Values.<String>create())
.apply(KafkaIO.<Void, String>write()
  .withBootstrapServers("localhost:9092")
  .withTopic(OUTPUT_TOPIC)c
  .withValueSerializer(StringSerializer.class)
  .withEOS(1, "eos-sink-group-id")
  .values()
);

p.run();

谢谢

4

0 回答 0