我有一些 apache Kafka 消费者代码正在运行,并获取用户详细信息和进程并相应地更新。
目前我正在使用 kafka 偏移量来跟踪我正确处理的记录。
每当我的消费者由于任何原因重新启动时(一个节点出现故障,其他节点获取数据,或消费者重新启动等),它首先根据处理的偏移量设置 Kafka 偏移量读取
consumer.seek(//get the offset from db);
并开始投票
consumer.poll()
现在的问题是由于一些不同区域的失败测试,相同的应用程序将在其他地方运行并开始处理新数据。
即数据库是全局同步的,但是不同区域的kafka集群之间没有同步,所以我得到了偏移量,它没有针对不同区域的相同主题进行排序。
因此,我最终在其他地区寻求不同的偏移量。
每当发生故障转移时,第一个集群中的数据不会被转移到第二个集群,这可以根据业务需求进行。
当前的问题是,当新记录到达第二个集群时,我不应该从第一个集群应用程序设置的偏移量开始,这可以通过将偏移量与 Kafka clusterID(name) 一起保存来管理,因此每当寻找偏移量时,我都可以与集群一起查询获得基于区域的偏移量。
有没有更好的方法来处理这种情况?