我似乎无法将主题的序列化程序覆盖为Serdes.String()
. 我正在尝试一个从主题(流)读取并写入 KTable 的简单用例。到目前为止我所拥有的:
@Component
class Processor {
@Autowired
public void process(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
.filter((key, value) -> value.contains("ACTION"))
.toTable(Materialized.as("output_table_materialized"))
.toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
}
}
我得到的例外是:
org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
根据我收集的信息,它理解消息是 aString
但它正在使用默认的反序列化器ByteArraySerializer
。我在上面的代码中哪里出错了?