-2

当我使用 kafka 流加入 Windows 时,自动创建一个 avro 模式

像这样 “* KSTREAM-JOINTHIS-0000000125-store-changelog-value”**

我想知道,为什么这可以创建 avro 模式?

有我的代码:

Serde<FactCallProviderMessage> specificAvroSerdeForCallProviderMessage = ProcessStreamUtil.getAndRegisterSerde(isKeySerde);
        KStream<String, FactCallProviderMessage> callProviderMessageKStream = builder.stream(
                callProviderMessageTopic /* input topic */,
                Consumed.with(Serdes.String(), specificAvroSerdeForCallProviderMessage));


public static <T extends SpecificRecord> Serde<T> getAndRegisterSerde(boolean isKeySerde) {
        Serde<T> specificAvroSerde = new SpecificAvroSerde<T>();
        specificAvroSerde.configure(Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                MyConfig.getSchemaRegistryUrl()),
                isKeySerde);
        return specificAvroSerde;
    }

4

1 回答 1

0

Kafka Stream 为有状态的操作者创建所谓的更改日志主题,例如在后台加入以在 Kafka 集群中以容错方式备份状态。

如果您对输入消息使用 Avro 格式,您的输入消息将以 Avro 格式写入此更改日志主题。因此,在写入时,将为此更改日志主题注册相应的模式。

于 2018-01-14T01:58:53.990 回答