我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。
我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有来自 GCP PubSub 的消息。GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的情况下为 Kafka)终止时,数据包已完成。
但是由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。
我在这里做错了什么?
pipeline
.apply("Read GCP PubSub Messages", PubsubIO.readStrings()
.fromSubscription(subscription)
)
.apply(ParseJsons.of(User.class))
.setCoder(SerializableCoder.of(User.class))
.apply("Filter-1", ParDo.of(new FilterTextFn()))
.apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
.apply("Write to Local Kafka",
KafkaIO.<Void,String>write()
.withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
.withTopic("test-topic")
.withValueSerializer((StringSerializer.class))
.values()
);