我正在使用 Alpakka AMQP 库 ( https://developer.lightbend.com/docs/alpakka/current/amqp.html ) 来处理反应流中的 RabbitMQ 消息并将它们转储到 Kafka 中。
我们正在使用 Avro 和 Schema Registry,因此格式错误的消息无法通过验证,需要根据具体情况进行处理。我们想要的行为是应用程序死掉,并在 30 秒 - 1 分钟后重新启动,在此期间,我们能够使用 UI 将消息从队列中拉出以查看它有什么问题。
出于某种原因,我nack()
似乎没有将消息释放回来,并且它保持在Unacked
状态并且在释放之前无法查看。如何做到这一点?我将包含我的代码片段:
Flow[CommittableIncomingMessage]
.map { msg =>
rabbitReceivedCounter.increment()
reconciliationGauge.increment()
val json = msg.message.bytes.utf8String
val record = Try(converter.convert(json)) match {
case Success(result) => result
case Failure(e) => e match {
case e: JsonToAvroConversionException ⇒
val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
kafkaFailureCounter.increment()
Await.result(msg.nack(), Duration.Undefined)
throw e
这nack()
是 a java.util.Future
,所以只是为了测试我Await
在它周围扔了一个以确保问题不是我在信号可以将它传递给 Rabbit 之前抛出错误。