0
import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);

MyKafkaDataType是从关联.avsc文件自动生成的 avro 模式。我的理解是KafkaDataType必须预先定义。但是,是否存在允许动态或通用的现有方法KafkaDataType?如果是这样,将不胜感激示例代码块。

目标是KafkaDataType成为一种通用数据类型,以便可以交换具有不同 avro 模式的不同 Kafka 流并由 Java 代码处理。目前,对于每个不同的 avro 模式,我需要KafkaDataType从模式中更改为特定的 Java 自动生成的类.avsc

让我知道是否需要进一步澄清。

4

1 回答 1

0

我的理解是 KafkaDataType 必须是预定义的。

对于使用 SpecificRecord / serde,是的

但是,是否存在允许动态或通用 KafkaDataType 的现有方法?

Avro SpecificRecord 类(例如生成的类)扩展自GenericRecord,您可以将其与 GenericSerde 一起使用而不是您的特定类型来处理同一流中的多种类型的记录

于 2021-11-04T13:38:19.780 回答