Spring Cloud Stream Kafka: KTable as input not working

EventSink.java

import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.annotation.Input;

public interface EventSink {
    @Input("inputTable")
    KTable<?, ?> inputTable();
}

MessageReceiver.java

import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(EventSink.class)
public class MessageReceiver {

    @StreamListener
    public void process(@Input("inputTable") KTable<String, Event> table) {

        // Just for illustration; the real logic does much more with this KTable.
        table.toStream()
                .foreach((key, value) -> System.out.println(value));
    }
}

application.yml

server:
  port: 8083

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            application-id: kafka-stream-demo
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.springframework.kafka.support.serializer.JsonSerde
          bindings:
            inputTable:
              materialized-as: event_store
        binder:
          brokers: localhost:9092
      bindings:
        inputTable:
          destination: nscevent
          group: nsceventGroup

I get the following error:

Exception in thread "kafka-stream-demo-1e64cf93-de19-4185-bee4-8fc882275010-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:370)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 7 more

Can someone please advise what the issue is? With a KStream as input it works, but not with a KTable. Thanks in advance.

1 Answer

A KTable is always converted using Kafka Streams' native Serde capability; no framework-level conversion is done on a KTable (although there is an open issue to add it). Since you are using a custom type for the value, you need to specify an appropriate Serde instead of relying on the default String serde. You can add these to the configuration:

spring.cloud.stream.kafka.streams.binder.configuration:
  default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
  spring.json.value.default.type: RawAccounting
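
Note that spring.json.value.default.type must be the fully qualified name of your value class; RawAccounting above is only a placeholder. Applied to the application.yml from the question, the binder section would look roughly like this (com.example.Event is an assumed package for your Event class; substitute your own):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            application-id: kafka-stream-demo
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
              # assumed fully qualified name; point this at your Event class
              spring.json.value.default.type: com.example.Event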

KTable is not automatically converted on the input channel.
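
For the same reason, if you build the serde programmatically rather than through configuration, the target type has to be given to JsonSerde explicitly. A minimal sketch (again assuming a com.example.Event class):

import org.springframework.kafka.support.serializer.JsonSerde;

public class EventSerdes {

    // With an explicit target type, the underlying JsonDeserializer no
    // longer needs type information from the record headers, which is
    // exactly what was missing in the stack trace above.
    public static JsonSerde<Event> eventSerde() {
        return new JsonSerde<>(Event.class);
    }
}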

answered 2019-11-13T08:46:55.607