Alpakka kafka 消费者处理记录,直到遇到无法反序列化并静默死亡而不留下错误消息的记录。如何强制它报告错误信息?
问问题
243 次
1 回答
0
根据文件,
当流失败时,库内部将处理所有底层资源。
(反)序列化
如果从 Kafka 读取失败是由其他原因引起的,例如反序列化问题,那么该阶段将立即失败。如果您预计会出现这种情况,请考虑使用原始字节数组并在随后的映射阶段进行反序列化,您可以在其中使用监督来跳过失败的元素。
建议使用字节数组作为值(反)序列化消息,并在 Akka Stream 的映射操作中进行(反)序列化,而不是直接在 Kafka(反)序列化器中实现它。
Spray JSON示例(此示例使用恢复对无法正确解析的数据做出反应并忽略错误元素):
import spray.json._
final case class SampleData(name: String, value: Int)
object SampleDataSprayProtocol extends DefaultJsonProtocol {
implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}
import SampleDataSprayProtocol._
val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
new akka.japi.function.Function[Throwable, Supervision.Directive] {
override def apply(t: Throwable): Supervision.Directive = t match {
case _: spray.json.JsonParser.ParsingException => Supervision.Resume
case _ => Supervision.stop
}
}
}
val consumer = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { consumerRecord =>
val value = consumerRecord.value()
val sampleData = value.parseJson.convertTo[SampleData]
sampleData
}
.withAttributes(resumeOnParsingException)
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
于 2019-06-14T16:47:55.407 回答