0

为了使用 Apache Beam 处理 Avro 编码的消息KafkaIO,需要传递一个 的实例ConfluentSchemaRegistryDeserializerProvider作为值反序列化器。

一个典型的例子如下所示:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
  .apply(KafkaIO.<Long, GenericRecord>read()
     .withBootstrapServers("kafka-broker:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(
         ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject"))

但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法提前提供一个固定的主题名称。如何解决这个困境?

(我的目标是最终BigQueryIO将这些事件推送到云端。)

4

1 回答 1

1

您可以进行多次阅读,每个主题一次,然后将它们展平

于 2021-07-01T07:20:33.193 回答