0

我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们在 GCP 之外运行这个应用程序。

我面临的问题是消费的消息没有在 GCP PubSub 中得到确认。我已经确认本地 Kafka 实例具有来自 GCP PubSub 的消息。GCP DataFlow 中的文档表明,当管道使用数据接收器(在我的情况下为 Kafka)终止时,数据包已完成。

但是由于代码是在 Apache Flink 而不是 GCP DataFlow 中运行的,我认为某种回调不会被触发与确认提交的消息相关。
我在这里做错了什么?

                   pipeline
                    .apply("Read  GCP PubSub Messages", PubsubIO.readStrings()
                            .fromSubscription(subscription)
                    )
                    .apply(ParseJsons.of(User.class))
                    .setCoder(SerializableCoder.of(User.class))
                    .apply("Filter-1", ParDo.of(new FilterTextFn()))
                    .apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
                    .apply("Write to Local Kafka",
                            KafkaIO.<Void,String>write()
                                    .withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
                                    .withTopic("test-topic")
                                    .withValueSerializer((StringSerializer.class))
                                    .values()
                    );
4

2 回答 2

0

在PubSub IO 类的 Beam文档中提到了这一点:

检查点既用于向 Pubsub 确认收到的消息(以便它们可能在 Pubsub 端停用),也用于在需要恢复检查点时对已使用的消息进行 NACK(以便 Pubsub 将及时重新发送这些消息)。

ACK 未链接到数据流,您应该在数据流上具有相同的行为。ack 在检查点上发送。通常,检查点是您在流中设置的窗口。

但是,你没有设置窗口!默认情况下,窗口是全局的,并且仅在最后关闭,如果你优雅地停止你的工作(甚至,我不确定这一点)。无论如何,更好的解决方案是使用固定窗口(例如 5 分钟)来确认每个窗口上的消息。

于 2020-11-10T21:34:41.287 回答
0

我修复此解决方案的方法是使用 Guillaume Blaquiere ( https://stackoverflow.com/users/11372593/guillaume-blaquiere ) 建议查看检查点。即使在管道中添加了 Window.into() 函数,源 PubSub 订阅端点也没有收到 ACK。
问题出在 Flink 服务器配置中,我没有提到检查点配置。如果没有这些参数,检查点将被禁用。

state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/

这些配置应该放在 flink_home/conf/flink-conf.yaml 中。添加这些条目并重新启动 flink 后。在 GCP pubsub 监控图表中,所有积压的(未确认的消息)都变为 0。

于 2020-11-13T04:35:26.283 回答