import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);
MyKafkaDataType
是从关联.avsc
文件自动生成的 avro 模式。我的理解是KafkaDataType
必须预先定义。但是,是否存在允许动态或通用的现有方法KafkaDataType
?如果是这样,将不胜感激示例代码块。
目标是KafkaDataType
成为一种通用数据类型,以便可以交换具有不同 avro 模式的不同 Kafka 流并由 Java 代码处理。目前,对于每个不同的 avro 模式,我需要KafkaDataType
从模式中更改为特定的 Java 自动生成的类.avsc
。
让我知道是否需要进一步澄清。