1

这个问题与https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455几乎相同,但对于Functional方法

    @Bean
    public Consumer<Message<Foo>> fooConsumer() {
        return message -> {
            String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
            Foo payLoad = message.getPayload();

            if (payLoad == null) {
                deleteSomething(key);
            } else {
                addSomething(key, payLoad);
            }
        };
    }

当产生正确的 'Foo' (json) 时,调用 fooConsumer。但是,当产生“空”有效负载/墓碑时,永远不会调用使用者函数。

解决方法我尝试使用有效的自定义“转换器”方法:

@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {

    @Override
    protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof KafkaNull) {
            return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
        }

        return super.convertFromInternal(message, targetClass, conversionHint);
    }
}

然后fooConsumer可以检查payload.isDeleted()处理null情况。但这是冗长的,会污染 pojo/model 类,并且必须为每个消费者重复。

我知道 spring-integration 不能与 null 一起使用。但是有没有更好/标准的方法来处理使用功能方法的“墓碑”用例?

版本:3.0.4.RELEASE

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
   <version>3.0.4.RELEASE</version>
</dependency>
4

1 回答 1

1

这最近在主分支上修复了https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/936https://github.com/spring-cloud/spring-cloud-功能/问题/557

于 2020-07-15T15:45:57.927 回答