28

想要使用高级消费者 api 实现延迟消费者

大意:

  • 按键生成消息(每个消息都包含创建时间戳)这确保每个分区都按生成时间排序消息。
  • auto.commit.enable=false(将在每个消息处理后显式提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 处理消息(此操作永远不会失败)
  • 提交 1 偏移量

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    

关于这个实现的一些担忧:

  1. 提交每个偏移量可能会减慢 ZK
  2. consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
  3. 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)
  4. ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)
  5. 我还缺少其他问题吗?

谢谢!

4

5 回答 5

24

解决此问题的一种方法是使用不同的主题来推送所有要延迟的消息。如果所有延迟的消息都应该在相同的时间延迟之后处理,这将是相当简单的:

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

现在将尽快处理所有常规消息,而那些需要延迟的消息将放在另一个主题上。

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而是会一直休眠到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以按延迟划分主题(例如 24 小时、12 小时、6 小时)。如果延迟时间比那个更动态,它会变得更复杂一些。您可以通过引入两个延迟主题来解决它。从延迟主题中读取所有消息A并处理其delayTo值为过去的所有消息。在其他人中,您只需找到最接近的人delayTo,然后将它们放在主题上B。睡眠直到最接近的一个应该被处理并反向执行,即处理来自 topic 的消息B并将一次不应该被处理的消息放回 topic A

回答您的具体问题(有些问题已在对您问题的评论中得到解决)

  1. 提交每个偏移量可能会减慢 ZK

您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始提供的功能,请查看offsets.storage消费者配置中的属性)

  1. consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)

我相信它可以,例如,如果它无法与偏移存储进行通信。正如你所说,使用幂等消息可以解决这个问题。

  1. 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)

除非消息本身的处理时间超过会话超时,否则上述解决方案不会出现问题。

  1. ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)

再次使用上述内容,您不需要设置长时间的会话超时。

  1. 我还缺少其他任何问题吗?

总有;)

于 2015-08-20T09:23:26.060 回答
6

使用 Tibco EMS 或其他 JMS 队列。他们内置了重试延迟。Kafka 可能不是您正在做的事情的正确设计选择

于 2017-04-04T14:34:14.060 回答
3

在您的情况下,我会建议另一条路线。

解决消费者主线程中的等待时间是没有意义的。这将是如何使用队列的反模式。从概念上讲,您需要尽可能快地处理消息并将队列保持在低负载因子。

相反,我会使用一个调度程序来为您需要延迟的每条消息安排作业。通过这种方式,您可以处理队列并创建将在预定义时间点触发的异步作业。

使用这种技术的缺点是它对将计划作业保存在内存中的 JVM 的状态是敏感的。如果该 JVM 失败,您将失去计划的作业,并且您不知道该任务是否已执行。

有一些调度器实现,尽管可以配置为在集群环境中运行,从而使您免受 JVM 崩溃的影响。

看看这个java调度框架:http ://www.quartz-scheduler.org/

于 2016-09-23T09:52:21.840 回答
1

在我们的一项任务中,我们遇到了同样的问题。虽然最终在不使用延迟队列的情况下解决了这个问题,但在探索解决方案时,我们发现最好的方法是使用API提供的功能pause。这种方法及其动机在这里得到了完美的描述:https ://medium.com/naukri-engineering/retry-mechanism-and-delay-queues-in-apache-kafka-528a6524f722resumeKafkaConsumer

于 2021-10-07T09:56:12.870 回答
0

Keyed-list on schedule 或其 redis 替代方案可能是最好的方法。

于 2018-01-25T10:22:11.570 回答