1

我正在使用 Alpakka AMQP 库 ( https://developer.lightbend.com/docs/alpakka/current/amqp.html ) 来处理反应流中的 RabbitMQ 消息并将它们转储到 Kafka 中。

我们正在使用 Avro 和 Schema Registry,因此格式错误的消息无法通过验证,需要根据具体情况进行处理。我们想要的行为是应用程序死掉,并在 30 秒 - 1 分钟后重新启动,在此期间,我们能够使用 UI 将消息从队列中拉出以查看它有什么问题。

出于某种原因,我nack()似乎没有将消息释放回来,并且它保持在Unacked状态并且在释放之前无法查看。如何做到这一点?我将包含我的代码片段:

Flow[CommittableIncomingMessage]
      .map { msg =>
        rabbitReceivedCounter.increment()
        reconciliationGauge.increment()

        val json = msg.message.bytes.utf8String

        val record = Try(converter.convert(json)) match {
          case Success(result) => result
          case Failure(e) => e match {
            case e: JsonToAvroConversionException ⇒
              val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
              failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
              kafkaFailureCounter.increment()
              Await.result(msg.nack(), Duration.Undefined)
              throw e

nack()是 a java.util.Future,所以只是为了测试我Await在它周围扔了一个以确保问题不是我在信号可以将它传递给 Rabbit 之前抛出错误。

4

1 回答 1

1

当流中抛出异常时,导致错误的元素通常会丢失。一个想法是将 Avro 转换卸载到一个既返回转换结果又返回原始消息的参与者:这种方法将允许您acknack消息取决于它是否可转换为 Avro。

例如,演员可能看起来像下面这样:

case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
                                // ^ change this to whatever type you're using

class ConverterActor extends Actor {
  val converter = ???

  def receive = {
    case msg: CommittableIncomingMessage =>
      try {
        val json = msg.message.bytes.utf8String
        val avro = converter.convert(json)
        sender() ! AvroResult(Some(avro), msg)
      } catch {
        case _ =>
          sender() ! AvroResult(None, msg)
      }
  }
}

然后你可以在你的流中询问这个演员:

val converterActor = system.actorOf(Props[ConverterActor])

val source: Source[Avro, _] = amqpSource
  .ask[AvroResult](converterActor)
  .mapAsync(1) {
    case AvroResult(Some(avro), msg) => // ack
      msg.ack().map(_ => Some(avro))
    case AvroResult(None, msg) => // nack
      msg.nack().map(_ => None)
  }
  .collect {
    case Some(avro) => avro
  }

上面Source的代码向下游发出了 Avro 转换的消息,这些消息使用ack. 不可转换的消息将被拒绝nack并且不会传递到下游。

于 2018-08-09T12:13:42.010 回答