我正在尝试在 apache Beam 上使用 KafkaIO 从多个 kafka 经纪人那里读取数据。偏移管理的默认选项是 kafka 分区本身(不再使用 kafka >0.9 的 zookeper)。使用此设置,当我重新启动作业/管道时,存在重复和丢失记录的问题。
根据我的阅读,处理此问题的最佳方法是管理外部数据存储的偏移量。是否可以使用当前版本的 apache beam 和 KafkaIO 来做到这一点?我现在使用的是 2.2.0 版本。
而且,从 kafka 阅读后,我会将其写入 BigQuery。KafkaIO 中是否有设置,只有在将消息插入 BigQuery 后才能设置提交的消息?我现在只能找到自动提交设置。