0

我似乎无法将主题的序列化程序覆盖为Serdes.String(). 我正在尝试一个从主题(流)读取并写入 KTable 的简单用例。到目前为止我所拥有的:

@Component
class Processor {
    @Autowired
    public void process(final StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
                .filter((key, value) -> value.contains("ACTION"))
                .toTable(Materialized.as("output_table_materialized"))
                .toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line

    }
}

我得到的例外是:

org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

根据我收集的信息,它理解消息是 aString但它正在使用默认的反序列化器ByteArraySerializer。我在上面的代码中哪里出错了?

4

1 回答 1

-1

The Consumed.with would be a Deserializer.

The error is on the Serializer, or the toTable call, which you may add Produced.with or modify your application properties to configure the defaults there

于 2020-12-16T00:23:30.613 回答