我正在使用 reactor kafka 并有一个自定义 AvroDeserializer 类用于消息的反序列化。
现在我有一个案例,对于某些有效负载,反序列化类会引发异常。我的 Kafka 侦听器在尝试读取此类记录时立即死亡。我尝试使用onErrorReturn
(doOnError
和onErrorContinue
) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录。
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
public T deserialize( String topic, byte[] data ) {
try {
//logic to deserialize
}
catch {
// throw new SerializationException("error deserializing" );
}
}
}
在侦听器端,我正在尝试这样处理->
@EventListener(ApplicationStartedEvent.class)
public class listener() {
KakfaReceiver<String, Object> receiver; // kafka listener
receiver.receive()
.delayUntil(do something)//logic to update the record to db
.doOnError(handle error)//trying to handle the exceptions including desrialization exception - logging them to a system
.onErrorContinue((throwable, o) -> log.info("continuing"))
.doOnNext(r->r.receiverOffset().acknowledge())
.subscribe()
}
一种选择是不从 Deserializer 类中抛出异常,但我想在单独的系统中记录此类异常以进行分析,因此希望在 kafka 侦听器端处理此类记录。关于如何实现这一点的任何想法?