3

我正在使用 reactor kafka 并有一个自定义 AvroDeserializer 类用于消息的反序列化。

现在我有一个案例,对于某些有效负载,反序列化类会引发异常。我的 Kafka 侦听器在尝试读取此类记录时立即死亡。我尝试使用onErrorReturn(doOnErroronErrorContinue) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录。

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   public T deserialize( String topic, byte[] data ) {
       try {
         //logic to deserialize
       }
       catch {
          // throw new SerializationException("error deserializing" );
       }
   }
}

在侦听器端,我正在尝试这样处理->

@EventListener(ApplicationStartedEvent.class)

public class listener() {
   KakfaReceiver<String, Object> receiver; // kafka listener
   receiver.receive()
   .delayUntil(do something)//logic to update the record to db
   .doOnError(handle error)//trying to handle the exceptions including desrialization exception - logging them to a system
   .onErrorContinue((throwable, o) -> log.info("continuing"))
   .doOnNext(r->r.receiverOffset().acknowledge())
   .subscribe()

}

一种选择是不从 Deserializer 类中抛出异常,但我想在单独的系统中记录此类异常以进行分析,因此希望在 kafka 侦听器端处理此类记录。关于如何实现这一点的任何想法?

4

2 回答 2

1

看起来像评论中关于使用的建议

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

在大多数情况下都可以使用。但我想不出办法让它在 Reactor Spring Kafka 中工作。目前,我继续采用不从反序列化器中抛出异常并添加逻辑以将其记录在其中的方法,这解决了 Kafka 消费者在毒记录之后无法消费后续记录的问题

于 2020-08-16T10:43:45.543 回答
0

我认为您的问题是您确实抛出了异常,并且即使在反应流的上下文中,任何异常都是终端。只是不要抛出异常。

将您的 T(从反序列化()返回)包装在一个类中,这可以指示反序列化是成功还是失败。如果您不关心确切的错误,您可以使用 Optional<T> ,其中空的 optional 将指示反序列化错误。如果您使用的是 vavr 库,则可以从那里获取 Option<T> 或 Try<T> 以将实际的反序列化存储在失败端。

例如,您的反序列化器看起来像

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   public Optional<T> deserialize( String topic, byte[] data ) {
       try {
         //logic to deserialize
         return Optional.of(result);
       }
       catch {
          return Optional.empty();
       }
   }
}

现在您不再投掷,您的通量仍然存在,例如您可以过滤掉空的 Optional 实例。

PS 如果不想使用包装器类型,可以返回 null 而不是抛出并过滤掉 null 值。不过,我不建议这样做 - 更容易忘记反序列化()行为并稍后获得 NullPointerException。

于 2020-12-12T21:39:34.880 回答