
I am creating a simple Kafka Streams application. My producer is producing protobuf-serialized messages to one topic, and my Kafka Streams application consumes messages from that topic. I'm trying to deserialize the messages by setting valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde in my application.yml file, but I'm getting the errors below.

Errors:

org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3] 

My application.yml configuration file:

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          consumer:
            max-attempts: 3
            valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
            back-off-initial-interval: 100
            retryable-exceptions:
              javax:
                validation:
                  ValidationException: false
          destination: MYINPUT-TOPIC
          group: consumer-group
          concurrency: 2
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            schema-registry-u-r-l: http://localhost:8081
            auto-offset-reset: "earliest"
            configuration:
              commit-interval-ms: 100

The process method that prints the deserialized message to the log:

import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class ProcessStream {

    // Timer is Protobuf's auto-generated class; I'm using it as the value type
    // so the messages are deserialized into it. When I debug this method,
    // I see a byte array instead of a Timer.
    @Bean
    public Consumer<KStream<String, Timer>> process() {
        return input -> input.foreach((k, v) -> log.info(String.format("key: %s, value: %s", k, v)));
    }
}

Please help me resolve this issue. How can I deserialize protobuf messages in Kafka Streams?


1 Answer


You need to use a protobuf Serde that wraps the appropriate protobuf deserializer. Confluent Schema Registry provides a protobuf Serde implementation; see the Confluent documentation for details. I haven't tested this particular implementation, but it looks like it should work. If you are using it (or a custom protobuf Serde implementation), then you only need to provide a bean of that type to register it with the Spring Cloud Stream application. See below.

@Bean
public KafkaProtobufSerde<Timer> kafkaProtobufSerde() {
    KafkaProtobufSerde<Timer> serde = new KafkaProtobufSerde<>(Timer.class);
    serde.configure(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
    return serde;
}
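Passing Timer.class makes the serde deserialize into your generated type rather than a DynamicMessage, and the configure(..., false) call applies the schema registry settings for a value serde; the URL shown is an assumption taken from your binder configuration, so adjust it to your environment. Note also that valueSerde is not a property of the plain consumer binding where you currently have it; for the Kafka Streams binder that property would live under spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde, though with the bean above that entry is unnecessary.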

The Kafka Streams binder in Spring Cloud Stream will detect this bean and match it against your consumer's type.
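
As an aside, the byte dump in your exception explains the CharConversionException: the record starts with the Confluent wire-format framing (a magic byte and a schema id), which a Jackson/JSON-based serde then tries to read as UTF text. Here is a minimal sketch that decodes that header, with the byte values copied from your error message:

import java.nio.ByteBuffer;

public class WireFormatProbe {
    public static void main(String[] args) {
        // First bytes of the failing record, taken from the SerializationException above
        byte[] head = {0, 0, 0, 0, 2, 0};
        ByteBuffer buf = ByteBuffer.wrap(head);
        System.out.println("magic byte: " + buf.get());    // 0 -> Confluent wire format
        System.out.println("schema id : " + buf.getInt()); // 2 -> id in Schema Registry
        // The trailing 0 is the varint-encoded protobuf message-index list
        // (0 = the first message type in the registered schema); everything after
        // it is the protobuf-encoded Timer payload, not text.
    }
}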

Update, based on the sample application shared in the comments below

I was able to run your sample application after making the following changes to your configuration:

spring:
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          group: consumer-group
          concurrency: 1
          headerMode: none
          destination: MESSAGING-TIMER-EXAMPLE
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://localhost:8081

You need the kafka.streams.binder prefix. After making those changes, I was able to start the application without any errors. I saw the following in the startup logs:

2021-07-29 15:10:42.995  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Key Serde used for process-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
2021-07-29 15:10:42.996  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Value Serde used for process-in-0: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
...
...
2021-07-29 15:10:43.629  INFO 65137 --- [           main] c.j.ListenerMessagingStreamsApplication  : Started ListenerMessagingStreamsApplication in 1.836 seconds (JVM running for 2.305)

I noticed that you named your yaml configuration file application-LOCAL.yml. If that is the one you are using, make sure to set the property --spring.config.name=application-LOCAL at runtime. Otherwise, rename the file to application.yml.

Answered 2021-07-28T15:53:08.523