想要使用高级消费者 api 实现延迟消费者
大意:
- 按键生成消息(每个消息都包含创建时间戳)这确保每个分区都按生成时间排序消息。
- auto.commit.enable=false(将在每个消息处理后显式提交)
- 消费一条消息
- 检查消息时间戳并检查是否已经过了足够的时间
- 处理消息(此操作永远不会失败)
提交 1 偏移量
while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg }
关于这个实现的一些担忧:
- 提交每个偏移量可能会减慢 ZK
- consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
- 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)
- ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)
- 我还缺少其他问题吗?
谢谢!