0

我正在尝试了解 GlobalKtable 在 kafka 中的工作方式,为此我正在尝试编写示例代码。我已经创建了 globalKtable 但我也想看看我已经尝试过 peek 功能但它不可用,现在我正在尝试通过视图但它给出了编译时错误。

在scala中查看globalktable的写法是什么?

我试过的是

'''
 val genderGlobalTable: GlobalKTable[String, abc] = builder
        .globalTable(kafkaStreamConfig.getString("abc-topic"),
          Materialized.as("abcStore")
            .withKeySerde(stringSerde)
            .withValueSerde(abcSerde))
    
      implicit val streams: KafkaStreams = new KafkaStreams(builder.build(), properties)
    
      val view: ReadOnlyKeyValueStore[String, abc] = streams
        .store("abcStore", new QueryableStoreType[abc])

'''
4

1 回答 1

0

您不能创建一个QueryableStoreType接口的实例。相反,您需要使用工厂类QueryableStoreTypes(注意,名称是复数,而接口名称是单数)来获取您需要的类型。

GlobalKTable可以使用QueryableStoreTypes.keyValueStore()(或取决于您的 Kafka Streams 版本QueryableStoreTypes.timestampedKeyValueStore())。

于 2020-09-02T05:01:17.253 回答