2

我已经在 Google Groups 上问过这个问题,但还没有收到任何回复。所以在这里为不同的观众发布这个。

我们正在为我们的应用程序使用 Reactive-Kafka。我们有一个如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。该消息应在规定的时间后或在消费者方的明确要求下重试。假设使用我们目前的方法,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但由于数据库问题而导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者以在规定的时间内接收消息(例如,等待 30 分钟重试)。我们不确定如何处理这种情况。

是否可以这样做?我错过了什么吗?

以下是从响应式 kafka 中获取的示例代码:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        Future {
          /**
            * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
            */
        }.map(_ => msg.committableOffset).recover {
          case ex => {
            /**
              * HOW TO DO ????????
              * On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
              * or on demand from the last committed offset
              */
            throw ex
          }
        }
      }
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3)(_.commitScaladsl())
      .runWith(Sink.ignore)
4

2 回答 2

1

请注意,您可能需要将srcfrom的具体化值映射akka.kafka.scaladsl.Consumer.Controlakka.NotUsed以便在以下位置引用它recoverWithRetries

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { msg =>
    Future {
      /**
        * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
        */
    }.map(_ => msg.committableOffset)
  .mapMaterializedValue(_ => akka.NotUsed)
于 2017-02-06T17:23:21.863 回答
0

recoverWithRetries为此目的有一个组合器。作为参考,请参阅此答案文档

你可以提取你的来源

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        Future {
          /**
            * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
            */
        }.map(_ => msg.committableOffset)

然后做

src
  .recoverWithRetries(attempts = -1, {case e: MyDatabaseException => 
    logger.error(e)
    src.delay(30.minutes, DelayOverflowStrategy.backpressure)})
  ...

(以尝试次数=-1 重试意味着无限期地重试)

于 2017-02-03T08:04:34.807 回答