这个问题与https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455几乎相同,但对于Functional
方法
@Bean
public Consumer<Message<Foo>> fooConsumer() {
return message -> {
String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
Foo payLoad = message.getPayload();
if (payLoad == null) {
deleteSomething(key);
} else {
addSomething(key, payLoad);
}
};
}
当产生正确的 'Foo' (json) 时,调用 fooConsumer。但是,当产生“空”有效负载/墓碑时,永远不会调用使用者函数。
解决方法我尝试使用有效的自定义“转换器”方法:
@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {
@Override
protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
if (message.getPayload() instanceof KafkaNull) {
return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
}
return super.convertFromInternal(message, targetClass, conversionHint);
}
}
然后fooConsumer
可以检查payload.isDeleted()
处理null
情况。但这是冗长的,会污染 pojo/model 类,并且必须为每个消费者重复。
我知道 spring-integration 不能与 null 一起使用。但是有没有更好/标准的方法来处理使用功能方法的“墓碑”用例?
版本:3.0.4.RELEASE
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.4.RELEASE</version>
</dependency>